class UploadDataFunction(hrn: HRN)
extends RichMapFunction[PendingPartition, CommitPartition]
with Serializable {

// @transient + lazy: the client is excluded from the serialized closure
// shipped to task managers and is constructed on first use on the worker.
@transient
private lazy val client: FlinkDataClient = new FlinkDataClient()

@transient
private lazy val engine: FlinkWriteEngine = client.writeEngine(hrn)

/** Uploads one pending partition's data and returns its commit metadata. */
override def map(pendingPartition: PendingPartition): CommitPartition =
engine.put(pendingPartition)

/** Shuts the data client down when the task is disposed. */
override def close(): Unit =
client.terminate()
}
class PublishBatchWindowFunction(hrn: HRN, layerId: String)
extends RichAllWindowFunction[CommitPartition, String, TimeWindow]
with Serializable {

// Constructed lazily on the task manager; @transient keeps the
// non-serializable client out of the job-graph closure.
@transient
private lazy val client: FlinkDataClient = new FlinkDataClient()

@transient
private lazy val publish: FlinkPublishApi = client.publishApi(hrn)

/**
 * Commits every partition collected in the window as one batch publication
 * on top of the catalog's current base version, then emits a status line.
 */
override def apply(window: TimeWindow,
partitions: Iterable[CommitPartition],
out: Collector[String]): Unit = {
val baseVersion = publish.getBaseVersion()
publish.publishBatch2(
baseVersion,
Some(Seq(layerId)),
dependencies = Nil,
partitions.iterator
)
out.collect(s"commit on $baseVersion success")
}

/** Shuts the data client down when the task is disposed. */
override def close(): Unit =
client.terminate()
}
// Pipeline wiring: upload each pending partition, then batch-commit the
// resulting commit partitions once per one-minute event-time window.
// Source of partitions awaiting upload (produced elsewhere in the job).
val pendingPartitions: DataStream[PendingPartition] =
getPendingPartitionsStream()
// Upload blobs per partition; each element becomes a commit-ready partition.
val newPartitions: DataStream[CommitPartition] =
pendingPartitions.map(new UploadDataFunction(hrn))
// Group commit partitions into tumbling 1-minute event-time windows so a
// whole window is published as a single catalog version.
val newPartitionsWindow: AllWindowedStream[CommitPartition, TimeWindow] =
newPartitions.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
// One batch publication per window; emits a human-readable status string.
val results: DataStream[String] =
newPartitionsWindow.apply(new PublishBatchWindowFunction(hrn, batchLayer))
class UploadDataFunction extends RichMapFunction<PendingPartition, CommitPartition>
    implements Serializable {
  /** Catalog the partitions are uploaded to; serialized with the function. */
  private final HRN hrn;
  // Non-serializable client/engine are transient and created in open(),
  // so they never travel with the job graph.
  private transient FlinkDataClient dataClient;
  private transient FlinkWriteEngine writeEngine;

  public UploadDataFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn);
  }

  @Override
  public void close() throws Exception {
    // Flink may call close() even when open() failed or never ran;
    // guard against an NPE on a client that was never created.
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  /** Uploads one pending partition's data and returns its commit metadata. */
  @Override
  public CommitPartition map(PendingPartition pendingPartition) throws Exception {
    return writeEngine.put(pendingPartition);
  }
}
class PublishBatchWindowFunction
    extends RichAllWindowFunction<CommitPartition, String, TimeWindow> implements Serializable {
  /** Catalog the batch is published to; serialized with the function. */
  private final HRN hrn;
  // Non-serializable client/API are transient and created in open(),
  // so they never travel with the job graph.
  private transient FlinkDataClient dataClient;
  private transient FlinkPublishApi publishApi;

  public PublishBatchWindowFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    publishApi = dataClient.publishApi(hrn);
  }

  @Override
  public void close() throws Exception {
    // Flink may call close() even when open() failed or never ran;
    // guard against an NPE on a client that was never created.
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  /**
   * Commits all partitions collected in the window as one batch publication
   * on top of the catalog's current base version, then emits a status line.
   */
  @Override
  public void apply(
      TimeWindow window, Iterable<CommitPartition> commitPartitions, Collector<String> out)
      throws Exception {
    OptionalLong baseVersion = publishApi.getBaseVersion();
    // Optional.empty() = no explicit layer list; Collections.emptyList() = no dependencies.
    publishApi.publishBatch2(
        baseVersion, Optional.empty(), Collections.emptyList(), commitPartitions.iterator());
    out.collect("commit on " + baseVersion + " success");
  }
}
// Pipeline wiring: upload each pending partition, then batch-commit the
// resulting commit partitions once per one-minute event-time window.
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();
// Upload blobs per partition; each element becomes a commit-ready partition.
DataStream<CommitPartition> newPartitions = pendingPartitions.map(new UploadDataFunction(hrn));
// Tumbling 1-minute event-time windows so a whole window is published
// as a single catalog version.
AllWindowedStream<CommitPartition, TimeWindow> newPartitionsWindow =
newPartitions.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)));
// One batch publication per window; emits a human-readable status string.
DataStream<String> results = newPartitionsWindow.apply(new PublishBatchWindowFunction(hrn));