ストリーム処理のベストプラクティス

トピック

概要

Apache Flink は、パイプラインサービスがストリームデータ処理を実装するために使用します。 以下のセクションでは、ストリームを使用して HERE platform の Flink 処理パイプラインを作成する開発者のためのベストプラクティスについて説明します。

HERE platform SDK をインストールすると、 Flink v1.13.5 のランタイムライブラリもインストールされます。 これは、開発者のローカルマシンに Flink プロジェクトを設定する、提供されている Maven の原型を使用して行われます。 このプロジェクトには、 Apache からの正しい Flink 依存関係のダウンロードが含まれています。 開発者がプロジェクトをビルドすると 、すべてのプロジェクトクラスファイルおよびリソースがすべての依存関係とともに圧縮された Fat JAR ( 別名「 Uber JAR 」 ) が作成されます。 この Fat JAR には Flink ライブラリの依存関係が含まれています。 開発者 ファット JAR を展開する必要があります。これらの依存関係はパイプラインクラスタに事前展開されていないため、 Flink ライブラリの依存関係を削除することはできません。

次のデータライブラリは、 Flink で使用するように設計されています。

データ クライアント ライブラリ

データ クライアント ライブラリは、下位レベルのビルディングブロックと上位レベルの API の両方を使用して非同期 API 、ストリーミング、バックプレッシャ、並列処理のサポート、組み込みの再試行 / レジュームを提供するマルチモジュールのライブラリです。 および Akka はコネクターをストリーミングします。 ストリーム パイプライン( Flink )内でデータ クライアント ライブラリを使用するアプリケーションを実行するに flink-support は、このモジュールをプロジェクトへの依存関係として使用します。 他のデータ クライアント ライブラリモジュールをプロジェクトに追加しないでください。追加すると、ストリーム パイプラインの失敗につながる依存関係の競合が発生します。 詳細については 、『データ クライアント ライブラリ開発者ガイド』を参照してください。

Data Archiving ライブラリ(データアーカイブサービス)

このライブラリは、ストリーム レイヤーを介してインジェストされたメッセージのアーカイブを支援します。 ストリーム、 Avro 、 Parquet などのさまざまな形式で Protobuf データを作成およびアーカイブできるようにカスタマイズできる、データに依存しないワークフローを提供します。 詳細については 、 Data Archiving ライブラリ開発者ガイドを参照してください。

ロケーション ライブラリ

これは、道路ネットワークのナビゲート、道路属性へのアクセス、地球空間クエリーなど、位置ベースの分析のための一連のアルゴリズムです。 バッチパイプラインまたはストリームパイプラインで実行される場所ベースのプログラムを作成するために使用されます。 詳細については 、『ロケーション ライブラリ開発者ガイド』を参照してください。

センサーデータの取り込みプラットフォーム

ライブラリではありませんが、 Sensor Data Injestion プラットフォームは、さまざまなセンサーからデータメッセージを収集して検証するための多機能な Web サービスです。 プラットフォームの API を使用すると、センサーデータを含む POST メッセージをセンサーデータの取り込みプラットフォームに送信できます。 このデータはストリーム パイプラインで処理できます。 詳細について は、『 Sensor Data Inジ ェストインターフェース( SDII )データ仕様』を参照してください。

チェックポイントを有効にします

Flink (ストリーム)パイプラインの状態を復旧するには、 Flink チェックポイントが必要です。 有効にすると、 Flink は指定した間隔でパイプラインの整合性のあるスナップショット(または Flink 用語ではジョブグラフ)を取得します。 既定では、チェックポイントは無効になっています。 チェックポイントを有効にするには、StreamExecutionEnvironmentでメソッド enableCheckpointing(n)を使用します。n は、チェックポイントの間隔(ミリ秒単位)です。 詳細については 、チェックポイントの Apache Flink ドキュメントを参照してください。

ストリーム パイプラインの一時停止またはアップグレード時に Flink がセーブポイントを作成できるように、チェックポイントを有効にすることが重要です。 セーブポイントは関連するチェックポイントのコレクションであり、チェックポイントがない場合、セーブポイントは空になります。 また、各演算子に安定した UID を設定することも重要です。 Flink は、再起動時にこれらの UID を使用して、保存されている状態とオペレータを照合します。 演算子が一致しない場合、 Flink は状態を復元できません。 詳細 については、セーブポイントの Apache Flink ドキュメントを参照してください。

