Flink のエントリポイント

FlinkDataClient API の主なエントリポイントです FlinkDataClient 。は、一度作成して再利用し、終了する必要がある重量の大きいオブジェクトです。 このオブジェクトを終了しないと、ジョブが完了せず、 ClassNotFound などの例外が発生する可能性があります。

ドライバーでは、以下のスニペットに示されているように、実行後にクライアントを終了する必要があります。

Scala
Java
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

val client = new FlinkDataClient()

// create sources
// apply functions
// add sinks
// ...

// block until job finish
env.execute()

// terminate the client on finish
client.terminate()
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkDataClient client = new FlinkDataClient();

// create sources
// apply functions
// add sinks
// ...

// block until job finish
env.execute();

// terminate the client on finish
client.terminate();

Flink 関数を終了するには、close()コールバックのRichクラスによって提供される Flink (RichSinkFunction, RichMaFunction, RichSourceFunctionまたはその他 ) を使用します。

Scala
Java
/** Flink function with access to DataClient. */
abstract class CustomFunction extends RichFunction with Serializable {
  // initialize DataClient
  @transient
  private lazy val flinkDataClient: FlinkDataClient =
    new FlinkDataClient()

  // terminate DataClient
  override def close(): Unit =
    flinkDataClient.terminate()
}
/** Flink function with access to DataClient. */
abstract class CustomFunction implements RichFunction, Serializable {
  private transient FlinkDataClient flinkDataClient;

  @Override
  public void open(Configuration parameters) throws Exception {
    flinkDataClient = new FlinkDataClient();
  }

  @Override
  public void close() throws Exception {
    flinkDataClient.terminate();
  }
}

」に一致する結果は 件です

    」に一致する結果はありません