val objects: DataStream[ObjectStoreMessage] =
streamOfObjectStoreMessages()
objects.addSink(new UploadObjectStoreDataSink(hrn, objectStoreLayer))
class UploadObjectStoreDataSink(hrn: HRN, layerId: String)
extends RichSinkFunction[ObjectStoreMessage]
with Serializable {
@transient
private lazy val dataClient: FlinkDataClient =
new FlinkDataClient()
@transient
private lazy val writeEngine: FlinkWriteEngine =
dataClient.writeEngine(hrn)
override def close(): Unit =
dataClient.terminate()
override def invoke(value: ObjectStoreMessage, context: SinkFunction.Context): Unit = {
val key = value.key
val data = value.data
writeEngine.uploadObject2(layerId, key, data)
}
}
case class ObjectStoreMessage(key: String, data: NewPartition.Blob)
class UploadObjectStoreDataSink extends RichSinkFunction<JObjectStoreMessage>
implements Serializable {
private HRN hrn;
private String layerId;
private transient FlinkDataClient dataClient;
private transient FlinkWriteEngine writeEngine;
public UploadObjectStoreDataSink(HRN hrn, String layerId) {
this.hrn = hrn;
this.layerId = layerId;
}
@Override
public void open(Configuration parameters) throws Exception {
dataClient = new FlinkDataClient();
writeEngine = dataClient.writeEngine(hrn);
}
@Override
public void close() {
dataClient.terminate();
}
@Override
public void invoke(JObjectStoreMessage message, Context context) {
writeEngine.uploadObject2(
layerId, message.getKey(), message.getData(), Optional.empty(), Optional.empty());
}
}
DataStream<JObjectStoreMessage> objects = getObjectStreamMessages();
objects.addSink(new UploadObjectStoreDataSink(hrn, objectStoreLayer));
public class JObjectStoreMessage {
private String key;
private NewPartition.Blob data;
public JObjectStoreMessage(String key, NewPartition.Blob data) {
this.key = key;
this.data = data;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public NewPartition.Blob getData() {
return data;
}
public void setData(NewPartition.Blob data) {
this.data = data;
}
}