情報 : 追加コスト

基盤となるストレージからの読み取り / 書き込みのチェックポイントを利用する Flink パイプライン。 このトラフィックはパイプライン IO として請求されます。

チェックポイントの頻度

ステートフルデータをオペレータに保存するには、チェックポイントの作成時間を考慮することが重要です。 チェックポイントの動作方法の詳細について は、データストリーミングの耐障害性に関する Apache Flink のドキュメントを参照してください。

チェックポイント / セーブポイント機能を設定して、障害が発生した場合にパイプライン を再度アクティブ化するバックアップを選択できるようにすることもできます。

スナップショットからのストリーム パイプライン のリストア

Stream-5.0 以降のランタイムを使用するストリーム パイプラインは、以前に作成したスナップショット(または Flink のドキュメントのセーブポイント)からアクティブ化できます。 「アクティブ化」操作中に、スナップショット ID を提供して、スナップショットによってキャプチャされた状態からパイプライン を実行できます。

一時停止、アップグレード、および再起動操作中に、ストリーム パイプラインのスナップショットがシステムによって取得されます。 スナップショットのリストを取得して、目的のパイプライン バージョンのアクティブ化に使用するスナップショット ID を特定できます。 パイプライン API と CLI の両方が、スナップショットを一覧表示し、「アクティブ化」操作中にスナップショット ID を提供する機能を提供します。

メモ

ストリーム パイプラインではランタイムに Apache Flink が使用されるため、スナップショットは基本的に Flink のセーブポイントになります。

外部化されたチェックポイント機能

ストリーム 5.0 以降のパイプライン を開発している場合は、コード内で外部化されたチェックポイントを有効にして、保持するように設定できます。 ストリーム 5.0 以降のパイプライン バージョンをアクティブ化する際に、同じパイプライン の以前の実行で作成された外部化されたチェックポイントでアクティブ化を要求できます。 外部化されたチェックポイントを使用してパイプライン バージョンをアクティブ化するよう要求すると、システムはパイプライン を開始し、同じパイプライン で利用可能な最新のチェックポイントのチェックポイントの場所を提供します。

外部化されたチェックポイントを使用してパイプライン をアクティブ化するには、まず次のコードをパイプライン に追加して、外部化されたチェックポイントを作成して保持する必要があります。

// enable externalized checkpoints which are retained
// after job cancellation
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

詳細については 、『 Flink Checkpointing 』ドキュメントを参照してください。

パイプライン API と CLI の両方で、パイプライン をアクティブ化するときに、利用可能な最新のチェックポイントを使用することを示すフラグを指定できます。

スーパーバイザーおよび作業者の設定

パイプライン JAR ファイル が開発されると、内部パラメータによって各 Flink タスクの予想されるリソースレベルが設定されます。 このパラメータは パイプライン テンプレートで指定されたランタイム設定を使用して、そのテンプレートを使用するパイプラインバージョンで使用される処理リソースを決定する乗数として効果的に使用されます。 これにより、各パイプラインバージョンを独自のランタイム環境で最適化できます。

各ストリーム パイプラインは、次の 2 種類のプロセスを実行します。

  • スーパーバイザJobManagers とも呼ばれます)が分散実行を調整します。 スーパーバイザは常に 1 人です。
  • ワーカー (TaskManagers とも呼ば れます ) は、データフローのタスクを実行し、データストリームをバッファリングして交換します。 少なくとも 1 人の作業者がいる必要がありますが、 9999 未満である必要があります。

パイプライン テンプレートでは、スーパーバイザユニット数、ワーカーユニット数、およびワーカー数を指定する必要があります。 これらのパラメータは、既定のランタイムリソース割り当てを決定するために使用されますが、パイプラインバージョンの作成時に指定された値で上書きされることがあります。 これらの値の意味の詳細 については、パイプラインユーザー ガイドの記事「割り当てと制限」を参照してください。

