Flink アプリケーションを開発します

目的: シンプルな Flink アプリケーションを開発します。

複雑さ: 初心者向け

所要時間: 30 分

ソースコード: ダウンロード

このチュートリアルでは、ストリーム レイヤー からデータを読み取る簡単な Flink アプリケーションを開発、デバッグ、および実行する方法を示します。また、このすべてのデータを log4j を使用してログに記録します。

このチュートリアルでは、次のトピックについて説明します。

Maven プロジェクトを設定します

チュートリアルの最初にソースコードをダウンロードして任意のフォルダーに配置するか、プロジェクトのフォルダー構造を最初から作成します。

develop-flink-application
└── src
    └── main
        ├── java
        └── resources
        └── scala

この操作は、次の bash 1 つのコマンドで実行できます。

mkdir -p develop-flink-application/src/main/{java,resources,scala}

Maven POM ファイルは 、 Maven 設定の確認 の例のファイルと似ていますが、更新された親 POM と依存関係セクションがあります。

親 POM sdk-stream-bom_${scala.compat.version}は、 Flink 関連のライブラリを使用する必要があるためです。

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-stream-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

次の依存関係が使用されます。

  • com.here.platform.data.client:local-support_${scala.compat.version} ローカルデータカタログからデータを読み取る。
  • org.apache.flink:flink-clients_${scala.compat.version} ExecutorFactory Flink アプリケーションを提供します。
  • org.apache.flink:flink-streaming-java_${scala.compat.version} Java Flink アプリケーションを実行します。
  • org.apache.flink:flink-streaming-scala_${scala.compat.version} Scala Flink アプリケーションを実行します。
  • com.here.platform.data.client:flink-support_${scala.compat.version} プラットフォーム のデータカタログからデータを読み取る。
  • org.slf4j:slf4j-log4j12 アプリケーションの結果をコンソールおよびプラットフォーム の Splunk に記録します。
  • com.here.platform.pipeline:pipeline-interface_${scala.compat.version} から入力カタログに関する情報を取得 PipelineContextします。

依存関係 :

<dependencies>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>flink-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>local-support_2.12</artifactId>
        <exclusions>
            <exclusion>
                <groupId>com.here.platform.data.client</groupId>
                <artifactId>client-core_2.12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>

pom.xml 必要なすべての依存関係をファイルに追加したら、次のステップでアプリケーションのコードを記述して実行します。

ソースコードを入力します

前に説明したように、このチュートリアルでは、ストリーム レイヤー からデータを読み取り、コンソールにデータを出力する単純な Flink アプリケーションを記述する方法を示します。 OLP CLI を使用してストリーム アプリケーションを実行すると、データがストリーム レイヤー に追加されます。レイヤーから読み取られたすべてのデータは 、 log4j を使用して同時にコンソールに記録されます。 log4j の設定は src/main/resources/log4j.properties ファイルにあります。

この Flink アプリケーションの実装について見てみましょう。 以下のコード スニペット では、が FlinkQueryApi ストリーム レイヤー に追加された新しいデータを受け取るサブスクリプションの作成に使用されていることを確認できます。 サブスクリプションが作成され DataStreamSource 、ストリーム レイヤー から新しいデータを送信するソース関数が宣言されると SampleMaper 、拡張さ RichMapFunctionれたクラスを使用してパーティションをマップします。このクラスは、セットアップメソッドとティアダウンメソッドを提供します。 SampleMapper このクラスは、パーティションおよびログを人間が読める文字列でダウンロードします。

