Flink Connector と揮発性レイヤーの統合

ボラタイル レイヤー用の表出力と表ソースを作成します

Flink Connector API の主なエントリポイントはです OlpStreamConnectorHelper

Scala
Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;

OlpStreamConnectorHelper Flink.table.api.Schema を作成して SQL ステートメントを構築するために使用されるのインスタンス。 次のコード スニペットで OlpStreamConnectorHelperは、 Flink.table.api.Schema のインスタンスを作成し、指定したスキーマおよびオプションを使用してテーブルを作成する方法を示します。

Scala
Java
// define the properties
val sourceProperties =
  Map(
    "olp.layer.query" -> "mt_partition=in=(1,2,3)"
  )

// create the Table Connector Helper Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn),
                           "volatile-layer-protobuf-input",
                           sourceProperties)

val tEnv = StreamTableEnvironment.create(env)
// register the Table Source
tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")
OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

ソースファクトリでは、次のボラタイル レイヤープロパティがサポートされています。

  • olp.layer.query: ボラタイル レイヤーのクエリに使用する RSQL クエリを指定します。 定義されていない場合は、値 "mt_timestamp=ge=0 " がデフォルトで使用され、すべてのパーティションが読み取られます。
  • olp.catalog.layer-schema: Parquet および Avro データ形式にのみ適用されます。 JSON 形式を使用する Avro スキーマ文字列です。
  • olp.connector.download-parallelism: 1 つの Flink タスクで並行して読み取られている blob の最大数。 タスクの数は、設定された並列処理に対応します。 その結果、パイプラインが並行して読み取ることができるブロブの数は 、このプロパティの値の並列処理レベルに等しくなります。 デフォルト値はです 10
  • olp.connector.download-timeout: Blob API からの blob の読み取りに適用される全体的なタイムアウト ( ミリ秒単位 ) 。 デフォルト値は 300000 ミリ秒です。

OlpStreamConnectorHelper Flink.table.api.Schema を作成して SQL ステートメントを構築するために使用されるのインスタンス。 次のコード スニペットで OlpStreamConnectorHelperは、 Flink.table.api.Schema のインスタンスを作成し、指定したスキーマおよびオプションを使用してテーブルを作成する方法を示します。

Table ソースと同じ方法でシンクを作成するには OlpStreamConnectorHelper、次のものを使用します。

Scala
Java
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), "volatile-layer-protobuf-output", Map.empty)

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")
OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

シンクファクトリでは、揮発性レイヤーについて次のプロパティがサポートされています。

  • olp.catalog.layer-schema: Parquet および Avro データ形式にのみ適用されます。 JSON 形式を使用する Avro スキーマ文字列です。
  • olp.connector.aggregation-window: シンクが同じパーティション ID を持つ行を集約する頻度を定義するミリ秒単位のインターバル。 デフォルト値は 10000 ミリ秒です。 このプロパティは、 Avro 形式および Parquet 形式にのみ適用されます。
  • olp.connector.upload-parallelism: 1 つの Flink タスクで並列に書き込まれている blob の最大数。 タスクの数は、設定された並列処理に対応します。 その結果、パイプラインが並列に書き込むことができるブロブの数は 、このプロパティの値の並列処理レベルに等しくなります。 デフォルト値はです 10
  • olp.connector.upload-timeout: Blob API からの blob の書き込みに適用される全体的なタイムアウト ( ミリ秒単位 ) 。 デフォルト値は 300000 ミリ秒です。
  • olp.connector.publication-window: メタデータを公開 API に公開する頻度を定義します。 デフォルト値は 1000 ミリ秒です。 値がとして定義 -1されている場合、メタデータはパブリッシュされません。

データ形式

