Flink Connector とバージョンレイヤーの統合

バージョンレイヤーのテーブルソースを作成します

Flink Connector API の主なエントリポイントはです OlpStreamConnectorHelper

Scala
Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;

OlpStreamConnectorHelper Flink.table.api.Schema を作成して SQL ステートメントを構築するために使用されるのインスタンス。 次のコード スニペットで OlpStreamConnectorHelperは、 Flink.table.api.Schema のインスタンスを作成し、指定したスキーマおよびオプションを使用してテーブルを作成する方法を示します。

Scala
Java
// define the properties
val properties = Map(
  "olp.layer.query" -> "mt_version==LATEST"
)

// create the Table Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-versioned-layer", properties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")
OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

ソースファクトリでは、バージョンレイヤーについて次のプロパティがサポートされています。

  • olp.layer.query: RSQL クエリ言語で記述 された文字列で、バージョンレイヤーをクエリします。 定義されていない場合、値 "mt_version==latest" がデフォルトで使用され、最新バージョンに存在するすべてのパーティションが読み取られます。
  • olp.connector.download-parallelism: 1 つの Flink タスクで並行して読み取られている blob の最大数。 タスクの数は、設定された並列処理に対応します。 その結果、パイプラインが並行して読み取ることができるブロブの数は 、並列処理レベルにこのプロパティの値を掛けた値になります。 デフォルト値はです 10
  • olp.connector.download-timeout: Blob API から blob を読み取るときに適用される全体的なタイムアウト ( ミリ秒単位 ) 。 デフォルト値は 300000 ミリ秒です。

テーブルスキーマを作成する前 HRNに、ヘルパーは渡されたを使用してカタログ設定を取得します。 次に、渡されたのデータ形式とスキーマが存在するかどうかを確認 layerIdします。 最後のステップとして、 Flink Connector は自動的にレイヤースキーマを Flink.table.api.Schema に変換します。

データ形式

Flink Connector は、バージョンレイヤーペイロードの次のデータ形式をサポートしています。

  • 未加工。 デコードおよびエンコードロジックは適用されず、データペイロードがバイトの配列として取得されます。 Table スキーマは次のように表示されます。

    root
      |-- data: Array[Byte]
      |-- mt_partition: String
      |-- mt_version: Long
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    ペイロードデータを含む列が呼び出さ dataれます。 メタデータ列はデータ列の後に mt_ あり、接頭辞が付いています。 メタデータ列のリストで mt_partition は、必須列のみが入力されています。 REST は任意 null で、これらの値として使用できます。

    この形式は、レイヤーのコンテンツタイプがとして設定されている場合に使用 application/octet-streamされます。

  • Protobuf 。 Flink は、添付された Protobuf スキーマ(レイヤー設定で指定したもの)を使用して Flink テーブルスキーマを派生します。

    root
      |-- protobuf_field_1: String
      |-- protobuf_field_2: String
      |-- probobuf_field_3.nested_column: Long
      |-- ...
      |-- mt_partition: String
      |-- mt_version: Long
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row は、最上位の Protobuf フィールドを最上位の列に配置し、メタデータ列を追跡します。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-protobuf スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    注 :

    Protobuf タイプ情報ベースのスキーマでは、自己参照 Flink フィールドを表す方法がないため、このフィールドはサポートされていません。

  • Avro 。 Flink は、渡された Avro スキーマ ( 出荷時のマップで指定したもの ) を使用して Flink テーブルスキーマを取得します。

    root
      |-- avro_field_1: String
      |-- avro_field_2: String
      |-- ...
      |-- mt_partition: String
      |-- mt_version: Long
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row は、最上位の Avro フィールドを最上位の列に配置し、メタデータ列を追跡します。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-avro-binary スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    警告 : コネクターの新しいバージョンでは、 Avro データ型のメタデータ列はサポートされていません。

  • 寄木細工。 Flink は、渡された Avro スキーマ ( 出荷時のマップで指定したもの ) を使用して Flink テーブルスキーマを取得します。

    root
      |-- parquet_field_1: String
      |-- parquet_field_2: String
      |-- ...
      |-- mt_partition: String
      |-- mt_version: Long
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row では、最上位の寄木細工のフィールドが最上位の列に配置され、メタデータの列が続きます。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-parquet スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    警告 : Connector の新バージョンでは、 Parquet データ型のメタデータ列はサポートされていません。

    Apache paret-Avro モジュールでは、 Hadoop クライアントがクラスパスで使用できることを想定しています。

    現在、 Hadoop クライアントはストリーミング環境によって提供されていません。 その結果、寄木細工の形式を使用する場合は、脂肪質の jar に Hadoop クライアントの依存関係を含める必要があります。