テンプレートは、プラットフォームポータル、 CLI 、または API レベルで作成できます。 まず、 OLP CLI ガイドの [ テンプレート] の下にあるドキュメントを参照してください。 CLI コマンド pipeline-template createでは、これらのランタイムパラメータがオプションで一覧表示されます。 API リファレンス では、 PipelineTemplate パラメータのデフォルトのクラスタ設定に含まれている情報と同じ情報が参照できます。 詳細について createPipelineTemplateは、演算子の API リファレンス エントリを参照してください。

並列処理を設定しています

Flink のドキュメントから :

「 Flink プログラムは、複数のタスク ( 変換、演算子、データソース、および出力 ) で構成されています。 タスクは複数の並列インスタンスに分割されて実行され、各並列インスタンスがタスクの入力データのサブセットを処理します。 タスクの並列インスタンスの数は、並列処理と呼ばれます。

タスクの並列処理は、オペレータレベルまたはランタイム環境レベルで設定できます。

クラスタの設定と並列処理

パイプラインコードで設定された最大並列処理をサポートするように、 Flink パイプラインに十分なクラスタリソースを構成します。 同時に、パイプラインコードで設定できる最大並列処理数は 、パイプラインクラスタ内のタスクスロットの合計数です。 Flink パイプラインクラスタがサポートできる最大の並列処理は 、ワーカー パラメータによって制御されます。 デフォルトでは、 HERE platform では 、各ワーカー が取得できる タスクスロットは 1 つだけです。 ワーカーごとのタスクスロット は 、 CLI を介してストリーム設定で設定できます。

メモ

TaskManager とスロットの詳細について は、 Distributed ランタイム環境の Apache Flink のマニュアルを参照してください。

Flink パイプラインの例を次に示します。

  • パイプラインには、 3 つの演算子があります。 1 つはソース、 1 つのマップ、もう 1 つはシンク演算子で、それぞれ並列処理数は 3 、 8 、 3 です。
  • ストリーム パイプラインコードで設定されている最大の並列処理数は 8 です。

この例では、最大並列処理数 8 をサポートするに は、パイプラインクラスタに 8 つ以上のタスクスロットが必要です。 これを行うに は、workersタスクスロットの組み合わせを 8 の値に指定します。たとえば、 4 workersを 2 つのタスクスロットに設定するか、 8 workersを設定して 1 つのワーカーに 1 つのタスクスロットを割り当てます。

このパラメータ workerUnitsは、各ワーカーに割り当てられているリソースを指定します。 各メモリ workerUnits には、 1 CPU 、 7 GB の RAM 、 8 GB のディスク容量があります。 TaskManager あたりの並列処理数とリソース数を増やして、それぞれのタスクマネージャのリソース量を大幅に削減することをお勧めします。 workerUnits1 人の作業者に割り当てることができる最大数 は 15 です。 また、ワーカーあたりのタスクスロット数を、そのワーカーに割り当てられている最大 CPU 数に制限することも推奨されます。

ストリームの設定

ストリーム設定では、特定の Flink 設定プロパティを設定できます。 たとえば、 ワーカーごとに 2 つのタスクスロットを設定するようにtaskmanager.numberOfTaskSlots=2を設定します。 ストリーム設定でサポートされている Flink 設定プロパティの一覧を次に示します。


      taskmanager.numberOfTaskSlots,
      cluster.evenly-spread-out-slots,
      taskmanager.heap.size,
      taskmanager.memory.flink.size,
      taskmanager.memory.jvm-metaspace.size,
      taskmanager.memory.framework.heap.size,
      taskmanager.memory.task.heap.size,
      taskmanager.memory.managed.size,
      taskmanager.memory.managed.fraction,
      taskmanager.memory.framework.off-heap.size,
      taskmanager.memory.task.off-heap.size,
      taskmanager.memory.network.min,
      taskmanager.memory.network.max,
      taskmanager.memory.network.fraction,
      taskmanager.memory.jvm-overhead.min,
      taskmanager.memory.jvm-overhead.max,
      taskmanager.memory.jvm-overhead.fraction,
      statefun.message.serializer,
      classloader.parent-first-patterns.additional,
      statefun.feedback.memory.size,
      akka.framesize,
      akka.ask.timeout,
      akka.lookup.timeout,
      akka.tcp.timeout,
      akka.client.timeout,
      heartbeat.interval,
      heartbeat.timeout,
      state.storage.fs.memory-threshold,
      jobmanager.memory.heap.size,
      jobmanager.memory.off-heap.size,
      jobmanager.memory.jvm-metaspace.size,
      jobmanager.memory.jvm-overhead.min,
      jobmanager.memory.jvm-overhead.max,
      jobmanager.memory.jvm-overhead.fraction

