Flink Connector 移行ガイド( Flink 1.10 -> 1.13 )

Flink Connector の Flink バージョンを 1.10 から 1.13 にアップグレードした後、 API にいくつかの変更があります。

依存関係

"Flink-table-planner" の代わりに、次の 2 つの新しい依存関係を追加します。"Flink-clients" および "Flink-table-planner-bink"

変更前 :

Maven
SBT
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.10.3</version>
        <scope>provided</scope>
    </dependency>
"org.apache.flink" %% "flink-table-planner" % "1.10.3" % "provided"

現在 :

Maven
SBT
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
"org.apache.flink" %% "flink-table-planner" % "1.13.5" % "provided"

アップグレードインポート

Flink Connector API の新しいバージョンのメインエントリポイントの OlpStreamConnectorHelper場合、ではなくです

Scala
Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnection
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorDescriptorFactory
import com.here.platform.data.client.flink.javadsl.OlpStreamConnectorDescriptorFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnection;

インポートする必要があります

Scala
Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;

テーブルの作成

以前のバージョンの Flink Connector では、OlpStreamConnectionのインスタンスを作成 し、 OlpStreamConnection テーブルスキーマによって提供されたConnectorDescriptorを使用して接続し、一時テーブルを作成しました。

Scala
Java
val streamSource: OlpStreamConnection =
    OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), inputLayerId)
    .createConnectorDescriptorWithSchema(sourceProperties)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(HRN.fromString(inputCatalogHrn), inputLayerId)
    .createConnectorDescriptorWithSchema(sourceProperties);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

In Flink 1.13 メソッド connect は廃止されたため、 Flink Connector は新しい API を提供します。

Scala
Java
val helper = OlpStreamConnectorHelper(
    HRN(catalogHrn),
    layerId,
    properties
)

val schema = helper.prebuiltSchema(tEnv).build()

tEnv.executeSql(s"CREATE TABLE OutputTable $schema WITH ${helper.options}")
OlpStreamConnectorHelper helper = OlpStreamConnectorHelper.create(
    HRN.fromString(catalogHrn),
    layerId,
    properties
);

Schema schema = helper.prebuiltSchema(tEnv).build();

tEnv.executeSql(String.format("CREATE TABLE InputTable %s WITH %s", schema, helper.options()));

HERE では、まず カタログ HERE リソースネーム 、レイヤー ID 、およびプロパティを使用してOlpStreamConnectorHelperを作成します 。次に、スキーマを使用して TENV.executeSql を呼び出し、OlpStreamConnectorHelperプロパティで書式設定して、flink.table.api.Schemaテーブルを作成します。 ソーステーブルとシンクテーブルの作成方法は同じです。

SQL

メソッド sqlUpdate は廃止されまし executeSql たが、同じ方法で使用できます。 その代わりに、次のこと

Scala
Java
tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM InputTable")
tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM InputTable");

を使用できます

Scala
Java
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");

」に一致する結果は 件です

    」に一致する結果はありません