Flink Connector を使用してデータの読み取りと書き込みを行います

目的: Flink Connector を使用して、カタログ内のさまざまなレイヤーおよびデータ形式からデータを読み書きする方法について理解します。

複雑さ: 初心者向け

所要時間: 40 分

前提条件: プロジェクトでの作業を整理します

ソースコード: ダウンロード

このチュートリアルの例では、データ クライアント ライブラリが提供する Flink コネクタの使用方法を示します。 これにより、ストリーム処理ワークロードでの Flink との対話がサポートされ、 Flink のすべての標準 API および機能を使用してデータの読み取り、書き込み、および削除を行うことができます。 バッチ処理のワークロードには、代わりに付属の Spark Connector を使用する必要があります。

このチュートリアルの主な部分では、次の用途について説明します。

  • Protobuf 形式のストリーミングレイヤーにサブスクライブします
  • テーブル API からデータストリーム API にデータを変換し、構造を変更します
  • ストリームからデータを印刷し、ストリームをインデックス レイヤーの送信先として使用します
  • 異なる種類のレイヤーから別のレイヤーにデータを転送します

準備のステップとして、適切なレイヤーを含むデータの宛先カタログを作成し、このチュートリアルのメインパートで使用するときにそれらのカタログが配置されるようにする必要があります。 使用されるデータセットは、 HERE サンプル SDII メッセージカタログから取得され、 SDII メッセージの形式でシミュレートされたストリーミングセンサーデータが含まれます。

Maven プロジェクトを設定します

プロジェクトの次のフォルダー構造を作成します。

flink-connector
└── src
    └── main
        ├── java
        └── resources
        └── scala

この操作は、次の bash 1 つのコマンドで実行できます。

mkdir -p flink-connector/src/main/{java,resources,scala}

出力カタログを作成します

pipeline-config.conf という名前のファイルを作成し、以下の内容を入力します。「プロジェクトでの作業の整理」で作成したカタログの HERE リソースネーム {{YOUR_OUTPUT_CATALOG_HRN}} に置き換えます。

pipeline.config {

  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }

  input-catalogs {
    // Be sure to use hrn:here-cn:data::olp-cn-here:sample-data on the China Environment.
    sensorData { hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2" }
  }
}

Maven POM ファイルは 、 Maven 設定の確認 の例のファイルに似ています。 親 POM および依存関係セクションは次のように更新されます。

親 POM :

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-stream-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

依存関係 :

<dependencies>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>flink-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.htrace</groupId>
                <artifactId>htrace-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>xerces</groupId>
                <artifactId>xercesImpl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

出力カタログを作成する必要があります。 これらの手順は、 OLP コマンド ライン インターフェース( CLI )を使用して実行できます。

たとえば、カタログには一意の識別子名を使用する必要 {{YOUR_USERNAME}}-flink-connector-outputがあります。

出力カタログでは flink-connector-ouput.json 、以下の内容でファイルに名前を付けることができます。

{
  "id": "flink-connector-output",
  "name": "Simulated sensor data archive (from tutorial) flink-connector-output",
  "summary": "Archive of simulated sensor data for the FlinkConnector tutorial",
  "description": "Archive of simulated sensor data",
  "tags": [
    "Tutorial",
    "Simulated"
  ],
  "layers": [
    {
      "id": "volatile-layer-avro-data",
      "name": "volatile-layer-avro-data",
      "summary": "Simulated sensor data for the FlinkConnector tutorial",
      "description": "Simulated sensor data for the FlinkConnector tutorial",
      "contentType": "application/x-avro-binary",
      "layerType": "volatile",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    },
    {
      "id": "index-layer-parquet-data",
      "name": "index-layer-parquet-data",
      "summary": "Simulated sensor data for the FlinkConnector tutorial",
      "description": "Simulated sensor data for the FlinkConnector tutorial",
      "contentType": "application/x-parquet",
      "layerType": "index",
      "indexProperties": {
        "indexDefinitions": [
          {
            "name": "tile_id",
            "type": "int"
          },
          {
            "name": "time_window",
            "duration": 600000,
            "type": "timewindow"
          }
        ],
        "ttl": "unlimited"
      },
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    }
  ]
}

