Read and write data using the Flink Connector
Objective: Understand how to use the Flink Connector to read and write data across the various layer types and data formats in a catalog.
Complexity: Beginner
Time to complete: 40 min
Prerequisites: Organize your work in projects
Source code: Download
The examples in this tutorial demonstrate how to use the Flink Connector provided by the Data Client Library. It supports interaction with Flink in stream-processing workloads, letting you use all standard Flink APIs and features to read, write, and delete data. For batch-processing workloads, use the companion Spark Connector instead.
The main part of this tutorial covers the following:
- Subscribing to a streaming layer in Protobuf format
- Converting data from the Table API to the DataStream API and changing its structure
- Printing data from a stream and using an index layer as the sink of a stream
- Transferring data from one type of layer to another
As a preparation step, you must create a destination catalog with suitable layers so that they are in place when the main part of this tutorial uses them. The dataset used comes from the HERE sample SDII messages catalog and contains simulated streaming sensor data in the form of SDII messages.
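Only a few SDII fields are actually consumed here. The names below are the ones referenced by the SQL queries in the main part of this tutorial (the full SDII schema is considerably richer):

SensorDataTable (derived from the SDII Protobuf schema)
├── envelope
│   └── version
└── positionEstimate[]   (an array, unnested by the queries)
    ├── timeStampUTC_ms
    ├── latitude_deg
    └── longitude_deg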
Set up the Maven project
Create the following folder structure for the project:
flink-connector
└── src
    └── main
        ├── java
        ├── resources
        └── scala
You can do this with a single bash command:
mkdir -p flink-connector/src/main/{java,resources,scala}
Create the output catalog
Create a file named pipeline-config.conf with the following contents, replacing {{YOUR_OUTPUT_CATALOG_HRN}} with the HERE Resource Name of the catalog you created in Organize your work in projects:
pipeline.config {
  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }
  input-catalogs {
    // Be sure to use hrn:here-cn:data::olp-cn-here:sample-data on the China Environment.
    sensorData { hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2" }
  }
}
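As a quick sanity check (a minimal sketch, assuming the file is passed via -Dpipeline-config.file as in the run commands at the end of this tutorial), the PipelineContext used by the applications below resolves both HRNs from this configuration:

import com.here.platform.pipeline.PipelineContext

object ConfigCheck extends App {
  // Reads the configuration referenced by -Dpipeline-config.file
  private val pipelineContext = new PipelineContext

  // "sensorData" is the key declared in the input-catalogs block above
  println(pipelineContext.config.inputCatalogs("sensorData"))
  println(pipelineContext.config.outputCatalog)
}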
The Maven POM file is similar to the one in the Verify Maven settings example, with the parent POM and dependencies sections updated as follows.
Parent POM:
<parent>
  <groupId>com.here.platform</groupId>
  <artifactId>sdk-stream-bom_2.12</artifactId>
  <version>2.54.3</version>
  <relativePath/>
</parent>
Dependencies:
<dependencies>
  <dependency>
    <groupId>com.here.platform.data.client</groupId>
    <artifactId>flink-support_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.platform.pipeline</groupId>
    <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.compat.version}</artifactId>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.compat.version}</artifactId>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.compat.version}</artifactId>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
    <scope>compile</scope>
    <exclusions>
      <exclusion>
        <groupId>org.apache.htrace</groupId>
        <artifactId>htrace-core</artifactId>
      </exclusion>
      <exclusion>
        <groupId>xerces</groupId>
        <artifactId>xercesImpl</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>
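The ${scala.compat.version} placeholder is expected to be supplied by the parent BOM. If your build does not inherit it, you can define it yourself in a properties section (shown here for the Scala 2.12 BOM above; adjust if you use a different Scala version):

<properties>
  <scala.compat.version>2.12</scala.compat.version>
</properties>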
You also need to create the output catalog itself. You can do this with the OLP command line interface (CLI). Use a unique identifier for the catalog, for example {{YOUR_USERNAME}}-flink-connector-output.
Save the catalog configuration in a file named flink-connector-output.json with the following contents:
{
  "id": "flink-connector-output",
  "name": "Simulated sensor data archive (from tutorial) flink-connector-output",
  "summary": "Archive of simulated sensor data for the FlinkConnector tutorial",
  "description": "Archive of simulated sensor data",
  "tags": [
    "Tutorial",
    "Simulated"
  ],
  "layers": [
    {
      "id": "volatile-layer-avro-data",
      "name": "volatile-layer-avro-data",
      "summary": "Simulated sensor data for the FlinkConnector tutorial",
      "description": "Simulated sensor data for the FlinkConnector tutorial",
      "contentType": "application/x-avro-binary",
      "layerType": "volatile",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    },
    {
      "id": "index-layer-parquet-data",
      "name": "index-layer-parquet-data",
      "summary": "Simulated sensor data for the FlinkConnector tutorial",
      "description": "Simulated sensor data for the FlinkConnector tutorial",
      "contentType": "application/x-parquet",
      "layerType": "index",
      "indexProperties": {
        "indexDefinitions": [
          {
            "name": "tile_id",
            "type": "int"
          },
          {
            "name": "time_window",
            "duration": 600000,
            "type": "timewindow"
          }
        ],
        "ttl": "unlimited"
      },
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    }
  ]
}
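A note on column naming that the INSERT statements later in this tutorial rely on: the Flink connector exposes the generic partition key of a volatile layer as a column named mt_partition, and each entry in indexDefinitions of an index layer as a column named idx_<indexName>. For the two layers above, the sink tables therefore carry the following payload and partitioning columns (an illustration derived from the SQL statements below, not tool output):

volatile-layer-avro-data   →  city STRING, total INT, timestampUtc BIGINT, mt_partition STRING
index-layer-parquet-data   →  city STRING, total INT, idx_tile_id BIGINT, idx_time_window BIGINT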
Replace {{YOUR_CATALOG_ID}} with your own identifier and then run the following command (on Windows, use olp.bat, which works under Cygwin and git bash):
olp catalog create {{YOUR_CATALOG_ID}} \
"Simulated sensor data output from tutorial ({{YOUR_USERNAME}})" \
--config flink-connector-output.json \
--scope {{YOUR_PROJECT_HRN}}
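To confirm that the catalog and both layers were created, you can display the catalog configuration (a hedged example; flags may vary between CLI versions):

olp catalog show {{YOUR_OUTPUT_CATALOG_HRN}} --scope {{YOUR_PROJECT_HRN}}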
Note
If your realm requires billing tags, update the configuration file by adding a billingTags: ["YOUR_BILLING_TAG"] property to each layer section.
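For example, the first layer entry in flink-connector-output.json would then begin as follows (YOUR_BILLING_TAG is a placeholder; the remaining properties stay unchanged):

{
  "id": "volatile-layer-avro-data",
  "name": "volatile-layer-avro-data",
  "billingTags": ["YOUR_BILLING_TAG"],
  ...
}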
Implement the Flink Connector application
The first application reads Protobuf-encoded data from the stream layer of a public data source, performs some transformations on the received data, and writes the results to the output volatile layer of the catalog you created earlier. First, the Scala implementation:
import com.here.olp.util.quad.factory.HereQuadFactory
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.slf4j.LoggerFactory

import java.time.Duration
import java.util.{Timer, TimerTask}

object StreamToVolatileLayerScalaPipeline extends App {

  private val logger = LoggerFactory.getLogger(StreamToVolatileLayerScalaPipeline.getClass)

  private val pipelineContext = new PipelineContext

  // Source and sink are resolved from pipeline-config.conf
  val sensorDataCatalogHrn = pipelineContext.config.inputCatalogs("sensorData")
  val outputCatalogHrn = pipelineContext.config.outputCatalog
  val streamingLayer = "sample-streaming-layer"
  val outputVolatileLayer = "volatile-layer-avro-data"

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(5000)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))

  // Consume the stream layer from the earliest available offset
  val properties = Map(
    "olp.kafka.group-name" -> "protobuf-streaming",
    "olp.kafka.offset" -> "earliest"
  )

  val helper = OlpStreamConnectorHelper(sensorDataCatalogHrn, streamingLayer, properties)
  val tEnv = StreamTableEnvironment.create(env)
  val schema = helper.prebuiltSchema(tEnv).build()
  tEnv.executeSql(s"CREATE TABLE SensorDataTable $schema WITH ${helper.options}")

  tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction())
  tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction())

  // Flatten the positionEstimate array and derive an event ID and HERE tile per position
  val observationsTable: Table =
    tEnv.sqlQuery("""SELECT
                    | assignUuid(envelope.version) AS eventId,
                    | timeStampUTC_ms AS timestampUtc,
                    | computeHereTile(latitude_deg, longitude_deg) AS tile
                    |FROM
                    | SensorDataTable CROSS JOIN UNNEST(positionEstimate)
                    |""".stripMargin)

  case class PositionEvent(eventId: String, timestampUtc: Long, tile: Long)
  case class PositionStatistics(timestampUtc: Long, tile: Long, totalObservations: Int)

  // Tolerate events arriving up to 10 seconds out of order
  val watermarkStrategy: WatermarkStrategy[PositionEvent] = WatermarkStrategy
    .forBoundedOutOfOrderness[PositionEvent](Duration.ofSeconds(10))
    .withTimestampAssigner(new SerializableTimestampAssigner[PositionEvent] {
      override def extractTimestamp(event: PositionEvent, recordTimestamp: Long): Long =
        event.timestampUtc
    })

  // Count observations per tile in 15-second windows sliding every 5 seconds
  val outputStream: DataStream[PositionStatistics] = tEnv
    .toAppendStream[PositionEvent](observationsTable)
    .assignTimestampsAndWatermarks(watermarkStrategy)
    .map(v => (v.timestampUtc, v.tile, 1))
    .keyBy(_._2)
    .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
    .sum(2)
    .map(PositionStatistics.tupled(_))

  outputStream.print()

  tEnv.createTemporaryView("PositionStatsTable", outputStream)

  // Avro schema of the payload written to the volatile layer
  val outputAvroSchema =
    """{
      | "type" : "record",
      | "name" : "Event",
      | "namespace" : "my.flink.tutorial",
      | "fields" : [
      | {"name" : "city", "type" : "string"},
      | {"name" : "total", "type" : "int"},
      | {"name" : "timestampUtc", "type" : "long"}
      | ]
      |}
      |""".stripMargin

  val sinkHelper: OlpStreamConnectorHelper =
    OlpStreamConnectorHelper(outputCatalogHrn,
                             outputVolatileLayer,
                             Map("olp.catalog.layer-schema" -> outputAvroSchema))

  // Note: despite its name, OutputIndexTable is backed by the volatile layer
  tEnv.executeSql(
    s"CREATE TABLE OutputIndexTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
      s"WITH ${sinkHelper.options}")

  // mt_partition designates the target partition of the volatile layer
  tEnv.executeSql("""INSERT INTO
                    | OutputIndexTable
                    |SELECT
                    | 'Berlin' AS city,
                    | totalObservations AS total,
                    | timestampUtc,
                    | CAST(tile AS STRING) AS mt_partition
                    |FROM
                    | PositionStatsTable
                    |""".stripMargin)

  try {
    env.executeAsync()
    logger.info(s"Stream to $outputCatalogHrn executed")
    // Stop the local job after 30 seconds
    new Timer(true).schedule(new TimerTask {
      override def run(): Unit = {
        logger.info(s"Stream to $outputCatalogHrn canceled")
        System.exit(0)
      }
    }, 30000)
  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }
}

class ComputeTileFunction() extends ScalarFunction {
  private val tileLevel = 14

  // Map a latitude/longitude pair to the key of the level-14 HERE tile containing it
  def eval(latitude: java.lang.Double, longitude: java.lang.Double): Long =
    HereQuadFactory.INSTANCE
      .getMapQuadByLocation(latitude, longitude, tileLevel)
      .getLongKey
}

class AssignUuidFunction() extends ScalarFunction {
  def eval(input: String): String =
    java.util.UUID.randomUUID.toString
}
The same application in Java:

import com.here.hrn.HRN;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.pipeline.PipelineContext;
import java.time.Duration;
import java.util.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamToVolatileLayerPipeline {

  private static final PipelineContext pipelineContext = new PipelineContext();

  // Source and sink are resolved from pipeline-config.conf
  private static final HRN sensorDataCatalogHrn =
      pipelineContext.getConfig().getInputCatalogs().get("sensorData");
  private static final HRN outputCatalogHrn = pipelineContext.getConfig().getOutputCatalog();
  private static final String streamingLayer = "sample-streaming-layer";
  private static final String outputVolatileLayer = "volatile-layer-avro-data";

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(StreamToVolatileLayerPipeline.class);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));

    // Consume the stream layer from the earliest available offset
    Map<String, String> properties = new HashMap<>();
    properties.put("olp.kafka.group-name", "protobuf-streaming");
    properties.put("olp.kafka.offset", "earliest");

    OlpStreamConnectorHelper helper =
        OlpStreamConnectorHelper.create(sensorDataCatalogHrn, streamingLayer, properties);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Schema schema = helper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(
        String.format("CREATE TABLE SensorDataTable %s WITH %s", schema, helper.options()));

    tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction());
    tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction());

    // Flatten the positionEstimate array and derive an event ID and HERE tile per position
    Table observationsTable =
        tEnv.sqlQuery(
            "SELECT"
                + " assignUuid(envelope.version) AS eventId, "
                + " timeStampUTC_ms AS timestampUtc, "
                + " computeHereTile(latitude_deg, longitude_deg) AS tile "
                + "FROM SensorDataTable "
                + "CROSS JOIN UNNEST(positionEstimate)");

    // Tolerate events arriving up to 10 seconds out of order
    WatermarkStrategy<Tuple3<String, Long, Long>> watermarkStrategy =
        WatermarkStrategy.<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(
                Duration.ofSeconds(10))
            .withTimestampAssigner((event, timestamp) -> event.f1);

    // Count observations per tile in 15-second windows sliding every 5 seconds
    DataStream<PositionStatistics> outputStream =
        tEnv.toAppendStream(
                observationsTable,
                new TupleTypeInfo<Tuple3<String, Long, Long>>(Types.STRING, Types.LONG, Types.LONG))
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .map(
                new MapFunction<Tuple3<String, Long, Long>, Tuple3<Long, Long, Integer>>() {
                  @Override
                  public Tuple3<Long, Long, Integer> map(Tuple3<String, Long, Long> observation) {
                    return new Tuple3<>(observation.f1, observation.f2, 1);
                  }
                })
            .keyBy(selector -> selector.f1)
            .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
            .sum(2)
            .map(
                new MapFunction<Tuple3<Long, Long, Integer>, PositionStatistics>() {
                  @Override
                  public PositionStatistics map(Tuple3<Long, Long, Integer> result) {
                    return new PositionStatistics(result.f0, result.f1, result.f2);
                  }
                });

    outputStream.print();

    tEnv.createTemporaryView("PositionStatsTable", outputStream);

    // Avro schema of the payload written to the volatile layer
    String outputAvroSchema =
        " {\n"
            + " \"type\" : \"record\",\n"
            + " \"name\" : \"Event\",\n"
            + " \"namespace\" : \"my.flink.tutorial\",\n"
            + " \"fields\" : [\n"
            + " {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + " {\"name\" : \"total\", \"type\" : \"int\"},\n"
            + " {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
            + " ]\n"
            + " }\n";

    Map<String, String> sinkProperties = new HashMap<>();
    sinkProperties.put("olp.catalog.layer-schema", outputAvroSchema);
    OlpStreamConnectorHelper sinkHelper =
        OlpStreamConnectorHelper.create(outputCatalogHrn, outputVolatileLayer, sinkProperties);
    Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();

    // Note: despite its name, OutputIndexTable is backed by the volatile layer
    tEnv.executeSql(
        String.format(
            "CREATE TABLE OutputIndexTable %s WITH %s", sinkSchema, sinkHelper.options()));

    // mt_partition designates the target partition of the volatile layer
    tEnv.executeSql(
        "INSERT INTO"
            + " OutputIndexTable "
            + "SELECT"
            + " 'Berlin' AS city,"
            + " totalObservations AS total,"
            + " timestampUtc,"
            + " CAST(tile AS STRING) AS mt_partition "
            + "FROM"
            + " PositionStatsTable");

    try {
      env.executeAsync();
      logger.info("Stream to {} executed", outputCatalogHrn);
      // Stop the local job after 30 seconds
      new Timer(true)
          .schedule(
              new TimerTask() {
                @Override
                public void run() {
                  logger.info("Stream to {} canceled", outputCatalogHrn);
                  System.exit(0);
                }
              },
              30000);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  public static class ComputeTileFunction extends ScalarFunction {
    private final int tileLevel = 14;

    // Map a latitude/longitude pair to the key of the level-14 HERE tile containing it
    public long eval(Double latitude, Double longitude) {
      return HereQuadFactory.INSTANCE
          .getMapQuadByLocation(latitude, longitude, tileLevel)
          .getLongKey();
    }
  }

  public static class AssignUuidFunction extends ScalarFunction {
    public String eval(String input) {
      return UUID.randomUUID().toString();
    }
  }

  // POJO used both as the DataStream element type and as the view schema
  public static class PositionStatistics {
    public long tile;
    public long timestampUtc;
    public int totalObservations;

    public PositionStatistics() {}

    public PositionStatistics(long timestampUtc, long tile, int totalObservations) {
      this.tile = tile;
      this.timestampUtc = timestampUtc;
      this.totalObservations = totalObservations;
    }

    @Override
    public String toString() {
      return tile + " " + timestampUtc + " " + totalObservations;
    }
  }
}
The second example reads Avro-encoded data from the volatile layer populated in the previous step, performs some transformations, and writes the results to the output index layer of the catalog created earlier. Again in Scala first:
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{tableConversions, StreamTableEnvironment}
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._
import org.slf4j.LoggerFactory

import java.util.{Timer, TimerTask}

object VolatileToIndexLayerScalaPipeline extends App {

  private val logger = LoggerFactory.getLogger(VolatileToIndexLayerScalaPipeline.getClass)

  private val pipelineContext = new PipelineContext

  // The catalog created earlier serves as both input and output here
  val catalogHrn = pipelineContext.config.outputCatalog
  val inputVolatileLayer = "volatile-layer-avro-data"
  val outputIndexLayer = "index-layer-parquet-data"

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(5000)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))

  // Avro schema of the records written to the volatile layer by the first application
  val inputSchema =
    """{
      | "type" : "record",
      | "name" : "Event",
      | "namespace" : "my.flink.tutorial",
      | "fields" : [
      | {"name" : "city", "type" : "string"},
      | {"name" : "total", "type" : "int"},
      | {"name" : "timestampUtc", "type" : "long"}
      | ]
      |}
      |""".stripMargin

  // Read only these partitions; a refresh interval of -1 disables periodic re-reads
  val tiles = List("377894440", "377894441", "377894442")
  val properties =
    Map(
      "olp.layer.query" -> s"mt_partition=in=${tiles.mkString("(", ", ", ")")}",
      "olp.catalog.layer-schema" -> inputSchema,
      "olp.connector-refresh-interval" -> "-1"
    )

  val helper = OlpStreamConnectorHelper(catalogHrn, inputVolatileLayer, properties)
  val tEnv = StreamTableEnvironment.create(env)
  val schema = helper.prebuiltSchema(tEnv).build()
  tEnv.executeSql(s"CREATE TABLE TableSource $schema WITH ${helper.options}")

  tEnv.from("TableSource").toAppendStream[Row].print()

  // Avro schema of the payload written to the index layer
  val outputSchema =
    """{
      | "type" : "record",
      | "name" : "Event",
      | "namespace" : "my.flink.tutorial",
      | "fields" : [
      | {"name" : "city", "type" : "string"},
      | {"name" : "total", "type" : "int"}
      | ]
      |}
      |""".stripMargin

  val sinkHelper: OlpStreamConnectorHelper =
    OlpStreamConnectorHelper(catalogHrn,
                             outputIndexLayer,
                             Map("olp.catalog.layer-schema" -> outputSchema))

  tEnv.executeSql(
    s"CREATE TABLE Sink ${sinkHelper.prebuiltSchema(tEnv).build()} " +
      s"WITH ${sinkHelper.options}")

  // idx_ columns populate the tile_id and time_window index attributes of the layer
  tEnv.executeSql("""INSERT INTO Sink
                    |SELECT
                    | city,
                    | total,
                    | CAST(mt_partition as BIGINT) as idx_tile_id,
                    | timestampUtc as idx_time_window
                    |FROM TableSource
                    |""".stripMargin)

  try {
    env.executeAsync()
    logger.info(s"Stream to $catalogHrn executed")
    // Stop the local job after 30 seconds
    new Timer(true).schedule(new TimerTask {
      override def run(): Unit = {
        logger.info(s"Stream to $catalogHrn canceled")
        System.exit(0)
      }
    }, 30000)
  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }
}
And the equivalent Java implementation:

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.pipeline.PipelineContext;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VolatileToIndexLayerPipeline {

  private static final PipelineContext pipelineContext = new PipelineContext();

  // The catalog created earlier serves as both input and output here
  private static final HRN catalogHrn = pipelineContext.config().getOutputCatalog();
  private static final String inputVolatileLayer = "volatile-layer-avro-data";
  private static final String outputIndexLayer = "index-layer-parquet-data";

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(VolatileToIndexLayerPipeline.class);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));

    // Avro schema of the records written to the volatile layer by the first application
    String inputAvroSchema =
        " {\n"
            + " \"type\" : \"record\",\n"
            + " \"name\" : \"Event\",\n"
            + " \"namespace\" : \"my.flink.tutorial\",\n"
            + " \"fields\" : [\n"
            + " {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + " {\"name\" : \"total\", \"type\" : \"int\"},\n"
            + " {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
            + " ]\n"
            + " }\n";

    // Read only these partitions; a refresh interval of -1 disables periodic re-reads
    String[] tiles = {"377894440", "377894441", "377894442"};
    String tilesQueryString = Arrays.stream(tiles).collect(Collectors.joining(", ", "(", ")"));
    Map<String, String> properties = new HashMap<>();
    properties.put("olp.layer.query", "mt_partition=in=" + tilesQueryString);
    properties.put("olp.catalog.layer-schema", inputAvroSchema);
    properties.put("olp.connector-refresh-interval", "-1");

    OlpStreamConnectorHelper helper =
        OlpStreamConnectorHelper.create(catalogHrn, inputVolatileLayer, properties);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Schema schema = helper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(String.format("CREATE TABLE TableSource %s WITH %s", schema, helper.options()));

    tEnv.toAppendStream(tEnv.from("TableSource"), Row.class).print();

    // Avro schema of the payload written to the index layer
    String outputParquetSchema =
        " {\n"
            + " \"type\" : \"record\",\n"
            + " \"name\" : \"Event\",\n"
            + " \"namespace\" : \"my.flink.tutorial\",\n"
            + " \"fields\" : [\n"
            + " {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + " {\"name\" : \"total\", \"type\" : \"int\"}\n"
            + " ]\n"
            + " }\n";

    Map<String, String> sinkProperties = new HashMap<>();
    sinkProperties.put("olp.catalog.layer-schema", outputParquetSchema);
    OlpStreamConnectorHelper sinkHelper =
        OlpStreamConnectorHelper.create(catalogHrn, outputIndexLayer, sinkProperties);
    Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
    tEnv.executeSql(
        String.format("CREATE TABLE Sink %s WITH %s", sinkSchema, sinkHelper.options()));

    // idx_ columns populate the tile_id and time_window index attributes of the layer
    tEnv.executeSql(
        "INSERT INTO Sink "
            + "SELECT"
            + " city,"
            + " total,"
            + " CAST(mt_partition as BIGINT) as idx_tile_id,"
            + " timestampUtc as idx_time_window "
            + "FROM TableSource");

    try {
      env.executeAsync();
      logger.info("Stream to {} executed", catalogHrn);
      // Stop the local job after 30 seconds
      new Timer(true)
          .schedule(
              new TimerTask() {
                @Override
                public void run() {
                  logger.info("Stream to {} canceled", catalogHrn);
                  System.exit(0);
                }
              },
              30000);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }
}
Compile and run locally
To run the first application locally, execute the following command for the Scala version:
mvn compile exec:java \
-Dexec.mainClass=StreamToVolatileLayerScalaPipeline \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf
Or, for the Java version:
mvn compile exec:java \
-Dexec.mainClass=StreamToVolatileLayerPipeline \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf
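Once the first job has written data, you can spot-check the contents of the volatile layer from the CLI (a hedged example; the command shape may vary between CLI versions):

olp catalog layer partition list {{YOUR_OUTPUT_CATALOG_HRN}} volatile-layer-avro-data --scope {{YOUR_PROJECT_HRN}}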
To run the second application, execute the following command for the Scala version:
mvn compile exec:java \
-Dexec.mainClass=VolatileToIndexLayerScalaPipeline \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf
Or, for the Java version:
mvn compile exec:java \
-Dexec.mainClass=VolatileToIndexLayerPipeline \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf
For more information on the topics covered in this tutorial, see the Data Client Library and Apache Flink documentation.