Flink Connector とインタラクティブなマップレイヤーの統合

Interactive マップ レイヤーの表の出力と表のソースを作成します

Flink Connector API の主なエントリポイントはです OlpStreamConnectorHelper

Scala
Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;

OlpStreamConnectorHelper Flink.table.api.Schema を作成して SQL ステートメントを構築するために使用されるのインスタンス。 次のコード スニペットで OlpStreamConnectorHelperは、 Flink.table.api.Schema のインスタンスを作成し、指定したスキーマおよびオプションを使用してテーブルを作成する方法を示します。

Scala
Java
// define the properties
val sourceProperties =
  Map(
    "olp.connector.mode" -> "read",
    "olp.connector-refresh-interval" -> "-1", // no continuous reading
    "olp.connector.metadata-columns" -> "true"
  )

// create the Table Connector Descriptor Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), inputLayerId, sourceProperties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")
OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

ソースファクトリでは、インタラクティブなマップソースについて次のプロパティがサポートされています。

  • olp.connector.mode: 読み取り
  • olp.layer.query: ボラタイル レイヤーのクエリに使用する RSQL クエリを指定します。 定義されていない場合、または空の場合、ウェブメルカトルメントの範囲内のすべてのフィーチャー (-85.05 ° ~ +85.05 ° の範囲の緯度 ) が読み取られます。
  • olp.connector-refresh-interval: レイヤー内の変更を検出する間隔 ( ミリ秒単位 ) 。 値 -1 を使用すると、連続読み取りが無効になります。 デフォルト値は 60000 です。
  • olp.connector.max-features-per-request: 単一のコールで、コネクタによってインタラクティブマップ レイヤーから要求された機能の数を制限します。 レイヤーに非常に大きな機能が含まれている場合は、この設定を調整します。デフォルトは 10000 です。
  • olp.connector.ignore-invalid-partitions: 最上位のズーム レベルの同じ場所に、許可さ れている数を超える機能がある場合 (OLP.Connector.max-features-per-request プロパティを使用 ) 、 true に設定すると、コネクタはそれらの機能をロードしようとしません。 デフォルト値は falseです。
  • olp.connector.download-parallelism: 1 つの Flink タスクで並行して実行された読み取りリクエストの最大数。 タスクの数が設定された並列処理に対応するため、パイプラインが並列に実行できる読み取り要求の数は、このプロパティの値にタスク並列処理を掛けた値になります。 デフォルト値は 5 です。
  • olp.connector.download-timeout: インタラクティブマップ レイヤーからの読み取りに適用される全体のタイムアウト ( ミリ秒単位 ) 。 デフォルト値は 300000 です。

Table ソースと同じ方法でシンクを作成するには OlpStreamConnectorHelper、次のものを使用します。

Scala
Java
// define the properties
val sinkProperties =
  Map(
    "olp.connector.mode" -> "write",
    "olp.connector.metadata-columns" -> "true"
  )

// create the Table Connector Descriptor Sink
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), outputLayerId, sinkProperties)

// register the Table Sink

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")
OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

シンクファクトリでは、インタラクティブマップシンク用に次のプロパティがサポートされています。

  • olp.connector.mode: 書き込み
  • olp.connector.aggregation-window: パイプラインによって変更された機能を使用してインタラクティブマップ レイヤーを更新する時間間隔をミリ秒単位で定義します。 デフォルト値は 1000 ミリ秒です。
  • olp.connector.upload-parallelism: 1 つの Flink タスクで並行して実行された書き込みリクエストの最大数。 タスクの数が設定された並列処理に対応するため、パイプラインが並列に実行できる読み取り要求の数は、このプロパティの値にタスク並列処理を掛けた値になります。 デフォルト値は 5 です。
  • olp.connector.upload-timeout: 対話型マップ レイヤーへの書き込みに適用される全体のタイムアウト ( ミリ秒単位 ) 。 デフォルト値は 300000 です。

Flink コネクタの設定の一般的な説明について は、 HERE を参照してください。

データスキーマ

テーブルスキーマを作成する前 HRNに、ヘルパーは渡されたを使用してカタログ設定を取得します。 次に、渡されたのデータ形式を確認します layerId。 最後のステップとして、 Flink Connector は自動的に Flink.table.api.Schema を派生します。

