Versioned Data

Write to a versioned layer

To publish a batch publication, perform the following steps:

  • Upload the data using WriteEngine.put()
  • Upload the metadata

Because a batch publication requires version consistency, the metadata must be aggregated (for example, into time windows) and published from a single node, as the Scala and Java examples below demonstrate.
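
Stripped of the Flink plumbing, the two steps reduce to one WriteEngine.put() call per partition followed by a single publishBatch2 commit. The following sketch mirrors the calls from the full examples below; the bindings named in the first comment are assumed to be in scope:

// assumed in scope: pendingPartition: PendingPartition, writeEngine: FlinkWriteEngine,
// publishApi: FlinkPublishApi, layerId: String,
// and commits: Seq[CommitPartition] collected from step 1

// step 1, on every worker: upload the payload; only the metadata travels downstream
val commit: CommitPartition = writeEngine.put(pendingPartition)

// step 2, on a single node: commit all collected metadata as one new version
val baseVersion = publishApi.getBaseVersion()
publishApi.publishBatch2(baseVersion,
                         Some(Seq(layerId)), // layers touched by this publication
                         dependencies = Nil, // no dependencies on other publications
                         commits.iterator)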

Scala
/** Map function that uploads data and returns the resulting metadata */
class UploadDataFunction(hrn: HRN)
    extends RichMapFunction[PendingPartition, CommitPartition]
    with Serializable {
  // create the DataClient lazily on the worker; @transient keeps it out of function serialization
  @transient
  private lazy val dataClient: FlinkDataClient =
    new FlinkDataClient()

  @transient
  private lazy val writeEngine: FlinkWriteEngine =
    dataClient.writeEngine(hrn)

  // terminate DataClient
  override def close(): Unit =
    dataClient.terminate()

  // upload the partition data and return the commit metadata
  override def map(pendingPartition: PendingPartition): CommitPartition =
    writeEngine.put(pendingPartition)
}

/** Window function that publishes a new batch version for all CommitPartitions in the window */
class PublishBatchWindowFunction(hrn: HRN, layerId: String)
    extends RichAllWindowFunction[CommitPartition, String, TimeWindow]
    with Serializable {
  // initialize DataClient
  @transient
  private lazy val flinkDataClient: FlinkDataClient =
    new FlinkDataClient()

  @transient
  private lazy val publishApi: FlinkPublishApi =
    flinkDataClient.publishApi(hrn)

  // terminate DataClient
  override def close(): Unit =
    flinkDataClient.terminate()

  override def apply(window: TimeWindow,
                     partitions: Iterable[CommitPartition],
                     out: Collector[String]): Unit = {
    val baseVersion = publishApi.getBaseVersion()
    publishApi.publishBatch2(baseVersion,
                             Some(Seq(layerId)),
                             dependencies = Nil,
                             partitions.iterator)
    out.collect(s"commit on $baseVersion success")
  }
}

// given a stream of new partitions
val pendingPartitions: DataStream[PendingPartition] =
  getPendingPartitionsStream()

// for each input: upload data and collect metadata
val newPartitions: DataStream[CommitPartition] =
  pendingPartitions.map(new UploadDataFunction(hrn))

// group metadata by 1 minute window
val newPartitionsWindow: AllWindowedStream[CommitPartition, TimeWindow] =
  newPartitions.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))

// publish new version on window trigger, propagating the base version of the commit
val results: DataStream[String] =
  newPartitionsWindow.apply(new PublishBatchWindowFunction(hrn, batchLayer))

Java
/** Map function that uploads data and returns the resulting metadata */
class UploadDataFunction extends RichMapFunction<PendingPartition, CommitPartition>
    implements Serializable {

  private HRN hrn;

  private transient FlinkDataClient dataClient;

  private transient FlinkWriteEngine writeEngine;

  public UploadDataFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn);
  }

  @Override
  public void close() throws Exception {
    dataClient.terminate();
  }

  @Override
  public CommitPartition map(PendingPartition pendingPartition) throws Exception {
    return writeEngine.put(pendingPartition);
  }
}

/** Window function that publishes a new batch version for all CommitPartitions in the window */
class PublishBatchWindowFunction
    extends RichAllWindowFunction<CommitPartition, String, TimeWindow> implements Serializable {

  private HRN hrn;

  private transient FlinkDataClient dataClient;

  private transient FlinkPublishApi publishApi;

  public PublishBatchWindowFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    publishApi = dataClient.publishApi(hrn);
  }

  @Override
  public void close() throws Exception {
    dataClient.terminate();
  }

  @Override
  public void apply(
      TimeWindow window, Iterable<CommitPartition> commitPartitions, Collector<String> out)
      throws Exception {
    OptionalLong baseVersion = publishApi.getBaseVersion();
    publishApi.publishBatch2(
        baseVersion, Optional.empty(), Collections.emptyList(), commitPartitions.iterator());
    out.collect("commit on " + baseVersion + " success");
  }
}

// given a stream of new partitions
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();

// for each input: upload data and collect metadata
DataStream<CommitPartition> newPartitions = pendingPartitions.map(new UploadDataFunction(hrn));

// group metadata by 1 minute window
AllWindowedStream<CommitPartition, TimeWindow> newPartitionsWindow =
    newPartitions.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)));

// publish new version on window trigger, propagating the base version of the commit
DataStream<String> results = newPartitionsWindow.apply(new PublishBatchWindowFunction(hrn));
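
Note that the tumbling event-time window only fires if the input stream carries event timestamps and watermarks. Below is a minimal sketch of how they could be assigned, using Flink's WatermarkStrategy API (Flink 1.11+); the eventTime accessor on PendingPartition is a hypothetical stand-in for however your records expose their event time:

import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// assign event timestamps and watermarks before windowing
val timestampedPartitions: DataStream[PendingPartition] =
  getPendingPartitionsStream()
    .assignTimestampsAndWatermarks(
      WatermarkStrategy
        // tolerate records arriving up to 30 seconds out of order
        .forBoundedOutOfOrderness[PendingPartition](Duration.ofSeconds(30))
        .withTimestampAssigner(new SerializableTimestampAssigner[PendingPartition] {
          override def extractTimestamp(partition: PendingPartition, recordTs: Long): Long =
            partition.eventTime // hypothetical accessor
        }))

Also note that windowAll deliberately runs with parallelism 1, which is what satisfies the single-node requirement above: every CommitPartition in a window is collected on one task before publishBatch2 is called.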
