// Settings handed to the connector helper for the input (source) side.
// NOTE(review): "olp.connector.metadata-columns" presumably exposes the mt_* columns
// that the INSERT statement further down selects - confirm against the connector docs.
val sourceProperties =
  Map(
    "olp.kafka.group-name" -> "my-consumer-group",
    "olp.kafka.offset" -> "latest",
    "olp.connector.metadata-columns" -> "true"
  )

// Helper bound to the input catalog and the "sample-streaming-layer" layer.
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(
    HRN(inputCatalogHrn),
    "sample-streaming-layer",
    sourceProperties
  )

val tEnv = StreamTableEnvironment.create(env)

// Register InputTable: the column list comes from the helper's prebuilt schema,
// the WITH clause from the helper's options.
tEnv.executeSql {
  val schemaClause = sourceHelper.prebuiltSchema(tEnv).build()
  val withClause = sourceHelper.options
  s"CREATE TABLE InputTable $schemaClause WITH $withClause"
}
// Helper bound to the output catalog/layer; the only property set is
// "olp.connector.metadata-columns" -> "true".
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(
    HRN(outputCatalogHrn),
    outputLayerId,
    Map("olp.connector.metadata-columns" -> "true")
  )

// Register OutputTable: the column list comes from the helper's prebuilt schema,
// the WITH clause from the helper's options.
tEnv.executeSql {
  val schemaClause = sinkHelper.prebuiltSchema(tEnv).build()
  val withClause = sinkHelper.options
  s"CREATE TABLE OutputTable $schemaClause WITH $withClause"
}
// Make the EncodeCoordinates UDF callable from SQL as ENCODE_COORDINATES.
tEnv.createTemporarySystemFunction(
  "ENCODE_COORDINATES",
  new EncodeCoordinates
)

// Print the schema of InputTable.
tEnv
  .from("InputTable")
  .printSchema()

// Copy rows from InputTable into OutputTable: `data` is computed by
// ENCODE_COORDINATES(longitude_deg, latitude_deg) over the unnested
// path.positionEstimate entries; the mt_* columns are selected as-is.
// NOTE(review): the Java variant of this statement unnests `positionEstimate`
// without the `path.` prefix - confirm which one matches the layer schema.
// The SQL literal is deliberately left unindented so the submitted text is unchanged.
tEnv.executeSql {
  val forwardEncodedPositions = """
INSERT INTO
OutputTable
SELECT
ENCODE_COORDINATES(longitude_deg, latitude_deg) as data,
mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize
FROM
InputTable CROSS JOIN UNNEST(path.positionEstimate) AS p
"""
  forwardEncodedPositions
}
// Table environment created from the surrounding `env`.
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Helper bound to the input catalog/layer, configured with sourceProperties.
OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn),
        inputLayerId,
        sourceProperties);

// Column list for InputTable, taken from the helper's prebuilt schema.
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();

// Register InputTable; the WITH clause comes from the helper's options.
tEnv.executeSql(
    "CREATE TABLE InputTable " + sourceSchema + " WITH " + sourceHelper.options());
// Sink-side settings; the only entry is "olp.connector.metadata-columns" = "true".
// NOTE(review): presumably this exposes the mt_* columns written by the INSERT
// statement further down - confirm against the connector docs.
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.connector.metadata-columns", "true");

// Helper bound to the output catalog/layer, configured with sinkProperties.
OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn),
        outputLayerId,
        sinkProperties);

// Column list for OutputTable, taken from the helper's prebuilt schema.
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();

// Register OutputTable; the WITH clause comes from the helper's options.
tEnv.executeSql(
    "CREATE TABLE OutputTable " + sinkSchema + " WITH " + sinkHelper.options());
// Make the EncodeCoordinatesJava UDF callable from SQL as ENCODE_COORDINATES.
tEnv.createTemporaryFunction(
    "ENCODE_COORDINATES",
    EncodeCoordinatesJava.class);

// Copy rows from InputTable into OutputTable: `data` is computed by
// ENCODE_COORDINATES(longitude_deg, latitude_deg) over the unnested positionEstimate
// entries; the mt_* columns are selected as-is.
// NOTE(review): the Scala variant of this statement unnests `path.positionEstimate`
// - confirm which one matches the layer schema.
// The fragments are joined with an empty delimiter so the submitted text is unchanged.
tEnv.executeSql(
    String.join(
        "",
        "INSERT INTO ",
        " OutputTable ",
        "SELECT ",
        " ENCODE_COORDINATES(longitude_deg, latitude_deg) as data, ",
        " mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize ",
        "FROM",
        " InputTable CROSS JOIN UNNEST(positionEstimate) AS p"));