{{YOUR_CATALOG_ID}} 以下を自分の識別子に置き換えてから、次のコマンドを実行 olp.batします (Windows では Cygwin および git bash で動作します。それ以外の場合は実行できます ) 。

olp catalog create {{YOUR_CATALOG_ID}} \
    "Simulated sensor data output from tutorial ({{YOUR_USERNAME}})" \
    --config flink-connector-output.json \
    --scope {{YOUR_PROJECT_HRN}}

レルムで請求タグが必要な場合 は、layerセクションにbillingTags: ["YOUR_BILLING_TAG"]プロパティを追加して設定 ファイルを更新します。

このアプリケーションは、パブリックデータソースを使用して、ストリーム レイヤーから Protobuf データ形式で読み取り、受信したデータの一部の変換を実行し、前に作成したカタログから出力ボラタイル レイヤーに書き込みます。

Scala
Java

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.olp.util.quad.factory.HereQuadFactory
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.slf4j.LoggerFactory

import java.time.Duration
import java.util.{Timer, TimerTask}

object StreamToVolatileLayerScalaPipeline extends App {

  private val logger = LoggerFactory.getLogger(StreamToVolatileLayerScalaPipeline.getClass)

  private val pipelineContext = new PipelineContext

  // Source and output catalogs / layers
  val sensorDataCatalogHrn = pipelineContext.config.inputCatalogs("sensorData")
  val outputCatalogHrn = pipelineContext.config.outputCatalog

  val streamingLayer = "sample-streaming-layer"
  val outputVolatileLayer = "volatile-layer-avro-data"

  // Configure stream execution environment settings
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(5000)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))

  // Define the properties
  val properties = Map(
    "olp.kafka.group-name" -> "protobuf-streaming",
    "olp.kafka.offset" -> "earliest",
  )

  // create the Table Connector Descriptor Source
  val helper = OlpStreamConnectorHelper(sensorDataCatalogHrn, streamingLayer, properties)

  // Register the TableSource
  val tEnv = StreamTableEnvironment.create(env)
  val schema = helper.prebuiltSchema(tEnv).build()
  tEnv.executeSql(s"CREATE TABLE SensorDataTable $schema WITH ${helper.options}")

  // Register the user-defined functions to be used in the SQL query
  tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction())
  tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction())

  // Define a Table containing the fields to be used in computing results
  val observationsTable: Table =
    tEnv.sqlQuery("""SELECT
                    |  assignUuid(envelope.version) AS eventId,
                    |  timeStampUTC_ms AS timestampUtc,
                    |  computeHereTile(latitude_deg, longitude_deg) AS tile
                    |FROM
                    |  SensorDataTable CROSS JOIN UNNEST(positionEstimate)
                    |""".stripMargin)

  // Domain case classes
  case class PositionEvent(eventId: String, timestampUtc: Long, tile: Long)
  case class PositionStatistics(timestampUtc: Long, tile: Long, totalObservations: Int)

  // Create watermark strategy
  val watermarkStrategy: WatermarkStrategy[PositionEvent] = WatermarkStrategy
    .forBoundedOutOfOrderness[PositionEvent](Duration.ofSeconds(10))
    .withTimestampAssigner(new SerializableTimestampAssigner[PositionEvent] {
      override def extractTimestamp(event: PositionEvent, recordTimestamp: Long): Long =
        event.timestampUtc
    })

  // Compute values for the number of observations by tile and time window
  val outputStream: DataStream[PositionStatistics] = tEnv
    .toAppendStream[PositionEvent](observationsTable) // Convert the records to a stream of PositionEvent
    .assignTimestampsAndWatermarks(watermarkStrategy) // Define how the event time is assigned
    .map(v => (v.timestampUtc, v.tile, 1)) // Map fields to a tuple
    .keyBy(_._2) // Key by tile ID
    .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5))) // Define the time window to use
    .sum(2)
    .map(PositionStatistics.tupled(_))
  outputStream.print()

  tEnv.createTemporaryView("PositionStatsTable", outputStream)

  // Define the avro schema in which the output data will be written
  val outputAvroSchema =
    """{
      |  "type" : "record",
      |  "name" : "Event",
      |  "namespace" : "my.flink.tutorial",
      |  "fields" : [
      |    {"name" : "city", "type" : "string"},
      |    {"name" : "total", "type" : "int"},
      |    {"name" : "timestampUtc", "type" : "long"}
      |  ]
      |}
      |""".stripMargin

  // Create TableSink for the output
  val sinkHelper: OlpStreamConnectorHelper =
    OlpStreamConnectorHelper(outputCatalogHrn,
                             outputVolatileLayer,
                             Map("olp.catalog.layer-schema" -> outputAvroSchema))
  tEnv.executeSql(
    s"CREATE TABLE OutputIndexTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
      s"WITH ${sinkHelper.options}")

  // Write the result into the output table, indexed by timestamp and HERE tile ID
  tEnv.executeSql("""INSERT INTO
                    |  OutputIndexTable
                    |SELECT
                    |  'Berlin' AS city,
                    |  totalObservations AS total,
                    |  timestampUtc,
                    |  CAST(tile AS STRING) AS mt_partition
                    |FROM
                    |  PositionStatsTable
                    |""".stripMargin)

  try {
    env.executeAsync()
    logger.info(s"Stream to $outputCatalogHrn executed")

    // Remove this part to run as infinite pipeline
    new Timer(true).schedule(new TimerTask {
      override def run(): Unit = {
        logger.info(s"Stream to $outputCatalogHrn canceled")
        System.exit(0)
      }
    }, 30000)

  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }
}

