Flink Connector とインデックスレイヤーの統合

インデックス レイヤー用の表出力と表ソースを作成します

Flink Connector API の主なエントリポイントはです OlpStreamConnectorHelper

Scala
Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;

OlpStreamConnectorHelper Flink.table.api.Schema を作成して SQL ステートメントを構築するために使用されるのインスタンス。 次のコード スニペットで OlpStreamConnectorHelperは、 Flink.table.api.Schema のインスタンスを作成し、指定したスキーマおよびオプションを使用してテーブルを作成する方法を示します。

Scala
Java
// define the properties
val sourceProperties =
  Map(
    "olp.layer.query" -> "event_id=in=(1,2,3)",
    "olp.connector.query-parallelism" -> "100"
  )

// create the Table Connector Descriptor Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-protobuf-input", sourceProperties)

val tEnv = StreamTableEnvironment.create(env)
// register the Table Source

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")
// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

ソースファクトリでは、インデックスレイヤーについて次のプロパティがサポートされています。

  • olp.layer.query: インデックス レイヤーのクエリに使用する RSQL クエリを指定します。 定義されていない場合、値「 timestamp=ge=0 」がデフォルトで使用され、すべてのパーティションが読み取られます。
  • olp.connector.query-parallelism: サブクエリの数を指定するフラグ。メタデータ のクエリに使用する並列処理のレベルを間接的に設定します。 デフォルト値は equals 20です。
  • olp.catalog.layer-schema: Parquet および Avro データ形式にのみ適用されます。 JSON 形式を使用する Avro スキーマ文字列です。
  • olp.connector.download-parallelism: 1 つの Flink タスクで並行して読み取られている blob の最大数。 タスクの数は、設定された並列処理に対応します。 その結果、パイプラインが並行して読み取ることができるブロブの数は 、並列処理レベルにこのプロパティの値を掛けた値に等しくなります。 デフォルト値はです 10
  • olp.connector.download-timeout: Blob API からの blob の読み取りに適用される全体的なタイムアウト ( ミリ秒単位 ) 。 デフォルト値は 300000 ミリ秒です。

テーブルスキーマを作成するために、ヘルパー HRNは渡されたを使用してカタログ設定を取得します。 次に、渡されたのデータ形式とスキーマが存在するかどうかを確認 layerIdします。 最後のステップとして、 Flink Connector は自動的にレイヤースキーマを Flink.table.api.Schema に変換します。

Table ソースと同じ方法でシンクを作成するには OlpStreamConnectorHelper、次のものを使用します。

Scala
Java
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), "index-layer-protobuf-output", Map.empty)

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")
OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

シンクファクトリでは、インデックスレイヤーの次のプロパティがサポートされています。

  • olp.catalog.layer-schema: Parquet および Avro データ形式にのみ適用されます。 JSON 形式を使用する Avro スキーマ文字列です。
  • olp.connector.aggregation-window: シンクが同じインデックス列を持つ行を集約する頻度を定義するミリ秒単位のインターバル。 デフォルト値は 10000 ミリ秒です。 このプロパティは、 Avro 形式および Parquet 形式にのみ適用されます。
  • olp.connector.publication-window: メタデータを公開 API に公開する頻度を定義します。 デフォルト値は 1000 ミリ秒です。 値がとして定義 -1されている場合、メタデータはパブリッシュされません。

データ形式

Flink Connector は、インデックス レイヤーペイロードの次のデータ形式をサポートしています。

  • 未加工。 デコードおよびエンコードロジックは適用されず、データペイロードはバイトの配列として取得されます。 Table スキーマは次のように表示されます。

    root
      |-- data: Array[Byte]
      |-- idx_index_field1: Long
      |-- idx_index_field2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    ペイロードデータを含む列が呼び出さ dataれます。 ユーザー定義のインデックスフィールドはデータ列の後に idx_ あり、プレフィックスが付いています。 メタデータ列はインデックス列の後に mt_ あり、プレフィックスが付いています。

    この形式は、レイヤーのコンテンツタイプがとして設定されている場合に使用 application/octet-streamされます。

  • Protobuf 。 Flink は、添付された Protobuf スキーマ(レイヤー設定で指定したもの)を使用して Flink テーブルスキーマを派生します。

    root
      |-- protobuf_field_1: String
      |-- protobuf_field_2: String
      |-- probobuf_field_3.nested_column: Long
      |-- ...
      |-- idx_index_field1: Long
      |-- idx_index_field2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row では、最上位の Protobuf フィールドが最上位の列に配置され、インデックス列とメタデータ列が続きます。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-protobuf スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    注 :

    Protobuf タイプ情報ベースのスキーマでは、自己参照 Flink フィールドを表す方法がないため、このフィールドはサポートされていません。

  • Avro 。 Flink は、渡された Avro スキーマ ( 出荷時のマップで指定したもの ) を使用して Flink テーブルスキーマを取得します。

    root
      |-- avro_field_1: String
      |-- avro_field_2: String
      |-- ...
      |-- idx_index_field1: Long
      |-- idx_index_field2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row では、最上位の Avro フィールドが最上位の列に配置され、インデックス列とメタデータ列が続きます。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-avro-binary スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    警告 : コネクターの新しいバージョンでは、 Avro データ型のメタデータ列はサポートされていません。

  • 寄木細工。 Flink は、渡された Avro スキーマ ( 出荷時のマップで指定したもの ) を使用して Flink テーブルスキーマを取得します。

    root
      |-- parquet_field_1: String
      |-- parquet_field_2: String
      |-- ...
      |-- idx_index_field1: Long
      |-- idx_index_field2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row では、最上位の寄木細工のフィールドが最上位の列に配置され、インデックス列とメタデータ列が続きます。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-parquet スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    警告 : Connector の新バージョンでは、 Parquet データ型のメタデータ列はサポートされていません。

    現在、 Hadoop クライアントはストリーミング環境によって提供されていません。 その結果、寄木細工の形式を使用する場合は、脂肪質の jar に Hadoop クライアントの依存関係を含める必要があります。

