Flink Connector とストリームレイヤーの統合

ストリーム レイヤー用の表出力と表ソースを作成します

Flink Connector API の主なエントリポイントはです OlpStreamConnectorHelper

Scala
Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;

OlpStreamConnectorHelper Flink.table.api.Schema を作成して SQL ステートメントを構築するために使用されるのインスタンス。 次のコード スニペットで OlpStreamConnectorHelperは、 Flink.table.api.Schema のインスタンスを作成し、指定したスキーマおよびオプションを使用してテーブルを作成する方法を示します。

Scala
Java
// define the properties
val properties = Map(
  "olp.kafka.group-name" -> "protobuf-streaming-test",
  "olp.kafka.offset" -> "earliest"
)

// create the Table Connector Descriptor Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-streaming-layer", properties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")
OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);

Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

ソースファクトリでは、ストリームレイヤーについて次のプロパティがサポートされています。

  • olp.kafka.group-name: 必須。 Kafka Consumer 設定のグループ ID 設定を取得または作成するために使用します。
  • olp.kafka.offset: 「 earthly 」または「 latest 」のいずれかの値を取得できます。値を Kafka auto.offset.reset consumer 設定に変換します。

テーブルスキーマを作成する前 HRNに、ヘルパーは渡されたを使用してカタログ設定を取得します。 次に、渡されたのデータ形式とスキーマが存在するかどうかを確認 layerIdします。 最後のステップとして、 Flink Connector は自動的にレイヤースキーマを Flink.table.api.Schema に変換します。

次のセクションでは、スキーマ変換の仕組みについて説明します。

Table ソースと同じ方法でシンクを作成するには OlpStreamConnectorHelper、次のものを使用します。

Scala
Java
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), "protobuf-stream-in", Map.empty)

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")
OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

ストリームレイヤー Sinkに適用されるプロパティがありません Map 上記のコード スニペットでは空の値が渡されます。

データ形式

Flink Connector は、ストリーム レイヤーペイロードの次のデータ形式をサポートしています。

  • 未加工。 デコードおよびエンコードロジックは適用されず、データペイロードがバイトの配列として取得されます。 Table スキーマは次のように表示されます。

    root
      |-- data: Array[Byte]
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    ペイロードデータを含む列が呼び出さ dataれます。 メタデータ列はデータ列の後に mt_ あり、接頭辞が付いています。 メタデータ列のリストで mt_partition は、必須列のみが入力されています。 REST は任意 null で、これらの値として使用できます。

    この形式は、レイヤーのコンテンツタイプがとして設定されている場合に使用 application/octet-streamされます。

  • Protobuf 。 Flink は、添付された Protobuf スキーマ(レイヤー設定で指定したもの)を使用して Flink テーブルスキーマを派生します。

    root
      |-- protobuf_field_1: String
      |-- protobuf_field_2: String
      |-- probobuf_field_3.nested_column: Long
      |-- ...
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    Flink コネクタ Row は、最上位の Protobuf フィールドを最上位の列に配置し、メタデータ列を追跡します。

    この形式は、レイヤーのコンテンツタイプがとして設定され、 application/x-protobuf スキーマが指定されている場合に使用されます。 スキーマが指定されていない場合、エラーがスローされます。

    注 :

    Protobuf タイプ情報ベースのスキーマでは、自己参照 Flink フィールドを表す方法がないため、このフィールドはサポートされていません。

  • その他の形式

    レイヤーで前述の 2 つ以外の形式を使用している場合は、エラーがスローされます。

表のソースと出力のスキーマが同じレイヤーに設定されています。

標準の Flink API を使用して、いつでも Table スキーマを印刷できます。

Scala
Java
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema()
tEnv.from("OutputTable").printSchema();

生データの読み取りと書き込み

SQL の使用 :

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

val sourceProperties =
  Map(
    "olp.kafka.group-name" -> "raw-streaming-group",
    "olp.kafka.offset" -> "earliest",
    "olp.connector.metadata-columns" -> "true"
  )

val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "raw-stream-in", sourceProperties)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn),
                           "raw-stream-out",
                           Map("olp.connector.metadata-columns" -> "true"))

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

tEnv.executeSql(
  "INSERT INTO OutputTable SELECT data, mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable"
)
// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.connector.metadata-columns", "true");

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.executeSql(
    "INSERT INTO OutputTable SELECT data, mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");

Protobuf データの読み取りと書き込み

SQL の使用 :

Scala
Java
/// [create-table-source]
// define the properties
val properties = Map(
  "olp.kafka.group-name" -> "protobuf-streaming-test",
  "olp.kafka.offset" -> "earliest"
)

// create the Table Connector Descriptor Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-streaming-layer", properties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

/// [create-table-source]

/// [create-table-sink]
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), "protobuf-stream-in", Map.empty)

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")
/// [create-table-sink]

// we assume that the input and output tables have the same schema
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

/// [create-table-source-java]
OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);

Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
/// [create-table-source-java]

/// [create-table-sink-java]
OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));
/// [create-table-sink-java]

// we assume that the input and output tables have the same schema
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");

Protobuf データの読み取りと生データの書き込み

SDII スキーマを含む Probufif データから座標を抽出し、各座標のペアを個別のメッセージとして Raw 形式のストリーム レイヤーに書き込むことができます。 まず、座標をバイトの配列としてエンコードする方法を定義する必要があります。 次の簡単なアプローチを使用できます。

Scala
Java
/** Decodes an array of bytes of size 16 into an array of doubles of size 2 */
class DecodeCoordinates extends ScalarFunction {
  private def byteArrayToDouble(array: Array[Byte]): Double = ByteBuffer.wrap(array).getDouble