Flink Connector は、ボラタイル レイヤーペイロードの次のデータ形式をサポートしています。

  • 未加工。 デコードおよびエンコードロジックは適用されず、データペイロードがバイトの配列として取得されます。 Table スキーマは次のように表示されます。

    root
      |-- data: Array[Byte]
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    ペイロードデータを含む列が呼び出さ dataれます。 メタデータ列はデータ列の後に mt_ あり、接頭辞が付いています。

    この形式は、レイヤーのコンテンツタイプがとして設定されている場合に使用 application/octet-streamされます。

  • Protobuf 。 Flink は、添付された Protobuf スキーマ(レイヤー設定で指定したもの)を使用して Flink テーブルスキーマを派生します。

    root
      |-- protobuf_field_1: String
      |-- protobuf_field_2: String
      |-- probobuf_field_3.nested_column: Long
      |-- ...
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row は、最上位の Protobuf フィールドを最上位の列に配置し、メタデータ列を追跡します。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-protobuf スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    注 :

    Protobuf タイプ情報ベースのスキーマでは、自己参照 Flink フィールドを表す方法がないため、このフィールドはサポートされていません。

  • Avro 。 Flink は、渡された Avro スキーマ ( 出荷時のマップで指定したもの ) を使用して Flink テーブルスキーマを取得します。

    root
      |-- avro_field_1: String
      |-- avro_field_2: String
      |-- ...
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row は、最上位の Avro フィールドを最上位の列に配置し、メタデータ列を追跡します。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-avro-binary スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    警告 : コネクターの新しいバージョンでは、 Avro データ型のメタデータ列はサポートされていません。

  • 寄木細工。 Flink は、渡された Avro スキーマ ( 出荷時のマップで指定したもの ) を使用して Flink テーブルスキーマを取得します。

    root
      |-- parquet_field_1: String
      |-- parquet_field_2: String
      |-- ...
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row では、最上位の寄木細工のフィールドが最上位の列に配置され、メタデータの列が続きます。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-parquet スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    警告 : Connector の新バージョンでは、 Parquet データ型のメタデータ列はサポートされていません。

    現在、 Hadoop クライアントはストリーミング環境によって提供されていません。 その結果、寄木細工の形式を使用する場合は、脂肪質の jar に Hadoop クライアントの依存関係を含める必要があります。

Maven
SBT
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.htrace</groupId>
                <artifactId>htrace-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>xerces</groupId>
                <artifactId>xercesImpl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
libraryDependencies ++=
Seq("org.apache.hadoop" % "hadoop-client" % "2.7.3" exclude ("org.apache.htrace", "htrace-core") exclude ("xerces", "xercesImpl"))
  • その他の形式

    レイヤーで前述の形式以外の形式を使用している場合は、エラーがスローされます。

表のソースと出力のスキーマが同じレイヤーに設定されています。

標準の Flink API を使用して、いつでも Table スキーマを印刷できます。

Scala
Java
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema()
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema();

生データの読み取りと書き込み

SQL の使用 :

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

val partitions = (1 to 5).mkString(",")
val sourceProperties = Map("olp.layer.query" -> s"mt_partition=in=($partitions)",
                           "olp.connector.metadata-columns" -> "true")

val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "volatile-layer-raw-input", sourceProperties)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn),
                           "volatile-layer-raw-output",
                           Map("olp.connector.metadata-columns" -> "true"))
tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

tEnv.executeSql("""
                  |INSERT INTO OutputTable
                  |SELECT
                  |  data,
                  |        mt_partition,
                  |        mt_timestamp,
                  |        mt_checksum,
                  |        mt_crc,
                  |        mt_dataSize,
                  |        mt_compressedDataSize
                  |    FROM InputTable
                  |""".stripMargin)
// define the properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.layer.query", "mt_partition=in=(1,2,3)");
sourceProperties.put("olp.connector.metadata-columns", "true");

/// [read-write-raw-sql-java]
// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.connector.metadata-columns", "true");

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.executeSql(
    "INSERT INTO OutputTable SELECT data, mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");

Protobuf データの読み取りと書き込み

SQL の使用 :

Scala
Java
/// [create-table-source]
// define the properties
val sourceProperties =
  Map(
    "olp.layer.query" -> "mt_partition=in=(1,2,3)"
  )

// create the Table Connector Helper Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn),
                           "volatile-layer-protobuf-input",
                           sourceProperties)