class ComputeTileFunction() extends ScalarFunction {
  private val tileLevel = 14

  // java.lang.Double is used to avoid DOUBLE NOT NULL semantics of the function
  // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type-extraction
  def eval(latitude: java.lang.Double, longitude: java.lang.Double): Long =
    HereQuadFactory.INSTANCE
      .getMapQuadByLocation(latitude, longitude, tileLevel)
      .getLongKey
}

class AssignUuidFunction() extends ScalarFunction {

  def eval(input: String): String =
    java.util.UUID.randomUUID.toString
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.pipeline.PipelineContext;
import java.time.Duration;
import java.util.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamToVolatileLayerPipeline {

  private static final PipelineContext pipelineContext = new PipelineContext();

  // Source for the sensor data to be used as input
  private static final HRN sensorDataCatalogHrn =
      pipelineContext.getConfig().getInputCatalogs().get("sensorData");
  private static final HRN outputCatalogHrn = pipelineContext.getConfig().getOutputCatalog();

  private static final String streamingLayer = "sample-streaming-layer";
  private static final String outputIndexLayer = "volatile-layer-avro-data";

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(StreamToVolatileLayerPipeline.class);

    // Configure stream execution environment settings
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));

    // Define the properties
    Map<String, String> properties = new HashMap<>();
    properties.put("olp.kafka.group-name", "protobuf-streaming");
    properties.put("olp.kafka.offset", "earliest");

    // create the Table Connector Descriptor Source
    OlpStreamConnectorHelper helper =
        OlpStreamConnectorHelper.create(sensorDataCatalogHrn, streamingLayer, properties);

    // Register the TableSource
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Schema schema = helper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(
        String.format("CREATE TABLE SensorDataTable %s WITH %s", schema, helper.options()));

    // Register the user-defined functions to be used in the SQL query
    tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction());
    tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction());

    // Define a Table containing the fields to be used in computing results
    Table observationsTable =
        tEnv.sqlQuery(
            "SELECT"
                + "  assignUuid(envelope.version) AS eventId, "
                + "  timeStampUTC_ms AS timestampUtc, "
                + "  computeHereTile(latitude_deg, longitude_deg) AS tile "
                + "FROM SensorDataTable "
                + "CROSS JOIN UNNEST(positionEstimate)");

    // Create watermark strategy
    WatermarkStrategy<Tuple3<String, Long, Long>> watermarkStrategy =
        WatermarkStrategy.<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(
                Duration.ofSeconds(10))
            .withTimestampAssigner((event, timestamp) -> event.f1);

    // Compute values for the number of observations by tile and time window
    DataStream<PositionStatistics> outputStream =
        tEnv.toAppendStream(
                observationsTable,
                new TupleTypeInfo<Tuple3<String, Long, Long>>(Types.STRING, Types.LONG, Types.LONG))
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .map(
                new MapFunction<Tuple3<String, Long, Long>, Tuple3<Long, Long, Integer>>() {
                  @Override
                  public Tuple3<Long, Long, Integer> map(Tuple3<String, Long, Long> observation) {
                    return new Tuple3<>(observation.f1, observation.f2, 1);
                  }
                })
            .keyBy(selector -> selector.f1)
            .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
            .sum(2)
            .map(
                new MapFunction<Tuple3<Long, Long, Integer>, PositionStatistics>() {
                  @Override
                  public PositionStatistics map(Tuple3<Long, Long, Integer> result) {
                    return new PositionStatistics(result.f0, result.f1, result.f2);
                  }
                });

    outputStream.print();

    tEnv.createTemporaryView("PositionStatsTable", outputStream);

    // Define the avro schema in which the output data will be written
    String outputAvroSchema =
        "  {\n"
            + "    \"type\" : \"record\",\n"
            + "    \"name\" : \"Event\",\n"
            + "    \"namespace\" : \"my.flink.tutorial\",\n"
            + "    \"fields\" : [\n"
            + "      {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + "      {\"name\" : \"total\", \"type\" : \"int\"},\n"
            + "      {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
            + "    ]\n"
            + "  }\n";

    Map<String, String> sinkProperties = new HashMap<>();
    sinkProperties.put("olp.catalog.layer-schema", outputAvroSchema);

    // Create TableSink for the output
    OlpStreamConnectorHelper sinkHelper =
        OlpStreamConnectorHelper.create(outputCatalogHrn, outputIndexLayer, sinkProperties);
    Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(
        String.format(
            "CREATE TABLE OutputIndexTable %s WITH %s", sinkSchema, sinkHelper.options()));

    // Write the result into the output table, indexed by timestamp and HERE tile ID
    tEnv.executeSql(
        "INSERT INTO"
            + "  OutputIndexTable "
            + "SELECT"
            + "  'Berlin' AS city,"
            + "  totalObservations AS total,"
            + "  timestampUtc,"
            + "  CAST(tile AS STRING) AS mt_partition "
            + "FROM"
            + "  PositionStatsTable");

    try {

      env.executeAsync();
      logger.info("Stream to {} executed", outputCatalogHrn);

      // Remove this part to run as infinite pipeline
      new Timer(true)
          .schedule(
              new TimerTask() {
                @Override
                public void run() {
                  logger.info("Stream to {} canceled", outputCatalogHrn);
                  System.exit(0);
                }
              },
              30000);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  public static class ComputeTileFunction extends ScalarFunction {
    private final int tileLevel = 14;

    public long eval(Double latitude, Double longitude) {
      return HereQuadFactory.INSTANCE
          .getMapQuadByLocation(latitude, longitude, tileLevel)
          .getLongKey();
    }
  }

  public static class AssignUuidFunction extends ScalarFunction {

    public String eval(String input) {
      return UUID.randomUUID().toString();
    }
  }

  public static class PositionStatistics {
    public long tile;
    public long timestampUtc;
    public int totalObservations;

    public PositionStatistics() {}

    public PositionStatistics(long timestampUtc, long tile, int totalObservations) {
      this.tile = tile;
      this.timestampUtc = timestampUtc;
      this.totalObservations = totalObservations;
    }

    @Override
    public String toString() {
      return tile + " " + timestampUtc + " " + totalObservations;
    }
  }
}

別の例では、前のステップで入力したボラタイル レイヤーを使用して、 Avro データ形式でデータを読み取り、受信したデータの一部の変換を実行し、前に作成したカタログから出力インデックス レイヤーに書き込みます。

Scala
Java

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{tableConversions, StreamTableEnvironment}
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._
import org.slf4j.LoggerFactory

import java.util.{Timer, TimerTask}

object VolatileToIndexLayerScalaPipeline extends App {
  private val logger = LoggerFactory.getLogger(VolatileToIndexLayerScalaPipeline.getClass)

  private val pipelineContext = new PipelineContext

  // Source and output catalogs / layers
  val catalogHrn = pipelineContext.config.outputCatalog

  val inputVolatileLayer = "volatile-layer-avro-data"
  val outputIndexLayer = "index-layer-parquet-data"

  // Configure stream execution environment settings
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(5000)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))

  val inputSchema =
    """{
      |  "type" : "record",
      |  "name" : "Event",
      |  "namespace" : "my.flink.tutorial",
      |  "fields" : [
      |    {"name" : "city", "type" : "string"},
      |    {"name" : "total", "type" : "int"},
      |    {"name" : "timestampUtc", "type" : "long"}
      |  ]
      |}
      |""".stripMargin

  // For the China environment, change this to `List("389695390", "389695391", "389695392")`.
  val tiles = List("377894440", "377894441", "377894442")
  // Define the properties
  val properties =
    Map(
      "olp.layer.query" -> s"mt_partition=in=${tiles.mkString("(", ", ", ")")}",
      "olp.catalog.layer-schema" -> inputSchema,
      "olp.connector-refresh-interval" -> "-1"
    )

  // create the Table Connector Descriptor Source
  val helper = OlpStreamConnectorHelper(catalogHrn, inputVolatileLayer, properties)

  // Register the TableSource
  val tEnv = StreamTableEnvironment.create(env)
  val schema = helper.prebuiltSchema(tEnv).build()
  tEnv.executeSql(s"CREATE TABLE TableSource $schema WITH ${helper.options}")

  tEnv.from("TableSource").toAppendStream[Row].print()

  val outputSchema =
    """{
      |  "type" : "record",
      |  "name" : "Event",
      |  "namespace" : "my.flink.tutorial",
      |  "fields" : [
      |    {"name" : "city", "type" : "string"},
      |    {"name" : "total", "type" : "int"}
      |  ]
      |}
      |""".stripMargin

  // Create TableSink for the output
  val sinkHelper: OlpStreamConnectorHelper =
    OlpStreamConnectorHelper(catalogHrn,
                             outputIndexLayer,
                             Map("olp.catalog.layer-schema" -> outputSchema))
  tEnv.executeSql(
    s"CREATE TABLE Sink ${sinkHelper.prebuiltSchema(tEnv).build()} " +
      s"WITH ${sinkHelper.options}")

  tEnv.executeSql("""INSERT INTO Sink
                    |SELECT
                    |  city,
                    |  total,
                    |  CAST(mt_partition as BIGINT) as idx_tile_id,
                    |  timestampUtc as idx_time_window
                    |FROM TableSource
                    |""".stripMargin)

  try {
    env.executeAsync()
    logger.info(s"Stream to $catalogHrn executed")

    // Remove this part to run as infinite pipeline
    new Timer(true).schedule(new TimerTask {
      override def run(): Unit = {
        logger.info(s"Stream to $catalogHrn canceled")
        System.exit(0)
      }
    }, 30000)
  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.pipeline.PipelineContext;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VolatileToIndexLayerPipeline {
  private static final PipelineContext pipelineContext = new PipelineContext();

  // Source and output catalogs / layers
  private static final HRN catalogHrn = pipelineContext.config().getOutputCatalog();

  private static final String inputVolatileLayer = "volatile-layer-avro-data";
  private static final String outputIndexLayer = "index-layer-parquet-data";

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(VolatileToIndexLayerPipeline.class);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));

    String inputAvroSchema =
        "  {\n"
            + "    \"type\" : \"record\",\n"
            + "    \"name\" : \"Event\",\n"
            + "    \"namespace\" : \"my.flink.tutorial\",\n"
            + "    \"fields\" : [\n"
            + "      {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + "      {\"name\" : \"total\", \"type\" : \"int\"},\n"
            + "      {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
            + "    ]\n"
            + "  }\n";

    // For the China environment, change this to `(new String[] {"389695390", "389695391",
    // "389695392"})`.
    String[] tiles = (new String[] {"377894440", "377894441", "377894442"});
    String tilesQueryString = Arrays.stream(tiles).collect(Collectors.joining(", ", "(", ")"));

    Map<String, String> properties = new HashMap<>();
    properties.put("olp.layer.query", "mt_partition=in=" + tilesQueryString);
    properties.put("olp.catalog.layer-schema", inputAvroSchema);
    properties.put("olp.connector-refresh-interval", "-1");

    // create the Table Connector Descriptor Source
    OlpStreamConnectorHelper helper =
        OlpStreamConnectorHelper.create(catalogHrn, inputVolatileLayer, properties);

    // Register the TableSource
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Schema schema = helper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(String.format("CREATE TABLE TableSource %s WITH %s", schema, helper.options()));

    tEnv.toAppendStream(tEnv.from("TableSource"), Row.class).print();
    // Define the parquet schema in which the output data will be written
    String outputParquetSchema =
        "  {\n"
            + "    \"type\" : \"record\",\n"
            + "    \"name\" : \"Event\",\n"
            + "    \"namespace\" : \"my.flink.tutorial\",\n"
            + "    \"fields\" : [\n"
            + "       {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + "       {\"name\" : \"total\", \"type\" : \"int\"}\n"
            + "    ]\n"
            + "  }\n";

    Map<String, String> sinkProperties = new HashMap<>();
    sinkProperties.put("olp.catalog.layer-schema", outputParquetSchema);

    // Create TableSink for the output
    OlpStreamConnectorHelper sinkHelper =
        OlpStreamConnectorHelper.create(catalogHrn, outputIndexLayer, sinkProperties);
    Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(
        String.format("CREATE TABLE Sink %s WITH %s", sinkSchema, sinkHelper.options()));

    tEnv.executeSql(
        "INSERT INTO Sink "
            + "SELECT"
            + "  city,"
            + "  total,"
            + "  CAST(mt_partition as BIGINT) as idx_tile_id,"
            + "  timestampUtc as idx_time_window "
            + "FROM TableSource");

    try {
      env.executeAsync();
      logger.info("Stream to {} executed", catalogHrn);
      // Remove this part to run as infinite pipeline
      new Timer(true)
          .schedule(
              new TimerTask() {
                @Override
                public void run() {
                  logger.info("Stream to {} canceled", catalogHrn);
                  System.exit(0);
                }
              },
              30000);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }
}

ローカルでコンパイルおよび実行します

アプリケーションをローカルで実行するには、次のコマンドを実行します。

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=StreamToVolatileLayerScalaPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf


mvn compile exec:java \
    -Dexec.mainClass=StreamToVolatileLayerPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf

2 番目のアプリケーションを実行するには、次のコマンドを実行します。

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=VolatileToIndexLayerScalaPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf


mvn compile exec:java \
    -Dexec.mainClass=VolatileToIndexLayerPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf

詳細情報

このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。

」に一致する結果は 件です

    」に一致する結果はありません