Java
Scala

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.flink.javadsl.FlinkReadEngine;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.settings.ConsumerSettings;
import com.here.platform.pipeline.PipelineContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DevelopFlinkApplication {

  private static final Logger LOGGER = LoggerFactory.getLogger(DevelopFlinkApplication.class);

  private static final String STREAMING_LAYER = "streaming-layer";
  private static final String CONSUMER_GROUP_NAME = "flink";
  private static HRN catalogHrn;

  public static void main(String[] args) throws Exception {

    // Create pipeline context
    PipelineContext pipelineContext = new PipelineContext();

    // Get input catalog hrn from the pipeline-config.conf file
    catalogHrn = pipelineContext.getConfig().getInputCatalogs().get("input-catalog");

    // Create queryApi for the source catalog
    final FlinkDataClient flinkDataClient = new FlinkDataClient();
    FlinkQueryApi queryApi = flinkDataClient.queryApi(catalogHrn);

    // Create the context in which a streaming program is executed
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Specify Kafka consumer group name
    ConsumerSettings consumerSettings =
        new ConsumerSettings.Builder()
            .withGroupName(CONSUMER_GROUP_NAME)
            .withLatestOffset()
            .build();

    // Subscription to receive new data added to the stream layer
    SourceFunction<Partition> subscriptionFunction =
        queryApi.subscribe(STREAMING_LAYER, consumerSettings);

    // Source function that emits new data from stream layer
    DataStreamSource<Partition> partitions = env.addSource(subscriptionFunction);

    // Read partition bytes, map to human-readable string
    // using the Sample Mapper class that implements RichMapFunction recommended to use by Data
    // Client Library,
    // and log using log4j
    partitions.map(new SampleMapper());

    // Trigger the program execution
    env.execute();
  }

  private static class SampleMapper extends RichMapFunction<Partition, String> {
    private transient FlinkDataClient flinkDataClient;
    private transient FlinkReadEngine flinkReadEngine;

    @Override
    public void open(Configuration parameters) {
      // Create read engine to read the partition bytes
      flinkDataClient = new FlinkDataClient();
      flinkReadEngine = flinkDataClient.readEngine(catalogHrn);
    }

    @Override
    public String map(Partition partition) {
      String partitionContent = new String(flinkReadEngine.getDataAsBytes(partition));
      LOGGER.info(partitionContent);
      return partitionContent;
    }

    @Override
    public void close() {
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and
      // terminated.
      flinkDataClient.terminate();
    }
  }
}

 

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN
import com.here.platform.data.client.flink.javadsl.{FlinkDataClient, FlinkReadEngine}
import com.here.platform.data.client.javadsl.Partition
import com.here.platform.data.client.settings.{ConsumerSettings, LatestOffset}
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.slf4j.LoggerFactory

import java.io.Serializable

object DevelopFlinkApplicationScala {

  private final val Logger = LoggerFactory.getLogger(classOf[DevelopFlinkApplication])

  private final val StreamingLayer = "streaming-layer"
  private final val ConsumerGroupName = "flink"

  def main(args: Array[String]): Unit = {

    // Create pipeline context
    val pipelineContext = new PipelineContext

    // Get input catalog hrn from the pipeline-config.conf file
    val catalogHrn = pipelineContext.getConfig.getInputCatalogs.get("input-catalog")

    // Create queryApi for the source catalog
    val client = new FlinkDataClient
    val queryApi = client.queryApi(catalogHrn)

    // Create the context in which a streaming program is executed
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Specify Kafka consumer group name
    val consumerSettings = ConsumerSettings(ConsumerGroupName, LatestOffset)

    // Subscription to receive new data added to the stream layer
    val subscriptionFunction = queryApi.subscribe(StreamingLayer, consumerSettings)

    // Source function that emits new data from stream layer
    val partitions = env.addSource(subscriptionFunction)

    // Read partition bytes, map to human-readable string
    // using the Sample Mapper class that implements RichMapFunction recommended to use by Data Client Library,
    // and log using log4j
    partitions.map(new SampleMapper(catalogHrn))

    // Trigger the program execution
    env.execute

  }

  class SampleMapper(hrn: HRN) extends RichMapFunction[Partition, String] with Serializable {
    @transient
    private lazy val flinkDataClient: FlinkDataClient = new FlinkDataClient()

    @transient
    private lazy val flinkReadEngine: FlinkReadEngine =
      // Create read engine to read the partition bytes
      flinkDataClient.readEngine(hrn)

    override def map(partition: Partition): String = {
      val partitionContent = new String(flinkReadEngine.getDataAsBytes(partition))
      Logger.info(partitionContent)
      partitionContent
    }

    override def close(): Unit =
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and terminated.
      flinkDataClient.terminate()
  }
}

コードが記述されると、リソースを準備してアプリケーションを実行できます。

アプリケーションを実行します

アプリケーションを実行するには、リソースを準備する必要があります。つまり、データが書き込まれるストリーム レイヤー を使用してカタログを作成します。

このチュートリアルでは、アプリケーションをローカルで実行します。したがって、ローカルカタログを作成するだけで十分です。 ローカルカタログを使用しているため、このチュートリアルを実行するための認証および外部ネットワークへのアクセスは必要ありません。 これらの名前はローカルマシンに含まれているため、レルム内での命名の競合の影響を受けません。また、任意の名前を使用できます。