詳細について は、 Apache Flink の設定および状態機能の設定に関するドキュメントを参照してください。 パイプライン テンプレートでは、スーパーバイザユニット数、ワーカーユニット数、およびワーカー数を指定する必要があります。 これらのパラメータは、既定のランタイムリソース割り当てを決定するために使用されますが、パイプラインバージョンの作成時に指定された値で上書きされることがあります。 これらの値の意味の詳細 については、「割り当て量と制限」を参照してください。

テンプレートは、プラットフォームポータル、 CLI 、または API レベルで作成できます。 まず、 OLP CLI ガイドの [ テンプレート] の下にあるドキュメントを参照してください。 CLI コマンド pipeline-template createでは、これらのランタイムパラメータがオプションで一覧表示されます。 API リファレンス では、 PipelineTemplate パラメータのデフォルトのクラスタ設定に含まれている情報と同じ情報が参照できます。 詳細について createPipelineTemplateは、演算子の API リファレンス エントリを参照してください。 タスクスロットの値の設定については ' パイプラインの CLI コマンドを参照してください

ストリームパイプラインの通知と復旧を有効化します

計画的な停止では、ストリーム パイプラインに影響が出る可能性があります。 データ処理で予期せぬ中断が発生しないようにするには、パイプラインに関連付けられている電子メールアドレスを更新して、インシデントおよび予期されるユーザー操作についての明確な詳細情報を含む計画的な停止通知を送信することをお勧めします。次に例を示します。

  • インシデントの概要
  • レルム
  • パイプライン ID
  • パイプライン名
  • パイプラインバージョン ID
  • サポート指示が記載されたリクエスト済みアクション
  • 期限日時
  • 要求されたアクションが実行されなかった場合に、 HERE platform によって実行されるシステム操作

要求されたアクションが期限日時までに実行されなかった場合、停止の電子メールに一覧表示されているシステム操作が実行されます。 影響を受けるストリーム パイプラインでシステム操作が開始されると、次の詳細情報を含む 2 通目の電子メールが送信されます。

  • レルム
  • パイプライン ID
  • パイプライン名
  • パイプラインバージョン ID
  • インシデントの概要
  • システム操作の日時
  • システム操作が実行されています

システム操作中に、現在の状態を保存して、保存済みの状態で新しいジョブを開始しようとします。 要求されたアクションが期限までに実行された場合、システム操作はドロップされます。 状態は、チェックポイントを使用するストリームパイプラインについてのみ復旧されます。 それ以外の場合、要求されたアクションが期限までに実行されなかった場合、パイプラインジョブはキャンセルされ、パイプラインが再度アクティブ化されます。 そのため、ストリームパイプラインでは、チェックポインティングを使用することをお勧めします。

注 : セーブポイントを完了する時間

セーブポイントを取得しようとしている間、パイプラインがセーブポイントを完了するまで 120 秒待ちます。 パイプラインがその時間内にセーブポイントを完了できない場合は、キャンセルしてからパイプラインをアクティブ化します。

影響を受けるストリーム パイプラインに電子メールアドレスが関連付けられていない場合、通知は送信されず、計画された停止通知で説明されているようにシステム操作が実行されます。

戦略を再起動します

既定で使用される再起動方法は、 " 再起動なし " です。 別の再起動方法を使用するには、パイプラインコードで再起動方法を実装する必要があります。 たとえば、次のコードでは、「失敗率」の再起動ストラテジが設定されます。


        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
            3, // max failures per interval
            Time.of(60, TimeUnit.MINUTES), //time interval for measuring failure rate
            Time.of(30, TimeUnit.SECONDS) // delay
        ));

注意

このコード例では、パフォーマンスの要求は行われません。 これは、技術を説明することのみを目的としています。

Flink は、さまざまな再起動方法をサポートしてい Flink の再起動方法の詳細について は、再起動方法に関する Apache Flink のドキュメントを参照してください。