Maven
SBT
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.htrace</groupId>
                <artifactId>htrace-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
libraryDependencies ++=
Seq("org.apache.hadoop" % "hadoop-client" % "2.7.3" exclude ("org.apache.htrace", "htrace-core"))
  • その他の形式

    レイヤーで前述の形式以外の形式を使用している場合は、エラーがスローされます。

同じレイヤーのテーブルソースのスキーマが同じです。

標準の Flink API を使用して、いつでも Table スキーマを印刷できます。

Scala
Java
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema()
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema();

未加工データを読み取ります

SQL の使用 :

Scala
Java
/// [create-table-source]
// define the properties
val properties = Map(
  "olp.layer.query" -> "mt_version==LATEST"
)

// create the Table Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-versioned-layer", properties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

/// [create-table-source]
// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

/// [create-table-source-java]
OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
/// [create-table-source-java]

Table result =
    tEnv.sqlQuery(
        "SELECT tileId, mt_timestamp, mt_checksum, mt_dataSize, mt_crc FROM InputTable");

Protobuf データを読み取ります

SQL の使用 :

Scala
Java
val sourceProperties = Map[String, String]("olp.layer.query" -> "mt_version==LATEST",
                                           "olp.connector.metadata-columns" -> "true")

val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-versioned-layer", sourceProperties)

val tEnv = StreamTableEnvironment.create(env)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

tEnv.from("InputTable").printSchema()

val result: Table = tEnv.sqlQuery("""
SELECT
    tileId,
    mt_timestamp,
    mt_checksum,
    mt_dataSize,
    mt_crc
FROM InputTable
  """)

tEnv
  .toAppendStream[Row](result)
  .print()
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

Table result =
    tEnv.sqlQuery(
        "SELECT tileId, messages, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");

DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
stream.print();

Avro データを読み取ります

SQL の使用 :

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

    val layerSchema =
      """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "city", "type" : "string"},
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """
    val sourceProperties = Map(
      "olp.catalog.layer-schema" -> layerSchema,
      "olp.layer.query" -> "mt_version==LATEST"
    )

    val sourceHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(inputCatalogHrn), "version-layer-avro-output", sourceProperties)

    tEnv.executeSql(
      s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sourceHelper.options}")

    val result: Table = tEnv.sqlQuery("""
    SELECT
        city,
        event_timestamp,
        latitude,
        longitude
    FROM InputTable
      """)

    tEnv
      .toAppendStream[Row](result)
      .print()
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

Table result = tEnv.sqlQuery("SELECT 'Berlin', refs, messages FROM InputTable");

DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
stream.print();

寄木細工のデータを読み取ります

SQL の使用 :

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

    val inputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """

    val sourceProperties = Map(
      "olp.catalog.layer-schema" -> inputLayerSchema,
      "olp.layer.query" -> "mt_version==LATEST"
    )

    val sourceHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(inputCatalogHrn),
                               "version-layer-parquet-input",
                               sourceProperties)

    tEnv.executeSql(
      s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sourceHelper.options}")

    val result: Table = tEnv.sqlQuery("""
    SELECT
        city,
        event_timestamp,
        latitude,
        longitude
    FROM InputTable
      """)

    tEnv
      .toAppendStream[Row](result)
      .print()
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

Table result = tEnv.sqlQuery("SELECT 'Berlin', refs, messages FROM InputTable");

DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
stream.print();

」に一致する結果は 件です

    」に一致する結果はありません