ストリーム レイヤー を使用してローカルカタログを作成するに catalog-with-stream-layer.json は、チュートリアルの冒頭でソースコードとともにダウンロードした設定 ファイルが必要です。 ここには、 1 つの OLP CLI コマンドのみを使用して、レイヤーを含むカタログを作成できるようにする設定が含まれています。

olp local catalog create streaming-catalog streaming-catalog --config catalog-with-stream-layer.json

catalog-with-stream-layer.json ファイルの構造は次のとおりです。


{
  "id": "input-catalog",
  "name": "Develop Flink Application",
  "summary": "Catalog for Stream Data",
  "description": "The catalog containing the notification stream",
  "layers": [
    {
      "id": "streaming-layer",
      "name": "Notification Stream",
      "summary": "Stream for notification",
      "description": "Stream for notification",
      "layerType": "stream",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      },
      "contentType": "text/plain"
    }
  ]
}

レルムで請求タグが必要な場合 は、layerセクションにbillingTags: ["YOUR_BILLING_TAG"]プロパティを追加して設定 ファイルを更新します。

「 Maven プロジェクトを設定PipelineContextpipeline-config.conf 」の章で説明したように、はファイルから入力カタログ に関する情報を取得するために使用されます。 pipeline-config.conf ファイルの構造は次のとおりです。


pipeline.config {

  output-catalog {hrn = "OUTPUT_CATALOG_HRN"}

  input-catalogs {
    input-catalog {hrn = "INPUT_CATALOG_HRN"}
  }
}

チュートリアルでは出力カタログを使用しませんが、入力カタログ HERE リソースネームを使用して pipeline-config.conf ファイル内の OUTPUT_CATALOG_HRNINPUT_CATALOG_HRN の両方を置き換えることができます。

プレースホルダーを置き換えた後、次のコマンドを使用して、ダウンロードしたチュートリアルのルートからアプリケーションを実行します。

Java
Scala

mvn compile exec:java -D"exec.mainClass"="DevelopFlinkApplication" \
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local \
-Dspark.master=local[*] \
-Dpipeline-config.file=pipeline-config.conf


mvn compile exec:java -D"exec.mainClass"="DevelopFlinkApplicationScala"
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local \
-Dspark.master=local[*] \
-Dpipeline-config.file=pipeline-config.conf

このコマンドには、次のパラメータがあります。

  • exec.mainClass –アプリケーションを実行するためのエントリポイント。
  • here.platform.data-client.endpoint-locator.discovery-service-env=local –ローカルカタログのみを使用するようにデータ クライアント ライブラリ を設定します。

この時点では、アプリケーションは実行中ですが、入力カタログ にデータがありません。 いくつかのパーティションを入れてみましょう。 新しいコンソールウィンドウを開き、 First HERE Platform Flink Application 次の bash スクリプトを実行します。このスクリプトは、コンテンツのデータを 5 秒ごとに 10 回ストリーム レイヤー にアップロードします。

Bash

#!/usr/bin/env bash

# Copyright (c) 2018-2023 HERE Europe B.V.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.



# Local catalog hrn with stream layer
CATALOG_HRN=$1
# Folder with data that is uploaded to the stream layer
FOLDER_WITH_DATA="./src/main/resources/data"


# Upload data to the stream layer 10 times every 5 seconds
for i in {1..10}
do
  olp local catalog layer stream put ${CATALOG_HRN} streaming-layer --input "${FOLDER_WITH_DATA}"
  sleep 5
done

スクリプトを実行するには、次のコマンドを実行します。

bash scripts/populate-streaming-data.sh {{YOUR_CATALOG_HRN}}

スクリプトが実行されると、実行中のアプリケーションのコンソールに同じ情報のログが表示されます。

デバッガを接続します

この章では、IntelliJ アイデアの機能を使用して Flink アプリケーションをデバッグする方法について説明します。また、コンソールを使用してプログラムを実行する場合にデバッグを開始するプロセスにアタッチする方法についても学習します。

デバッガを設定するに MAVEN_OPTS は、変数を設定する必要があります。 この操作を行うには、実行中のアプリケーションを停止して、次のコマンドを実行します。

JDK 5-8
Java 9 以降
 
export MAVEN_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 

 
export MAVEN_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 

このオプションには、次のパラメータがあります。

  • address –デバッグに使用されるポート。 チュートリアルではポートを使用 5005しますが、任意の空きポートを使用できます。
  • server=y –受信したデバッガ接続 ( サーバーとして動作 ) をプロセスがリッスンするように指定します。
  • suspend=n –デバッガが接続されるまでプロセスが待機するように指定します。