ジョブマネージャーは、すべての Flink パイプラインのスケジュール設定とリソース管理を調整します。 デフォルトでは、パイプラインの計算クラスタごとに 1 つのジョブマネージャーが作成されます。 これにより、単一の障害点が作成されます。ジョブマネージャーがクラッシュすると、実行中のパイプラインが失敗します。

注 : ジョブマネージャーの高可用性設定

  • Stream-2.0.0 (非推奨) パイプラインの場合は、 1 つのジョブマネージャーが使用されます。 ズーキーパを使用してジョブマネージャーの状態を保存し、エラーから復元します。 Flink 1.7 のバグにより、 Stream-2.0.0 で複数のジョブマネージャーを使用できませんでした。
  • Stream-3.0.0 、ストリーム 4.0 、およびストリーム 5.0 のパイプラインでは、 2 つのジョブマネージャが使用されます。 1 つはアクティブ、もう 1 つはスタンバイです。 これらの複数のジョブマネージャーは、リーダーの選出とパイプラインの状態を調整するズーキーパーを介して管理されます。 プライマリジョブマネージャがクラッシュした場合、スタンバイジョブマネージャが迅速に引き継ぎ、パイプラインの実行を続行します。 また、障害が発生したプライマリジョブマネージャが再起動され、新しいスタンバイジョブマネージャになり、高可用性を再確立して将来の障害から保護します。

高可用性を有効化するオプションは、アクティブ化、再開、およびアップグレード操作中に使用できます。

注意 : 追加コスト

Flink ジョブマネージャーの高可用性オプションを使用すると、ストリーム パイプラインの実行コストが増加します。 ストリームパイプラインのジョブマネージャーを高可用性で実行するには、次のリソースが必要です。

  • Zookeeper のリソース : 1.5 CPU および 1.5 GB の RAM
  • 追加のジョブマネージャーのリソース ( プライマリのジョブマネージャーと同じ )

これらの追加リソースのコストは、パイプラインの元のコストに追加されます。

Flink ジョブマネージャーの高可用性の詳細 については、 Apache Flink のドキュメントを参照してください。

Flink のドキュメントには、開発者に役立つ推奨事項を一覧表示した実稼働準備チェックリストが含まれています。 簡単に言うと、次の内容が含まれています。

  • 演算子の最大並列処理構成パラメータ
  • 状態の保存と復元をより適切に行うために、オペレータの UUID を設定します
  • 状態バックエンドを選択します。これは、チェックポイント設定に影響を与える可能性があります
  • JobManager 高可用性 (HA) の設定

すべてのチェックリストは 、 Apache Flink のドキュメントに記載されています。

複数のパイプラインを使用しています

複数のストリームパイプラインと 1 つのデータソースを使用する場合、 2 つの使用例があります。 ただし、これらのパイプラインがどのように設定されているかに注意して、目的の結果を得る必要があります。

情報

Apache Kafka は、ストリームサービスによってデータパイプラインメッセージングシステムとして使用され、ストリームのリアルタイム処理を可能にします。 これにより、複数のパイプラインで共通の入力カタログデータソースを使用でき ます。このデータソースでは、 Kafka がキューイングモデルまたはパブリッシュサブスクライブモデルとして機能します。詳細については、「メッセージングシステムとしての Kafka 」を参照してください。

ユースケース 1 : 共有処理中です

ユースケース 1- 共有処理
図 1. 共有処理中です

このユースケースでは、単一のストリーミングデータソースの処理負荷を共有する複数のパイプラインを使用することをお勧めします。 各パイプラインは、 Kafka ストリームプロセッサによってルーティングされたデータを処理するので、各パイプラインは入力カタログの一部を処理し、データストリームをレイヤー化するだけで済みます。 これにより、各パイプラインがオーバーオール入力データの一部のみを処理する必要があるため、新しいストリームデータの処理を高速化できます。

ユースケース 2 : 共有データソース

ユースケース 2- 共有データソース
図 2. 共有データソース

このユースケースでは、複数のパイプラインが同じソースデータに異なるデータ処理ワークフローを適用します。 HERE platform ストリームサービスでは、パイプラインが同じストリーミングデータソースを使用していることを除き、パイプラインは無関係と見なされます。 各パイプラインは、入力カタログデータストリーム全体を消費します。

