Use Flink Connector to read and write data

Objectives: Understand how to use the Flink Connector to read and write data from different layers and data formats in a catalog.

Complexity: Beginner

Time to complete: 40 min

Prerequisites: Organize your work in projects

Source code: Download

The examples in this tutorial demonstrate how to use the Flink Connector provided by the Data Client Library. This provides support for interacting with Flink for stream processing workloads, allowing the use of all standard APIs and functions in Flink to read, write and delete data. For batch processing workloads, you should use the provided Spark Connector instead.

In the main part of the tutorial, we will cover the following usages:

  • Subscribe to streaming layer in protobuf format
  • Transform data from Table API to DataStream API and change the structure
  • Print data from stream and use index layer as destination for stream
  • Transfer data from one layer to another with different types

As a preparation step, you will need to create your data destination catalog with appropriate layers, so that these are in place when it comes to the main part of this tutorial. The dataset used will be sourced from the HERE Sample SDII Messages catalog, and contains simulated streaming sensor data in form of SDII messages.

Set up the Maven project

Create the following folder structure for the project:

flink-connector
└── src
    └── main
        ├── java
        └── resources
        └── scala

You can do this with a single bash command:

mkdir -p flink-connector/src/main/{java,resources,scala}

Create the output catalog

Create a file named pipeline-config.conf, and populate it with the contents below, replacing {{YOUR_OUTPUT_CATALOG_HRN}} with the HRN to the catalog you created in Organize your work in projects.

pipeline.config {

  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }

  input-catalogs {
    // Be sure to use hrn:here-cn:data::olp-cn-here:sample-data on the China Environment.
    sensorData { hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2" }
  }
}

The Maven POM file is similar to the file in the Verify Maven Settings example. The parent POM and dependencies sections are updated as follows:

Parent POM:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-stream-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

Dependencies:

<dependencies>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>flink-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.htrace</groupId>
                <artifactId>htrace-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>xerces</groupId>
                <artifactId>xercesImpl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

You will need to create the output catalog. You can perform these steps using the OLP Command Line Interface (CLI).

You should use a unique identifier name for the catalog, for example {{YOUR_USERNAME}}-flink-connector-output.

For the output catalog, you can name the file flink-connector-ouput.json with the contents below.

{
  "id": "flink-connector-output",
  "name": "Simulated sensor data archive (from tutorial) flink-connector-output",
  "summary": "Archive of simulated sensor data for the FlinkConnector tutorial",
  "description": "Archive of simulated sensor data",
  "tags": [
    "Tutorial",
    "Simulated"
  ],
  "layers": [
    {
      "id": "volatile-layer-avro-data",
      "name": "volatile-layer-avro-data",
      "summary": "Simulated sensor data for the FlinkConnector tutorial",
      "description": "Simulated sensor data for the FlinkConnector tutorial",
      "contentType": "application/x-avro-binary",
      "layerType": "volatile",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    },
    {
      "id": "index-layer-parquet-data",
      "name": "index-layer-parquet-data",
      "summary": "Simulated sensor data for the FlinkConnector tutorial",
      "description": "Simulated sensor data for the FlinkConnector tutorial",
      "contentType": "application/x-parquet",
      "layerType": "index",
      "indexProperties": {
        "indexDefinitions": [
          {
            "name": "tile_id",
            "type": "int"
          },
          {
            "name": "time_window",
            "duration": 600000,
            "type": "timewindow"
          }
        ],
        "ttl": "unlimited"
      },
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    }
  ]
}

Replace {{YOUR_CATALOG_ID}} below with your own identifier and then run the following command (on Windows, this works in Cygwin and git bash; otherwise you can run olp.bat):

olp catalog create {{YOUR_CATALOG_ID}} \
    "Simulated sensor data output from tutorial ({{YOUR_USERNAME}})" \
    --config flink-connector-output.json \
    --scope {{YOUR_PROJECT_HRN}}

Note

If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.

This application uses the public data source to read from the stream layer in protobuf data format, performing some transformations on the received data, and writing to the output volatile layer from the previously created catalog.

Scala
Java

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.olp.util.quad.factory.HereQuadFactory
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.slf4j.LoggerFactory

import java.time.Duration
import java.util.{Timer, TimerTask}

object StreamToVolatileLayerScalaPipeline extends App {

  private val logger = LoggerFactory.getLogger(StreamToVolatileLayerScalaPipeline.getClass)

  private val pipelineContext = new PipelineContext

