Flink アプリケーションを開発します
目的: シンプルな Flink アプリケーションを開発します。
複雑さ: 初心者向け
所要時間: 30 分
ソースコード: ダウンロード
このチュートリアルでは、ストリーム レイヤー からデータを読み取る簡単な Flink アプリケーションを開発、デバッグ、および実行する方法を示します。また、このすべてのデータを log4j を使用してログに記録します。
このチュートリアルでは、次のトピックについて説明します。
Maven プロジェクトを設定します
チュートリアルの最初にソースコードをダウンロードして任意のフォルダーに配置するか、プロジェクトのフォルダー構造を最初から作成します。
develop-flink-application
└── src
└── main
├── java
└── resources
└── scala
この操作は、次の bash
1 つのコマンドで実行できます。
mkdir -p develop-flink-application/src/main/{java,resources,scala}
Maven POM ファイルは 、 Maven 設定の確認 の例のファイルと似ていますが、更新された親 POM と依存関係セクションがあります。
親 POM sdk-stream-bom_${scala.compat.version}
は、 Flink 関連のライブラリを使用する必要があるためです。
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-stream-bom_2.12</artifactId>
<version>2.54.3</version>
<relativePath/>
</parent>
次の依存関係が使用されます。
-
com.here.platform.data.client:local-support_${scala.compat.version}
ローカルデータカタログからデータを読み取る。 -
org.apache.flink:flink-clients_${scala.compat.version}
ExecutorFactory
Flink アプリケーションを提供します。 -
org.apache.flink:flink-streaming-java_${scala.compat.version}
Java Flink アプリケーションを実行します。 -
org.apache.flink:flink-streaming-scala_${scala.compat.version}
Scala Flink アプリケーションを実行します。 -
com.here.platform.data.client:flink-support_${scala.compat.version}
プラットフォーム のデータカタログからデータを読み取る。 -
org.slf4j:slf4j-log4j12
アプリケーションの結果をコンソールおよびプラットフォーム の Splunk に記録します。 -
com.here.platform.pipeline:pipeline-interface_${scala.compat.version}
から入力カタログに関する情報を取得 PipelineContext
します。
依存関係 :
<dependencies>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>flink-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>local-support_2.12</artifactId>
<exclusions>
<exclusion>
<groupId>com.here.platform.data.client</groupId>
<artifactId>client-core_2.12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>
pom.xml
必要なすべての依存関係をファイルに追加したら、次のステップでアプリケーションのコードを記述して実行します。
ソースコードを入力します
前に説明したように、このチュートリアルでは、ストリーム レイヤー からデータを読み取り、コンソールにデータを出力する単純な Flink アプリケーションを記述する方法を示します。 OLP CLI を使用してストリーム アプリケーションを実行すると、データがストリーム レイヤー に追加されます。レイヤーから読み取られたすべてのデータは 、 log4j を使用して同時にコンソールに記録されます。 log4j の設定は src/main/resources/log4j.properties
ファイルにあります。
この Flink アプリケーションの実装について見てみましょう。 以下のコード スニペット では、が FlinkQueryApi
ストリーム レイヤー に追加された新しいデータを受け取るサブスクリプションの作成に使用されていることを確認できます。 サブスクリプションが作成され DataStreamSource
、ストリーム レイヤー から新しいデータを送信するソース関数が宣言されると SampleMaper
、拡張さ RichMapFunction
れたクラスを使用してパーティションをマップします。このクラスは、セットアップメソッドとティアダウンメソッドを提供します。 SampleMapper
このクラスは、パーティションおよびログを人間が読める文字列でダウンロードします。
import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.flink.javadsl.FlinkReadEngine;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.settings.ConsumerSettings;
import com.here.platform.pipeline.PipelineContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DevelopFlinkApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(DevelopFlinkApplication.class);
private static final String STREAMING_LAYER = "streaming-layer";
private static final String CONSUMER_GROUP_NAME = "flink";
private static HRN catalogHrn;
public static void main(String[] args) throws Exception {
PipelineContext pipelineContext = new PipelineContext();
catalogHrn = pipelineContext.getConfig().getInputCatalogs().get("input-catalog");
final FlinkDataClient flinkDataClient = new FlinkDataClient();
FlinkQueryApi queryApi = flinkDataClient.queryApi(catalogHrn);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ConsumerSettings consumerSettings =
new ConsumerSettings.Builder()
.withGroupName(CONSUMER_GROUP_NAME)
.withLatestOffset()
.build();
SourceFunction<Partition> subscriptionFunction =
queryApi.subscribe(STREAMING_LAYER, consumerSettings);
DataStreamSource<Partition> partitions = env.addSource(subscriptionFunction);
partitions.map(new SampleMapper());
env.execute();
}
private static class SampleMapper extends RichMapFunction<Partition, String> {
private transient FlinkDataClient flinkDataClient;
private transient FlinkReadEngine flinkReadEngine;
@Override
public void open(Configuration parameters) {
flinkDataClient = new FlinkDataClient();
flinkReadEngine = flinkDataClient.readEngine(catalogHrn);
}
@Override
public String map(Partition partition) {
String partitionContent = new String(flinkReadEngine.getDataAsBytes(partition));
LOGGER.info(partitionContent);
return partitionContent;
}
@Override
public void close() {
flinkDataClient.terminate();
}
}
}
import com.here.hrn.HRN
import com.here.platform.data.client.flink.javadsl.{FlinkDataClient, FlinkReadEngine}
import com.here.platform.data.client.javadsl.Partition
import com.here.platform.data.client.settings.{ConsumerSettings, LatestOffset}
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.slf4j.LoggerFactory
import java.io.Serializable
object DevelopFlinkApplicationScala {
private final val Logger = LoggerFactory.getLogger(classOf[DevelopFlinkApplication])
private final val StreamingLayer = "streaming-layer"
private final val ConsumerGroupName = "flink"
def main(args: Array[String]): Unit = {
val pipelineContext = new PipelineContext
val catalogHrn = pipelineContext.getConfig.getInputCatalogs.get("input-catalog")
val client = new FlinkDataClient
val queryApi = client.queryApi(catalogHrn)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val consumerSettings = ConsumerSettings(ConsumerGroupName, LatestOffset)
val subscriptionFunction = queryApi.subscribe(StreamingLayer, consumerSettings)
val partitions = env.addSource(subscriptionFunction)
partitions.map(new SampleMapper(catalogHrn))
env.execute
}
class SampleMapper(hrn: HRN) extends RichMapFunction[Partition, String] with Serializable {
@transient
private lazy val flinkDataClient: FlinkDataClient = new FlinkDataClient()
@transient
private lazy val flinkReadEngine: FlinkReadEngine =
flinkDataClient.readEngine(hrn)
override def map(partition: Partition): String = {
val partitionContent = new String(flinkReadEngine.getDataAsBytes(partition))
Logger.info(partitionContent)
partitionContent
}
override def close(): Unit =
flinkDataClient.terminate()
}
}
コードが記述されると、リソースを準備してアプリケーションを実行できます。
アプリケーションを実行します
アプリケーションを実行するには、リソースを準備する必要があります。つまり、データが書き込まれるストリーム レイヤー を使用してカタログを作成します。
このチュートリアルでは、アプリケーションをローカルで実行します。したがって、ローカルカタログを作成するだけで十分です。 ローカルカタログを使用しているため、このチュートリアルを実行するための認証および外部ネットワークへのアクセスは必要ありません。 これらの名前はローカルマシンに含まれているため、レルム内での命名の競合の影響を受けません。また、任意の名前を使用できます。
ストリーム レイヤー を使用してローカルカタログを作成するに catalog-with-stream-layer.json
は、チュートリアルの冒頭でソースコードとともにダウンロードした設定 ファイルが必要です。 ここには、 1 つの OLP CLI コマンドのみを使用して、レイヤーを含むカタログを作成できるようにする設定が含まれています。
olp local catalog create streaming-catalog streaming-catalog --config catalog-with-stream-layer.json
catalog-with-stream-layer.json
ファイルの構造は次のとおりです。
{
"id": "input-catalog",
"name": "Develop Flink Application",
"summary": "Catalog for Stream Data",
"description": "The catalog containing the notification stream",
"layers": [
{
"id": "streaming-layer",
"name": "Notification Stream",
"summary": "Stream for notification",
"description": "Stream for notification",
"layerType": "stream",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
},
"contentType": "text/plain"
}
]
}
注
レルムで請求タグが必要な場合 は、layer
セクションにbillingTags: ["YOUR_BILLING_TAG"]
プロパティを追加して設定 ファイルを更新します。
「 Maven プロジェクトを設定PipelineContext
pipeline-config.conf
」の章で説明したように、はファイルから入力カタログ に関する情報を取得するために使用されます。 pipeline-config.conf
ファイルの構造は次のとおりです。
pipeline.config {
output-catalog {hrn = "OUTPUT_CATALOG_HRN"}
input-catalogs {
input-catalog {hrn = "INPUT_CATALOG_HRN"}
}
}
チュートリアルでは出力カタログを使用しませんが、入力カタログ HERE リソースネームを使用して pipeline-config.conf
ファイル内の OUTPUT_CATALOG_HRN
と INPUT_CATALOG_HRN
の両方を置き換えることができます。
プレースホルダーを置き換えた後、次のコマンドを使用して、ダウンロードしたチュートリアルのルートからアプリケーションを実行します。
mvn compile exec:java -D"exec.mainClass"="DevelopFlinkApplication" \
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local \
-Dspark.master=local[*] \
-Dpipeline-config.file=pipeline-config.conf
mvn compile exec:java -D"exec.mainClass"="DevelopFlinkApplicationScala"
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local \
-Dspark.master=local[*] \
-Dpipeline-config.file=pipeline-config.conf
このコマンドには、次のパラメータがあります。
-
exec.mainClass
–アプリケーションを実行するためのエントリポイント。 -
here.platform.data-client.endpoint-locator.discovery-service-env=local
–ローカルカタログのみを使用するようにデータ クライアント ライブラリ を設定します。
この時点では、アプリケーションは実行中ですが、入力カタログ にデータがありません。 いくつかのパーティションを入れてみましょう。 新しいコンソールウィンドウを開き、 First HERE Platform Flink Application
次の bash スクリプトを実行します。このスクリプトは、コンテンツのデータを 5 秒ごとに 10 回ストリーム レイヤー にアップロードします。
CATALOG_HRN=$1
FOLDER_WITH_DATA="./src/main/resources/data"
for i in {1..10}
do
olp local catalog layer stream put ${CATALOG_HRN} streaming-layer --input "${FOLDER_WITH_DATA}"
sleep 5
done
スクリプトを実行するには、次のコマンドを実行します。
bash scripts/populate-streaming-data.sh {{YOUR_CATALOG_HRN}}
スクリプトが実行されると、実行中のアプリケーションのコンソールに同じ情報のログが表示されます。
デバッガを接続します
この章では、IntelliJ アイデアの機能を使用して Flink アプリケーションをデバッグする方法について説明します。また、コンソールを使用してプログラムを実行する場合にデバッグを開始するプロセスにアタッチする方法についても学習します。
デバッガを設定するに MAVEN_OPTS
は、変数を設定する必要があります。 この操作を行うには、実行中のアプリケーションを停止して、次のコマンドを実行します。
export MAVEN_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
export MAVEN_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
このオプションには、次のパラメータがあります。
-
address
–デバッグに使用されるポート。 チュートリアルではポートを使用 5005
しますが、任意の空きポートを使用できます。 -
server=y
–受信したデバッガ接続 ( サーバーとして動作 ) をプロセスがリッスンするように指定します。 -
suspend=n
–デバッガが接続されるまでプロセスが待機するように指定します。
次のコマンドを使用して、アプリケーションを実行できるようになりました。
mvn compile exec:java -D"exec.mainClass"="DevelopFlinkApplication"
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local
mvn compile exec:java -D"exec.mainClass"="DevelopFlinkApplicationScala"
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local
Listening for transport dt_socket at address: 5005
ログに行が表示されていることを確認してください。
実行中のアプリケーションにアタッチする前に、たとえばパーティションがマップされている行でブレークポイントを設定しましょう。 この行から、ダウンロードしたパーティション についての役立つ情報を多数取得できます。
Run
> を使用してプロセスにアタッチし Attach to Process
、指定 5050
したポートのプロセスを選択できるようになりました。
プロセスがアタッチされ、プログラムが実行され、ストリーム レイヤー にデータが表示されるのを待っている状態になったら、スクリプトを再度実行してデータを挿入する必要があります。
bash scripts/populate-streaming-data.sh {{YOUR_CATALOG_HRN}}
scripts/populate-streaming-data.sh
スクリプトの実行後、カタログに新しいデータが表示され、アプリケーションがそのデータの読み取りを開始するとすぐに、デバッガがブレークポイントで停止する必要があります。 これで、コードをステップ実行して、変数およびスタックトレースの内容を検査できます。
IntelliJ アイデアでは、デバッガの代わりに標準の Java デバッガを使用することもできます。
Maven 原型を使用したプロジェクト生成
Maven の原型を使用して、 Flink アプリケーションの Maven プロジェクトをブートストラップできます。 この場合、次のタスクが自動的に完了するため、プロジェクトの設定が迅速化されます。
- SDK BOM ファイルが含まれています。
- プラットフォーム のファット JAR を生成する Maven プロファイルを作成します。
ストリーミングパイプライン オプションの詳細について は、「 SDK ワークフロー」を参照してください。
Flink アプリケーションプロジェクトを作成するには、 Java プロジェクトで次のコマンドを使用します。
mvn archetype:generate -DarchetypeGroupId=com.here.platform \
-DarchetypeArtifactId=streaming-java-archetype \
-DarchetypeVersion=1.0.892 \
-DgroupId=com.here.platform.tutorial \
-DartifactId=develop-flink-application \
-Dversion=1.0-SNAPSHOT \
-Dpackage=com.here.platform.tutorial
mvn archetype:generate -DarchetypeGroupId=com.here.platform ^
-DarchetypeArtifactId=streaming-java-archetype ^
-DarchetypeVersion=1.0.892 ^
-DgroupId=com.here.platform.tutorial ^
-DartifactId=develop-flink-application ^
-Dversion=1.0-SNAPSHOT ^
-Dpackage=com.here.platform.tutorial
Scala プロジェクトでは、次のコマンドを使用します。
mvn archetype:generate -DarchetypeGroupId=com.here.platform \
-DarchetypeArtifactId=streaming-scala-archetype \
-DarchetypeVersion=1.0.892 \
-DgroupId=com.here.platform.tutorial.scala \
-DartifactId=develop-flink-application \
-Dversion=1.0-SNAPSHOT \
-Dpackage=com.here.platform.tutorial
mvn archetype:generate -DarchetypeGroupId=com.here.platform ^
-DarchetypeArtifactId=streaming-scala-archetype ^
-DarchetypeVersion=1.0.892 ^
-DgroupId=com.here.platform.tutorial.scala ^
-DartifactId=develop-flink-application ^
-Dversion=1.0-SNAPSHOT ^
-Dpackage=com.here.platform.tutorial
ローカルで実行するプロジェクトをビルドします
プロジェクトをビルドするには、プロジェクトフォルダーで次のコマンドを実行します。
mvn install
プラットフォーム でパイプライン を実行する fat jar
には、まずをビルドする必要があります。 ビルドするには、次のコマンドを使用します。
mvn install -Pplatform
fat jar
のビルドの詳細 について は、「プロジェクトに SDK を含める」を参照してください。
結論
このチュートリアルでは、 Flink プログラム開発の各段階について学習しました。 プラットフォームで Flink アプリケーションを実行する方法、および Splunk、Grafana、Flink UI、Platform Billing Page などの監視ツールについて理解するには、プラットフォーム チュートリアルの「Flink アプリケーションを実行する」を参照してください。
このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。