各ユースケースは HERE platform で実行できますが、設定はユースケースごとに異なります。

両方の使用例の主要な設定パラメータは 、グループ ID とKafka Consumer Group IDです。 グループ ID は通常、展開時に割り当てられ、テンプレートに含まれているパラメータの 1 つです。 すべてのパイプラインバージョンには、関連付けられたグループ ID があります。 グループ ID のみが異なる複数のパイプラインバージョンを作成できます。 Kafka クラスタをデータコンシューマ(パイプライン)に関連付けるために、 Kafka コンシューマグループ ID が自動的に割り当てられます。 Kafka Consumer Group ID はプログラムによって上書きできますが、予期しない動作を引き起こす可能性があるため、この ID は適していません。

設定の詳細

ここには、 2 つのストリームパイプラインと 1 つの入力データカタログがあります。

設定シナリオの例を示します
図 3. 可能な設定

シナリオ 1 : 共有処理 - 両方のパイプラインのグループ ID が同じで、 Kafka Consumer Group ID が上書きされていません。

シナリオ 2 : 共有データソース - 各パイプラインには異なるグループ ID があり、 Kafka 消費者グループ ID が上書きされていません。

シナリオ 3 : 共有データソース - 両方のパイプラインが同じグループ ID を持ち、 Kafka 消費者グループ ID がパイプラインごとに上書きされます。 お勧めできません。

一意のメートル法の演算子命名を使用します

以前のバージョンの Flink では、関連するタスク名に基づいて、メトリック演算子に自動的に名前が付けられていました。 しかし、このシステムには障害が発生しやすいことが判明しました。 それぞれのに保存 streamConfigされている明示的に指定されたメトリック演算子に置き換えられました。 ただし 、重複したメトリックが報告されないような名前の競合を避けるために、メトリック演算子名を一意にする必要があります

operatorName また、メトリック名のコンポーネントを 80 文字を超えないように制限することをお勧めします。 これにより、ログにレポートされるメトリック名の幅が狭くなりません。 また、名前が長すぎると、 Graphite Reporter によってメトリックが無視される既知のリスクも回避されます。

オペレータ ID と命名の詳細 については、セーブポイントの Apache Flink のドキュメントを参照してください。

一意のアプリケーション ID を使用します

情報 : アプリケーション ID

すべてのパイプラインアプリケーションを HERE platform に登録し、 アプリケーション ID を受け取る必要があります。 これは承認目的で使用 credentials.properties され、通常はアプリケーションの登録時に作成されたファイルからアクセスされます。 例について は、「設定ファイルリファレンス」を参照してください。

同じグループ ID をアプリケーション ID 、レイヤー ID 、およびカタログ ID の特定の組み合わせに使用すると、問題が発生する可能性があります したがって、同じアプリケーション ID を共有し、同じストリーミングレイヤーを使用する 2 つのパイプラインを実行する場合、指定されたメッセージは 1 つのパイプラインのみによって消費されます。 これは、各パイプラインが分離して動作し、すべての着信メッセージを消費すると予想される場合に問題になることがあります。

この問題を回避するには、各パイプラインに異なるグループ( HERE アカウントグループ)を作成して、各パイプラインが一意のアプリケーション ID を使用するようにすることをお勧めします。 この方法を使用すると、ストリームレイヤーからデータを消費するパイプラインが、必要に応じてすべてのメッセージを消費することが保証されます。 パイプライン間でアプリケーション ID を共有すると、すべてのパイプラインがストリームからのすべてのメッセージを消費するのではなく、パイプラインごとにデータの一部消費の問題が発生します。 詳細については 、データ クライアント ライブラリのマニュアルを参照してください。

入力と出力に同じカタログを使用します

バッチパイプラインとは異なり、ストリームパイプラインでは入力と出力に同じカタログを使用できます。 ただし、カタログを有効にするには、ストリームレイヤーでカタログを設定する必要があります。

Flink ランタイムライブラリは、 HERE Platform Pipeline に自動的に含まれます。 ネイティブ の原型 を使用して、新しいストリーム パイプラインを作成するための事前設定されたプロジェクト構造を提供できます。 詳細については 、 Archetypes のドキュメントを参照してください。

」に一致する結果は 件です

    」に一致する結果はありません