次のコマンドを使用して、アプリケーションを実行できるようになりました。

Java
Scala
 mvn compile exec:java -D"exec.mainClass"="DevelopFlinkApplication"
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local  
 mvn compile exec:java -D"exec.mainClass"="DevelopFlinkApplicationScala"
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local 

Listening for transport dt_socket at address: 5005 ログに行が表示されていることを確認してください。

実行中のアプリケーションにアタッチする前に、たとえばパーティションがマップされている行でブレークポイントを設定しましょう。 この行から、ダウンロードしたパーティション についての役立つ情報を多数取得できます。

Run > を使用してプロセスにアタッチし Attach to Process 、指定 5050 したポートのプロセスを選択できるようになりました。

プロセスがアタッチされ、プログラムが実行され、ストリーム レイヤー にデータが表示されるのを待っている状態になったら、スクリプトを再度実行してデータを挿入する必要があります。

bash scripts/populate-streaming-data.sh {{YOUR_CATALOG_HRN}}

scripts/populate-streaming-data.sh スクリプトの実行後、カタログに新しいデータが表示され、アプリケーションがそのデータの読み取りを開始するとすぐに、デバッガがブレークポイントで停止する必要があります。 これで、コードをステップ実行して、変数およびスタックトレースの内容を検査できます。

IntelliJ アイデアでは、デバッガの代わりに標準の Java デバッガを使用することもできます。

Maven 原型を使用したプロジェクト生成

Maven の原型を使用して、 Flink アプリケーションの Maven プロジェクトをブートストラップできます。 この場合、次のタスクが自動的に完了するため、プロジェクトの設定が迅速化されます。

  • SDK BOM ファイルが含まれています。
  • プラットフォーム のファット JAR を生成する Maven プロファイルを作成します。

ストリーミングパイプライン オプションの詳細について は、「 SDK ワークフロー」を参照してください。

Flink アプリケーションプロジェクトを作成するには、 Java プロジェクトで次のコマンドを使用します。

Linux
mvn archetype:generate -DarchetypeGroupId=com.here.platform \
-DarchetypeArtifactId=streaming-java-archetype \
-DarchetypeVersion=1.0.892 \
-DgroupId=com.here.platform.tutorial \
-DartifactId=develop-flink-application \
-Dversion=1.0-SNAPSHOT \
-Dpackage=com.here.platform.tutorial
mvn archetype:generate -DarchetypeGroupId=com.here.platform ^
-DarchetypeArtifactId=streaming-java-archetype ^
-DarchetypeVersion=1.0.892 ^
-DgroupId=com.here.platform.tutorial ^
-DartifactId=develop-flink-application ^
-Dversion=1.0-SNAPSHOT ^
-Dpackage=com.here.platform.tutorial

Scala プロジェクトでは、次のコマンドを使用します。

Linux
mvn archetype:generate -DarchetypeGroupId=com.here.platform \
-DarchetypeArtifactId=streaming-scala-archetype \
-DarchetypeVersion=1.0.892 \
-DgroupId=com.here.platform.tutorial.scala \
-DartifactId=develop-flink-application \
-Dversion=1.0-SNAPSHOT \
-Dpackage=com.here.platform.tutorial
mvn archetype:generate -DarchetypeGroupId=com.here.platform ^
-DarchetypeArtifactId=streaming-scala-archetype ^
-DarchetypeVersion=1.0.892 ^
-DgroupId=com.here.platform.tutorial.scala ^
-DartifactId=develop-flink-application ^
-Dversion=1.0-SNAPSHOT ^
-Dpackage=com.here.platform.tutorial

ローカルで実行するプロジェクトをビルドします

プロジェクトをビルドするには、プロジェクトフォルダーで次のコマンドを実行します。

mvn install

プラットフォーム で実行するプロジェクトをビルドします

プラットフォーム でパイプライン を実行する fat jar には、まずをビルドする必要があります。 ビルドするには、次のコマンドを使用します。

mvn install -Pplatform

fat jarのビルドの詳細 について は、「プロジェクトに SDK を含める」を参照してください。

結論

このチュートリアルでは、 Flink プログラム開発の各段階について学習しました。 プラットフォームで Flink アプリケーションを実行する方法、および Splunk、Grafana、Flink UI、Platform Billing Page などの監視ツールについて理解するには、プラットフォーム チュートリアルの「Flink アプリケーションを実行する」を参照してください。

詳細情報

このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。

」に一致する結果は 件です

    」に一致する結果はありません