Maven
SBT
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.htrace</groupId>
                <artifactId>htrace-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>xerces</groupId>
                <artifactId>xercesImpl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
libraryDependencies ++=
Seq("org.apache.hadoop" % "hadoop-client" % "2.7.3" exclude ("org.apache.htrace", "htrace-core") exclude ("xerces", "xercesImpl"))
  • その他の形式

    レイヤーで前述の形式以外の形式を使用している場合は、エラーがスローされます。

表のソースと出力のスキーマが同じレイヤーに設定されています。

標準の Flink API を使用して、いつでも Table スキーマを印刷できます。

Scala
Java
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema()
tEnv.from("InputTable").printSchema();

生データの読み取りと書き込み

SQL の使用 :

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

val sourceProperties =
  Map(
    "olp.layer.query" -> "event_id=in=(1,2,3)",
    "olp.connector.query-parallelism" -> "100",
    "olp.connector.metadata-columns" -> "true"
  )

val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-raw-input", sourceProperties)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn),
                           "index-layer-raw-output",
                           Map("olp.connector.metadata-columns" -> "true"))

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

tEnv.executeSql(
  """
INSERT INTO
    OutputTable
SELECT
    data,
    idx_ingestion_time,
    idx_event_id,
    idx_event_type,
    mt_metadata,
    mt_timestamp,
    mt_checksum,
    mt_crc,
    mt_dataSize,
    mt_compressedDataSize
FROM InputTable"""
)
// define the properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");
sourceProperties.put("olp.connector.metadata-columns", "true");

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.connector.metadata-columns", "true");

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.executeSql(
    "INSERT INTO OutputTable SELECT data, idx_ingestion_time, idx_event_id, idx_event_type, mt_metadata, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");

Protobuf データの読み取りと書き込み

SQL の使用 :

Scala
Java
/// [create-table-source]
// define the properties
val sourceProperties =
  Map(
    "olp.layer.query" -> "event_id=in=(1,2,3)",
    "olp.connector.query-parallelism" -> "100"
  )

// create the Table Connector Descriptor Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-protobuf-input", sourceProperties)

val tEnv = StreamTableEnvironment.create(env)
// register the Table Source

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")
/// [create-table-source]

/// [create-table-sink]
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), "index-layer-protobuf-output", Map.empty)

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")
/// [create-table-sink]

tEnv.executeSql(
  "INSERT INTO OutputTable SELECT * FROM InputTable"
)
// define the properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");

/// [create-table-source-java]
// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
/// [create-table-source-java]

/// [create-table-sink-java]
OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));
/// [create-table-sink-java]

// we assume that the input and output tables have the same schema
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");

Avro データの読み取りと書き込み

SQL の使用 :

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

    val inputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """

    val outputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "city", "type" : "string"},
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """

    val sourceProperties =
      Map(
        "olp.catalog.layer-schema" -> inputLayerSchema,
        "olp.layer.query" -> "event_id=in=(1,2,3)",
        "olp.connector.query-parallelism" -> "100"
      )

    val sourceHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-avro-input", sourceProperties)

    tEnv.executeSql(
      s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sourceHelper.options}")

    val sinkProperties =
      Map(
        "olp.catalog.layer-schema" -> outputLayerSchema
      )

    val sinkHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(outputCatalogHrn), "index-layer-avro-output", sinkProperties)

    tEnv.executeSql(
      s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sinkHelper.options}")

    tEnv.executeSql(
      """
INSERT INTO OutputTable
    SELECT
        'Berlin',
        event_timestamp,
        latitude,
        longitude,
        idx_ingestion_time,
        idx_event_id,
        idx_event_type
    FROM InputTable"""
    )
// define source properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.catalog.layer-schema", inputLayerSchema);
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

// define sink properties
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputLayerSchema);

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.executeSql(
    "INSERT INTO OutputTable SELECT 'Berlin', event_timestamp, latitude, longitude, idx_ingestion_time, idx_event_id, idx_event_type FROM InputTable");

寄木細工のデータの読み取りと書き込み

SQL の使用 :

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

    val inputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """

    val outputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "city", "type" : "string"},
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """

    val sourceProperties =
      Map(
        "olp.catalog.layer-schema" -> inputLayerSchema,
        "olp.layer.query" -> "event_id=in=(1,2,3)",
        "olp.connector.query-parallelism" -> "100"
      )

    val sourceHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-parquet-input", sourceProperties)

    tEnv.executeSql(
      s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sourceHelper.options}")

    val sinkHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(outputCatalogHrn),
                               "index-layer-parquet-output",
                               Map("olp.catalog.layer-schema" -> outputLayerSchema))

    tEnv.executeSql(
      s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sinkHelper.options}")

    tEnv.executeSql(
      """
INSERT INTO OutputTable
    SELECT
        'Berlin',
        event_timestamp,
        latitude,
        longitude,
        idx_ingestion_time,
        idx_event_id,
        idx_event_type
    FROM InputTable"""
    )
// define source properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.catalog.layer-schema", inputLayerSchema);
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), "index-layer-parquet-input", sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

// define sink properties
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputLayerSchema);

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), "index-layer-parquet-output", sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();

tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.executeSql(
    "INSERT INTO OutputTable SELECT 'Berlin', event_timestamp, latitude, longitude, idx_ingestion_time, idx_event_id, idx_event_type FROM InputTable");

」に一致する結果は 件です

    」に一致する結果はありません