val tEnv = StreamTableEnvironment.create(env)
// register the Table Source
tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")
/// [create-table-source]

/// [create-table-sink]
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), "volatile-layer-protobuf-output", Map.empty)

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")
/// [create-table-sink]

tEnv.executeSql(
  "INSERT INTO OutputTable SELECT * FROM InputTable"
)
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

/// [create-table-source-java]
OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
/// [create-table-source-java]

/// [create-table-sink-java]
OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));
/// [create-table-sink-java]

tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");

Avro データの読み取りと書き込み

SQL の使用 :

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

    val inputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """

    val outputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "city", "type" : "string"},
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """

    val sourceProperties =
      Map(
        "olp.catalog.layer-schema" -> inputLayerSchema,
        "olp.layer.query" -> "mt_partition=in=(1,2,3)"
      )

    val sourceHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(inputCatalogHrn), "volatile-layer-avro-input", sourceProperties)

    tEnv.executeSql(
      s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sourceHelper.options}")

    val sinkHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(outputCatalogHrn),
                               "volatile-layer-avro-output",
                               Map("olp.catalog.layer-schema" -> outputLayerSchema))

    tEnv.executeSql(
      s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sinkHelper.options}")

    tEnv.executeSql(
      """
INSERT INTO OutputTable
    SELECT
        'Berlin',
        event_timestamp,
        latitude,
        longitude
    FROM InputTable"""
    )
// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

// define sink properties
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputLayerSchema);

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

tEnv.executeSql(
    String.format(
        "CREATE TABLE OutputTable(" + "`city` STRING" + ") WITH %s", sinkHelper.options()));

tEnv.executeSql("INSERT INTO OutputTable SELECT 'Berlin' FROM InputTable");

寄木細工のデータの読み取りと書き込み

SQL の使用 :

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

    val inputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """

    val outputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "city", "type" : "string"},
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
    """

    val sourceProperties =
      Map(
        "olp.catalog.layer-schema" -> inputLayerSchema,
        "olp.layer.query" -> "mt_partition=in=(1,2,3)"
      )

    val sourceHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(inputCatalogHrn),
                               "volatile-layer-parquet-input",
                               sourceProperties)

    tEnv.executeSql(
      s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sourceHelper.options}")

    val sinkHelper: OlpStreamConnectorHelper =
      OlpStreamConnectorHelper(HRN(outputCatalogHrn),
                               "volatile-layer-parquet-output",
                               Map("olp.catalog.layer-schema" -> outputLayerSchema))

    tEnv.executeSql(
      s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
        s"WITH ${sinkHelper.options}")

    tEnv.executeSql(
      """
INSERT INTO OutputTable
    SELECT
        'Berlin',
        event_timestamp,
        latitude,
        longitude
    FROM InputTable"""
    )
String inputLayerSchema =
    "{\"type\" : \"record\", \"name\" : \"Event\", \"namespace\" : \"my.example\", \"fields\" : [ {\"name\" : \"event_timestamp\", \"type\" : \"long\"}, {\"name\" : \"latitude\", \"type\" : \"double\"}, {\"name\" : \"longitude\", \"type\" : \"double\"} ] }";
String outputLayerSchema =
    "{\"type\" : \"record\", \"name\" : \"Event\", \"namespace\" : \"my.example\", \"fields\" : [ {\"name\" : \"city\", \"type\" : \"string\"}, {\"name\" : \"event_timestamp\", \"type\" : \"long\"}, {\"name\" : \"latitude\", \"type\" : \"double\"}, {\"name\" : \"longitude\", \"type\" : \"double\"} ] }";

// define source properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.catalog.layer-schema", inputLayerSchema);
sourceProperties.put("olp.layer.query", "mt_partition=in=(1,2,3)");

/// [read-write-parquet-sql-java]
// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

// define sink properties
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputLayerSchema);

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();

tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.executeSql(
    "INSERT INTO OutputTable SELECT 'Berlin', event_timestamp, latitude, longitude, mt_partition FROM InputTable");

」に一致する結果は 件です

    」に一致する結果はありません