  // Source and output catalogs / layers
  val sensorDataCatalogHrn = pipelineContext.config.inputCatalogs("sensorData")
  val outputCatalogHrn = pipelineContext.config.outputCatalog

  val streamingLayer = "sample-streaming-layer"
  val outputVolatileLayer = "volatile-layer-avro-data"

  // Configure stream execution environment settings
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(5000)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))

  // Define the properties
  val properties = Map(
    "olp.kafka.group-name" -> "protobuf-streaming",
    "olp.kafka.offset" -> "earliest",
  )

  // create the Table Connector Descriptor Source
  val helper = OlpStreamConnectorHelper(sensorDataCatalogHrn, streamingLayer, properties)

  // Register the TableSource
  val tEnv = StreamTableEnvironment.create(env)
  val schema = helper.prebuiltSchema(tEnv).build()
  tEnv.executeSql(s"CREATE TABLE SensorDataTable $schema WITH ${helper.options}")

  // Register the user-defined functions to be used in the SQL query
  tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction())
  tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction())

  // Define a Table containing the fields to be used in computing results
  val observationsTable: Table =
    tEnv.sqlQuery("""SELECT
                    |  assignUuid(envelope.version) AS eventId,
                    |  timeStampUTC_ms AS timestampUtc,
                    |  computeHereTile(latitude_deg, longitude_deg) AS tile
                    |FROM
                    |  SensorDataTable CROSS JOIN UNNEST(positionEstimate)
                    |""".stripMargin)

  // Domain case classes
  case class PositionEvent(eventId: String, timestampUtc: Long, tile: Long)
  case class PositionStatistics(timestampUtc: Long, tile: Long, totalObservations: Int)

  // Create watermark strategy
  val watermarkStrategy: WatermarkStrategy[PositionEvent] = WatermarkStrategy
    .forBoundedOutOfOrderness[PositionEvent](Duration.ofSeconds(10))
    .withTimestampAssigner(new SerializableTimestampAssigner[PositionEvent] {
      override def extractTimestamp(event: PositionEvent, recordTimestamp: Long): Long =
        event.timestampUtc
    })

  // Compute values for the number of observations by tile and time window
  val outputStream: DataStream[PositionStatistics] = tEnv
    .toAppendStream[PositionEvent](observationsTable) // Convert the records to a stream of PositionEvent
    .assignTimestampsAndWatermarks(watermarkStrategy) // Define how the event time is assigned
    .map(v => (v.timestampUtc, v.tile, 1)) // Map fields to a tuple
    .keyBy(_._2) // Key by tile ID
    .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5))) // Define the time window to use
    .sum(2)
    .map(PositionStatistics.tupled(_))
  outputStream.print()

  tEnv.createTemporaryView("PositionStatsTable", outputStream)

  // Define the avro schema in which the output data will be written
  val outputAvroSchema =
    """{
      |  "type" : "record",
      |  "name" : "Event",
      |  "namespace" : "my.flink.tutorial",
      |  "fields" : [
      |    {"name" : "city", "type" : "string"},
      |    {"name" : "total", "type" : "int"},
      |    {"name" : "timestampUtc", "type" : "long"}
      |  ]
      |}
      |""".stripMargin

  // Create TableSink for the output
  val sinkHelper: OlpStreamConnectorHelper =
    OlpStreamConnectorHelper(outputCatalogHrn,
                             outputVolatileLayer,
                             Map("olp.catalog.layer-schema" -> outputAvroSchema))
  tEnv.executeSql(
    s"CREATE TABLE OutputIndexTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
      s"WITH ${sinkHelper.options}")

  // Write the result into the output table, indexed by timestamp and HERE tile ID
  tEnv.executeSql("""INSERT INTO
                    |  OutputIndexTable
                    |SELECT
                    |  'Berlin' AS city,
                    |  totalObservations AS total,
                    |  timestampUtc,
                    |  CAST(tile AS STRING) AS mt_partition
                    |FROM
                    |  PositionStatsTable
                    |""".stripMargin)

  try {
    env.executeAsync()
    logger.info(s"Stream to $outputCatalogHrn executed")

    // Remove this part to run as infinite pipeline
    new Timer(true).schedule(new TimerTask {
      override def run(): Unit = {
        logger.info(s"Stream to $outputCatalogHrn canceled")
        System.exit(0)
      }
    }, 30000)

  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }
}

class ComputeTileFunction() extends ScalarFunction {
  private val tileLevel = 14

