val tEnv = StreamTableEnvironment.create(env)
val inputLayerSchema = """
{
"type" : "record",
"name" : "Event",
"namespace" : "my.example",
"fields" : [
{"name" : "event_timestamp", "type" : "long"},
{"name" : "latitude", "type" : "double"},
{"name" : "longitude", "type" : "double"}
]
}
"""
val outputLayerSchema = """
{
"type" : "record",
"name" : "Event",
"namespace" : "my.example",
"fields" : [
{"name" : "city", "type" : "string"},
{"name" : "event_timestamp", "type" : "long"},
{"name" : "latitude", "type" : "double"},
{"name" : "longitude", "type" : "double"}
]
}
"""
val sourceProperties =
Map(
"olp.catalog.layer-schema" -> inputLayerSchema,
"olp.layer.query" -> "mt_partition=in=(1,2,3)"
)
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn),
"volatile-layer-parquet-input",
sourceProperties)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(outputCatalogHrn),
"volatile-layer-parquet-output",
Map("olp.catalog.layer-schema" -> outputLayerSchema))
tEnv.executeSql(
s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
tEnv.executeSql(
"""
INSERT INTO OutputTable
SELECT
'Berlin',
event_timestamp,
latitude,
longitude
FROM InputTable"""
)
String inputLayerSchema =
"{\"type\" : \"record\", \"name\" : \"Event\", \"namespace\" : \"my.example\", \"fields\" : [ {\"name\" : \"event_timestamp\", \"type\" : \"long\"}, {\"name\" : \"latitude\", \"type\" : \"double\"}, {\"name\" : \"longitude\", \"type\" : \"double\"} ] }";
String outputLayerSchema =
"{\"type\" : \"record\", \"name\" : \"Event\", \"namespace\" : \"my.example\", \"fields\" : [ {\"name\" : \"city\", \"type\" : \"string\"}, {\"name\" : \"event_timestamp\", \"type\" : \"long\"}, {\"name\" : \"latitude\", \"type\" : \"double\"}, {\"name\" : \"longitude\", \"type\" : \"double\"} ] }";
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.catalog.layer-schema", inputLayerSchema);
sourceProperties.put("olp.layer.query", "mt_partition=in=(1,2,3)");
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputLayerSchema);
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));
tEnv.executeSql(
"INSERT INTO OutputTable SELECT 'Berlin', event_timestamp, latitude, longitude, mt_partition FROM InputTable");