val tEnv = StreamTableEnvironment.create(env)
val inputLayerSchema = """
{
"type" : "record",
"name" : "Event",
"namespace" : "my.example",
"fields" : [
{"name" : "event_timestamp", "type" : "long"},
{"name" : "latitude", "type" : "double"},
{"name" : "longitude", "type" : "double"}
]
}
"""
val outputLayerSchema = """
{
"type" : "record",
"name" : "Event",
"namespace" : "my.example",
"fields" : [
{"name" : "city", "type" : "string"},
{"name" : "event_timestamp", "type" : "long"},
{"name" : "latitude", "type" : "double"},
{"name" : "longitude", "type" : "double"}
]
}
"""
val sourceProperties =
Map(
"olp.catalog.layer-schema" -> inputLayerSchema,
"olp.layer.query" -> "event_id=in=(1,2,3)",
"olp.connector.query-parallelism" -> "100"
)
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-avro-input", sourceProperties)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
val sinkProperties =
Map(
"olp.catalog.layer-schema" -> outputLayerSchema
)
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(outputCatalogHrn), "index-layer-avro-output", sinkProperties)
tEnv.executeSql(
s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
tEnv.executeSql(
"""
INSERT INTO OutputTable
SELECT
'Berlin',
event_timestamp,
latitude,
longitude,
idx_ingestion_time,
idx_event_id,
idx_event_type
FROM InputTable"""
)
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.catalog.layer-schema", inputLayerSchema);
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputLayerSchema);
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));
tEnv.executeSql(
"INSERT INTO OutputTable SELECT 'Berlin', event_timestamp, latitude, longitude, idx_ingestion_time, idx_event_id, idx_event_type FROM InputTable");