  // java.lang.Double is used to avoid DOUBLE NOT NULL semantics of the function
  // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type-extraction
  def eval(latitude: java.lang.Double, longitude: java.lang.Double): Long =
    HereQuadFactory.INSTANCE
      .getMapQuadByLocation(latitude, longitude, tileLevel)
      .getLongKey
}

class AssignUuidFunction() extends ScalarFunction {

  def eval(input: String): String =
    java.util.UUID.randomUUID.toString
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.pipeline.PipelineContext;
import java.time.Duration;
import java.util.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamToVolatileLayerPipeline {

  private static final PipelineContext pipelineContext = new PipelineContext();

  // Source for the sensor data to be used as input
  private static final HRN sensorDataCatalogHrn =
      pipelineContext.getConfig().getInputCatalogs().get("sensorData");
  private static final HRN outputCatalogHrn = pipelineContext.getConfig().getOutputCatalog();

  private static final String streamingLayer = "sample-streaming-layer";
  private static final String outputIndexLayer = "volatile-layer-avro-data";

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(StreamToVolatileLayerPipeline.class);

    // Configure stream execution environment settings
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));

    // Define the properties
    Map<String, String> properties = new HashMap<>();
    properties.put("olp.kafka.group-name", "protobuf-streaming");
    properties.put("olp.kafka.offset", "earliest");

    // create the Table Connector Descriptor Source
    OlpStreamConnectorHelper helper =
        OlpStreamConnectorHelper.create(sensorDataCatalogHrn, streamingLayer, properties);

    // Register the TableSource
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Schema schema = helper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(
        String.format("CREATE TABLE SensorDataTable %s WITH %s", schema, helper.options()));

    // Register the user-defined functions to be used in the SQL query
    tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction());
    tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction());

    // Define a Table containing the fields to be used in computing results
    Table observationsTable =
        tEnv.sqlQuery(
            "SELECT"
                + "  assignUuid(envelope.version) AS eventId, "
                + "  timeStampUTC_ms AS timestampUtc, "
                + "  computeHereTile(latitude_deg, longitude_deg) AS tile "
                + "FROM SensorDataTable "
                + "CROSS JOIN UNNEST(positionEstimate)");

    // Create watermark strategy
    WatermarkStrategy<Tuple3<String, Long, Long>> watermarkStrategy =
        WatermarkStrategy.<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(
                Duration.ofSeconds(10))
            .withTimestampAssigner((event, timestamp) -> event.f1);

    // Compute values for the number of observations by tile and time window
    DataStream<PositionStatistics> outputStream =
        tEnv.toAppendStream(
                observationsTable,
                new TupleTypeInfo<Tuple3<String, Long, Long>>(Types.STRING, Types.LONG, Types.LONG))
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .map(
                new MapFunction<Tuple3<String, Long, Long>, Tuple3<Long, Long, Integer>>() {
                  @Override
                  public Tuple3<Long, Long, Integer> map(Tuple3<String, Long, Long> observation) {
                    return new Tuple3<>(observation.f1, observation.f2, 1);
                  }
                })
            .keyBy(selector -> selector.f1)
            .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
            .sum(2)
            .map(
                new MapFunction<Tuple3<Long, Long, Integer>, PositionStatistics>() {
                  @Override
                  public PositionStatistics map(Tuple3<Long, Long, Integer> result) {
                    return new PositionStatistics(result.f0, result.f1, result.f2);
                  }
                });

    outputStream.print();

    tEnv.createTemporaryView("PositionStatsTable", outputStream);

    // Define the avro schema in which the output data will be written
    String outputAvroSchema =
        "  {\n"
            + "    \"type\" : \"record\",\n"
            + "    \"name\" : \"Event\",\n"
            + "    \"namespace\" : \"my.flink.tutorial\",\n"
            + "    \"fields\" : [\n"
            + "      {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + "      {\"name\" : \"total\", \"type\" : \"int\"},\n"
            + "      {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
            + "    ]\n"
            + "  }\n";

    Map<String, String> sinkProperties = new HashMap<>();
    sinkProperties.put("olp.catalog.layer-schema", outputAvroSchema);

    // Create TableSink for the output
    OlpStreamConnectorHelper sinkHelper =
        OlpStreamConnectorHelper.create(outputCatalogHrn, outputIndexLayer, sinkProperties);
    Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(
        String.format(
            "CREATE TABLE OutputIndexTable %s WITH %s", sinkSchema, sinkHelper.options()));

    // Write the result into the output table, indexed by timestamp and HERE tile ID
    tEnv.executeSql(
        "INSERT INTO"
            + "  OutputIndexTable "
            + "SELECT"
            + "  'Berlin' AS city,"
            + "  totalObservations AS total,"
            + "  timestampUtc,"
            + "  CAST(tile AS STRING) AS mt_partition "
            + "FROM"
            + "  PositionStatsTable");

    try {

      env.executeAsync();
      logger.info("Stream to {} executed", outputCatalogHrn);

      // Remove this part to run as infinite pipeline
      new Timer(true)
          .schedule(
              new TimerTask() {
                @Override
                public void run() {
                  logger.info("Stream to {} canceled", outputCatalogHrn);
                  System.exit(0);
                }
              },
              30000);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  public static class ComputeTileFunction extends ScalarFunction {
    private final int tileLevel = 14;

    public long eval(Double latitude, Double longitude) {
      return HereQuadFactory.INSTANCE
          .getMapQuadByLocation(latitude, longitude, tileLevel)
          .getLongKey();
    }
  }

  public static class AssignUuidFunction extends ScalarFunction {

    public String eval(String input) {
      return UUID.randomUUID().toString();
    }
  }

  public static class PositionStatistics {
    public long tile;
    public long timestampUtc;
    public int totalObservations;

    public PositionStatistics() {}

    public PositionStatistics(long timestampUtc, long tile, int totalObservations) {
      this.tile = tile;
      this.timestampUtc = timestampUtc;
      this.totalObservations = totalObservations;
    }

    @Override
    public String toString() {
      return tile + " " + timestampUtc + " " + totalObservations;
    }
  }
}

Another example uses the volatile layer filled in the previous step to read the data in avro data format, performing some transformations on the received data, and writing to the output index layer from the previously created catalog.

Scala
Java

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{tableConversions, StreamTableEnvironment}
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._
import org.slf4j.LoggerFactory

import java.util.{Timer, TimerTask}

object VolatileToIndexLayerScalaPipeline extends App {
  private val logger = LoggerFactory.getLogger(VolatileToIndexLayerScalaPipeline.getClass)

  private val pipelineContext = new PipelineContext

  // Source and output catalogs / layers
  val catalogHrn = pipelineContext.config.outputCatalog

  val inputVolatileLayer = "volatile-layer-avro-data"
  val outputIndexLayer = "index-layer-parquet-data"

  // Configure stream execution environment settings
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(5000)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))

  val inputSchema =
    """{
      |  "type" : "record",
      |  "name" : "Event",
      |  "namespace" : "my.flink.tutorial",
      |  "fields" : [
      |    {"name" : "city", "type" : "string"},
      |    {"name" : "total", "type" : "int"},
      |    {"name" : "timestampUtc", "type" : "long"}
      |  ]
      |}
      |""".stripMargin

  // For the China environment, change this to `List("389695390", "389695391", "389695392")`.
  val tiles = List("377894440", "377894441", "377894442")
  // Define the properties
  val properties =
    Map(
      "olp.layer.query" -> s"mt_partition=in=${tiles.mkString("(", ", ", ")")}",
      "olp.catalog.layer-schema" -> inputSchema,
      "olp.connector-refresh-interval" -> "-1"
    )

  // create the Table Connector Descriptor Source
  val helper = OlpStreamConnectorHelper(catalogHrn, inputVolatileLayer, properties)

  // Register the TableSource
  val tEnv = StreamTableEnvironment.create(env)
  val schema = helper.prebuiltSchema(tEnv).build()
  tEnv.executeSql(s"CREATE TABLE TableSource $schema WITH ${helper.options}")

  tEnv.from("TableSource").toAppendStream[Row].print()

  val outputSchema =
    """{
      |  "type" : "record",
      |  "name" : "Event",
      |  "namespace" : "my.flink.tutorial",
      |  "fields" : [
      |    {"name" : "city", "type" : "string"},
      |    {"name" : "total", "type" : "int"}
      |  ]
      |}
      |""".stripMargin

  // Create TableSink for the output
  val sinkHelper: OlpStreamConnectorHelper =
    OlpStreamConnectorHelper(catalogHrn,
                             outputIndexLayer,
                             Map("olp.catalog.layer-schema" -> outputSchema))
  tEnv.executeSql(
    s"CREATE TABLE Sink ${sinkHelper.prebuiltSchema(tEnv).build()} " +
      s"WITH ${sinkHelper.options}")

  tEnv.executeSql("""INSERT INTO Sink
                    |SELECT
                    |  city,
                    |  total,
                    |  CAST(mt_partition as BIGINT) as idx_tile_id,
                    |  timestampUtc as idx_time_window
                    |FROM TableSource
                    |""".stripMargin)

  try {
    env.executeAsync()
    logger.info(s"Stream to $catalogHrn executed")

    // Remove this part to run as infinite pipeline
    new Timer(true).schedule(new TimerTask {
      override def run(): Unit = {
        logger.info(s"Stream to $catalogHrn canceled")
        System.exit(0)
      }
    }, 30000)
  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.pipeline.PipelineContext;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VolatileToIndexLayerPipeline {
  private static final PipelineContext pipelineContext = new PipelineContext();

  // Source and output catalogs / layers
  private static final HRN catalogHrn = pipelineContext.config().getOutputCatalog();

  private static final String inputVolatileLayer = "volatile-layer-avro-data";
  private static final String outputIndexLayer = "index-layer-parquet-data";

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(VolatileToIndexLayerPipeline.class);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));

    String inputAvroSchema =
        "  {\n"
            + "    \"type\" : \"record\",\n"
            + "    \"name\" : \"Event\",\n"
            + "    \"namespace\" : \"my.flink.tutorial\",\n"
            + "    \"fields\" : [\n"
            + "      {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + "      {\"name\" : \"total\", \"type\" : \"int\"},\n"
            + "      {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
            + "    ]\n"
            + "  }\n";

    // For the China environment, change this to `(new String[] {"389695390", "389695391",
    // "389695392"})`.
    String[] tiles = (new String[] {"377894440", "377894441", "377894442"});
    String tilesQueryString = Arrays.stream(tiles).collect(Collectors.joining(", ", "(", ")"));

    Map<String, String> properties = new HashMap<>();
    properties.put("olp.layer.query", "mt_partition=in=" + tilesQueryString);
    properties.put("olp.catalog.layer-schema", inputAvroSchema);
    properties.put("olp.connector-refresh-interval", "-1");

    // create the Table Connector Descriptor Source
    OlpStreamConnectorHelper helper =
        OlpStreamConnectorHelper.create(catalogHrn, inputVolatileLayer, properties);

    // Register the TableSource
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Schema schema = helper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(String.format("CREATE TABLE TableSource %s WITH %s", schema, helper.options()));

    tEnv.toAppendStream(tEnv.from("TableSource"), Row.class).print();
    // Define the parquet schema in which the output data will be written
    String outputParquetSchema =
        "  {\n"
            + "    \"type\" : \"record\",\n"
            + "    \"name\" : \"Event\",\n"
            + "    \"namespace\" : \"my.flink.tutorial\",\n"
            + "    \"fields\" : [\n"
            + "       {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + "       {\"name\" : \"total\", \"type\" : \"int\"}\n"
            + "    ]\n"
            + "  }\n";

    Map<String, String> sinkProperties = new HashMap<>();
    sinkProperties.put("olp.catalog.layer-schema", outputParquetSchema);

    // Create TableSink for the output
    OlpStreamConnectorHelper sinkHelper =
        OlpStreamConnectorHelper.create(catalogHrn, outputIndexLayer, sinkProperties);
    Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(
        String.format("CREATE TABLE Sink %s WITH %s", sinkSchema, sinkHelper.options()));

    tEnv.executeSql(
        "INSERT INTO Sink "
            + "SELECT"
            + "  city,"
            + "  total,"
            + "  CAST(mt_partition as BIGINT) as idx_tile_id,"
            + "  timestampUtc as idx_time_window "
            + "FROM TableSource");

    try {
      env.executeAsync();
      logger.info("Stream to {} executed", catalogHrn);
      // Remove this part to run as infinite pipeline
      new Timer(true)
          .schedule(
              new TimerTask() {
                @Override
                public void run() {
                  logger.info("Stream to {} canceled", catalogHrn);
                  System.exit(0);
                }
              },
              30000);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }
}

Compile and run locally

To run the application locally, execute the following command:

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=StreamToVolatileLayerScalaPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf


mvn compile exec:java \
    -Dexec.mainClass=StreamToVolatileLayerPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf

to run the second application, execute the following command:

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=VolatileToIndexLayerScalaPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf


mvn compile exec:java \
    -Dexec.mainClass=VolatileToIndexLayerPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf

Further information

For more details on the topics covered in this tutorial, you can refer to the following sources:

results matching ""

    No results matching ""