  def eval(bytes: Array[Byte]): Array[Double] =
    Array(byteArrayToDouble(bytes.take(8)), byteArrayToDouble(bytes.slice(8, 16)))

}

/** Encodes two Double values into a byte array of size 16 */
class EncodeCoordinates extends ScalarFunction {
  private def doubleToByteArray(value: Double): Array[Byte] = {
    val longBits = java.lang.Double.doubleToLongBits(value)
    ByteBuffer.allocate(8).putLong(longBits).array()
  }

  def eval(longitude: Double, latitude: Double): Array[Byte] =
    doubleToByteArray(longitude) ++ doubleToByteArray(latitude)

}
/** Encodes two Double values into a byte array of size 16 */
@SuppressWarnings("serial")
public class EncodeCoordinatesJava extends ScalarFunction {
  private byte[] doubleToByteArray(Double value) {
    long longBits = Double.doubleToLongBits(value);
    return ByteBuffer.allocate(8).putLong(longBits).array();
  }

  public byte[] eval(Double longitude, Double latitude) {
    byte[] result = Arrays.copyOf(doubleToByteArray(longitude), 16);
    System.arraycopy(doubleToByteArray(latitude), 0, result, 8, 8);
    return result;
  }
}
/** Decodes an array of bytes of size 16 into an array of doubles of size 2 */
@SuppressWarnings("serial")
public class DecodeCoordinatesJava extends ScalarFunction {
  private Double byteArrayToDouble(byte[] array) {
    return ByteBuffer.wrap(array).getDouble();
  }

  public Double[] eval(byte[] bytes) {
    byte[] firstDoubleArray = Arrays.copyOfRange(bytes, 0, 8);
    byte[] secondDoubleArray = Arrays.copyOfRange(bytes, 8, 16);
    return new Double[] {byteArrayToDouble(firstDoubleArray), byteArrayToDouble(secondDoubleArray)};
  }
}

次に、上記のエンコード関数を使用して、座標のペアをバイトの配列に変換し、その配列をペイロードとしてストリーム レイヤーに Raw 形式で書き込みます。

Scala
Java
val sourceProperties =
  Map(
    "olp.kafka.group-name" -> "my-consumer-group",
    "olp.kafka.offset" -> "latest",
    "olp.connector.metadata-columns" -> "true"
  )

val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-streaming-layer", sourceProperties)

val tEnv = StreamTableEnvironment.create(env)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn),
                           outputLayerId,
                           Map("olp.connector.metadata-columns" -> "true"))

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

tEnv.createTemporarySystemFunction("ENCODE_COORDINATES", new EncodeCoordinates)
tEnv.from("InputTable").printSchema()

tEnv.executeSql(
  """
    INSERT INTO
        OutputTable
    SELECT
        ENCODE_COORDINATES(longitude_deg, latitude_deg) as data,
        mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize
    FROM
        InputTable CROSS JOIN UNNEST(path.positionEstimate) AS p
  """
)
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.connector.metadata-columns", "true");

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.createTemporaryFunction("ENCODE_COORDINATES", EncodeCoordinatesJava.class);

tEnv.executeSql(
    "INSERT INTO "
        + "    OutputTable "
        + "SELECT "
        + "    ENCODE_COORDINATES(longitude_deg, latitude_deg) as data, "
        + "    mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize "
        + "FROM"
        + "    InputTable CROSS JOIN UNNEST(positionEstimate) AS p");

このデータを Raw ストリーム レイヤーから読み取る場合は、上記で説明したデコード機能を使用して、バイトの配列を座標のペアに変換できます。

Scala
Java
val properties = Map(
  GROUP_NAME_PROPERTY.key() -> "my-consumer-group",
  OFFSET_PROPERTY.key() -> "latest"
)

val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), inputLayerId, properties)

val tEnv = StreamTableEnvironment.create(env)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

tEnv.createTemporarySystemFunction("DECODE_COORDINATES", new DecodeCoordinates)

val result: Table = tEnv.sqlQuery(
  """
    SELECT
        coordinates[1] AS longitude,
        coordinates[2] as latitude
    FROM (SELECT DECODE_COORDINATES(data) AS coordinates FROM InputTable)"""
)

val stream = tEnv.toAppendStream[(Double, Double)](result)
stream.print()
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
tEnv.createTemporarySystemFunction("DECODE_COORDINATES", new DecodeCoordinatesJava());

tEnv.executeSql("SELECT * FROM InputTable").print();

Table result =
    tEnv.sqlQuery(
        "SELECT coordinates[1] AS longitude, "
            + "coordinates[2] as latitude "
            + "FROM (SELECT DECODE_COORDINATES(data) AS coordinates "
            + "FROM InputTable)");

TupleTypeInfo<Tuple2<Double, Double>> tupleType =
    new TupleTypeInfo<>(Types.DOUBLE, Types.DOUBLE);

DataStream<Tuple2<Double, Double>> stream = tEnv.toAppendStream(result, tupleType);
stream.print();

Kafka メッセージでのデータの実際のエンコード / デコード方法

mt_partition このフィールドは、メッセージキーとしてエンコード / デコードされます。 data 「 Protobuf データの読み取り」および「未加工データの書き込み」セクションで説明されているように、このフィールドはメッセージ値としてエンコード / デコードされます。 残りのメタデータ列は、メッセージヘッダーとしてエンコード / デコードされます。

application.conf の設定

application.conf この設定を使用して、コネクタタイプ( http-connector または kafka-connector )、および Kafka のコンシューマプロパティとプロデューサプロパティを指定できます。 http-Connector または Kafka-Connector の詳細については 、「ストリーミングコネクタの選択 : Kafka または HTTP

」に一致する結果は 件です

    」に一致する結果はありません