表のソースと出力には、同じインタラクティブマップ レイヤーで異なるスキーマがあります。

表ソーススキーマにはすべての列が含まれています

root
|-- geometry: ROW<`type` STRING, `coordinates` STRING>
|-- properties: MAP<STRING, STRING>
|-- customMembers: MAP<STRING, STRING>
|-- mt_id: STRING
|-- mt_tags: ARRAY<STRING>
|-- mt_datahub: ROW<`mt_updatedAt` BIGINT, `mt_createdAt` BIGINT>

Table Sink スキーマには書き込み可能な列のみが含まれています

root
|-- geometry: ROW<`type` STRING, `coordinates` STRING>
|-- properties: MAP<STRING, STRING>
|-- customMembers: MAP<STRING, STRING>
|-- mt_id: STRING
|-- mt_tags: ARRAY<STRING>

列は、 GeoJSON 機能のフィールドから取得されます

説明
Geometry.type Point 、 LineString 、 Polygon 、 MultiPoint 、 MultiLineString 、または MultiPolygon
Geometry.coordinates 文字列化された GeoJSON 座標。 GeoJSON 座標の意味 については、ジオ座標の章を参照してください。
プロパティ GeoJSON フィーチャのプロパティのマップ。各値は文字列化された JSON として表示されます
顧客メンバー GeoJSON 仕様に記述されていないメンバーのマップ。各値は文字列化された JSON です
mt_id 機能 ID
mt_tags 機能に関連付けられているタグの配列
mt_datahub.mt_updatedAt 対話型マップ レイヤーの最後の更新の UNIX タイムスタンプ ( ミリ秒単位 )
mt_datahub.mt_createdAt インタラクティブマップ レイヤーで作成された UNIX タイムスタンプ(ミリ秒単位)

JSON テキストをエンコードまたはデコードする SQL 関数

文字列化された JSON は、ユーザー定義の関数によって型指定されたデータに変換できます

Scala
Java
tEnv.executeSql(
  "CREATE FUNCTION fromJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.FromJsonString'")
tEnv.executeSql(
  "CREATE FUNCTION toJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.ToJsonString'")
tEnv.createTemporaryFunction("fromJsonString", FromJsonString.class);
tEnv.createTemporaryFunction("toJsonString", ToJsonString.class);

Interactive マップ データを参照してください

Scala
Java
val table = tEnv.sqlQuery("SELECT fromJsonString(geometry.coordinates) FROM InputTable")
tEnv.sqlQuery("SELECT fromJsonString(geometry.coordinates) FROM InputTable");

Interactive マップ データを書き込みます

Scala
Java
// column order is geometry, properties, customMembers, mt_id, mt_tags
tEnv.executeSql("""INSERT OVERWRITE OutputTable VALUES
                  |(('Point','[8.0, 50.0]'),
                  | MAP[
                  |  'p1', toJsonString(1),
                  |  'p2', toJsonString('value2'),
                  |  'p3', '{
                  |    "a": false,
                  |    "b": 3.2,
                  |    "c": "p3.3"
                  |   }'
                  | ],
                  | MAP['r1', toJsonString(true)],
                  | 'id1',
                  | ARRAY['tagA','tagB']
                  |)""".stripMargin)
// column order is geometry, properties, customMembers, mt_id, mt_tags
tEnv.executeSql(
    "INSERT OVERWRITE OutputTable VALUES "
        + "(('Point','[8.0, 50.0]'), "
        + " MAP[ "
        + "  'p1', toJsonString(1), "
        + "  'p2', toJsonString('value2'), "
        + "  'p3', '{ "
        + "    \"a\": false, "
        + "    \"b\": 3.2, "
        + "    \"c\": \"p3.3\" "
        + "   }' "
        + " ], "
        + " MAP['r1', toJsonString(true)], "
        + " 'id1', "
        + " ARRAY['tagA','tagB'] "
        + ")");

Interactive マップ データの読み取りと書き込み

Scala
Java
tEnv.executeSql(
  "INSERT INTO OutputTable " +
    "SELECT geometry, properties, customMembers, mt_id, mt_tags FROM InputTable")
tEnv.executeSql(
    "INSERT INTO OutputTable "
        + "SELECT geometry, properties, customMembers, mt_id, mt_tags FROM InputTable");

」に一致する結果は 件です

    」に一致する結果はありません