トラブルシューティング

内容

パイプライン

Spark


パイプラインのトラブルシューティング

質問 : ロギング URL が作成される前に失敗したパイプラインを調査する方法を教えてください。

A : HERE platform ポータルにログインし 、 [ ツール ] 、 [ 監視と警告] の順にクリックします。 「 request_id 」というテキストを使用して Splunk ログを検索します。 複数のログがある場合は、実行のおおよその時間でフィルタリングします。 このシナリオは、今後のリリースでより適切に処理されるようになります。

質問 : CLI には一部のパイプラインが表示されますが、プラットフォームポータルには表示されません。また、プラットフォームポータルには表示されますが、 CLI には表示されません。なぜですか。

A : パイプラインは、パイプラインの作成時に指定されたグループにのみ表示されます。 CLI クライアントはクライアント資格情報を使用し、プラットフォームポータルはユーザー資格情報を使用します。 クライアントの資格情報とユーザーの資格情報には、同じグループにアクセスする権限が必要です。

質問 : パイプラインがをスローする理由 [DatastreamSource] fetchMessages request failed with invalid offset error

A : このエラーには、次の 2 つの既知の原因があります。

  • パイプラインが、いずれかの入力ストリームの保持期間よりも長い間一時停止されました。
  • パイプラインは、入力ストリームがデータを受信しているよりもデータの処理に時間がかかります。 パイプラインによって処理されているデータは、最終的には保持期間のしきい値を超えるとストリーム レイヤーによってドロップされます。

質問 : データ処理パイプラインをローカルで実行しようとしたときにマスター URL 例外が発生した場合、どのような意味がありますか ?

例 :

A master URL must be set in your configuration at org.apache.spark.SparkContext.<init>(SparkContext.scala:379) at com.here.platform.data.processing.driver.DriverRunner$class.com$here$platform$data$processing$driver$DriverRunner$$newSparkContext
...

A : エラーが発生したのは、ネイティブビルドの実行引数に単純な欠落があることです。

次のものを aven コマンド ラインに追加してください。 -Dexec.args=--master local[*]

例 :

mvn exec:java -Dexec.cleanupDaemonThreads=false
-Dexec.mainClass=com.here.platform.examples.location.batch.Main
"-Dexec.args=--master local[*]"
-Dpipeline-config.file=config/pipeline-config.conf
-Dpipeline-job.file=config/pipeline-job.conf

質問 : パイプラインバージョンを作成するとき JsonParsingException にエラーが発生することがあります。 どうすればよいですか ?

原因 : これは、パイプラインメッセージコールで発生する可能性がある断続的なエラーです。

解決策 : この問題の再現と切り分けは困難でした。 ただし、このエラーメッセージにもかかわらず、パイプラインバージョン ID の応答が失われても、コマンドは意図したとおりに動作します。 このエラーが表示された場合は、次の CLI コマンドを使用して、パイプラインバージョンが正常に作成されたことを確認し、パイプラインバージョン ID を取得できます。

pipeline.py pipeline-version list <pipeline-id>

質問 : パイプライン JAR ファイル に資格情報を含める方法を教えてください。

A : セキュリティ上の理由から、パイプライン JAR ファイル に資格情報を追加することはお勧めしません。 プラットフォームは、ユーザーに代わってパイプラインの資格情報を管理します。 グループおよび権限の設定についての詳細 は、『 ID およびアクセス管理ガイド』を参照してください。

質問 : パイプラインへのアクセスを制限する方法を教えてください。

A : これは「グループ」を介して達成できます。 パイプライン API では、パイプラインの作成中にグループを指定できます。 そのグループに属するユーザーはパイプラインにアクセスできますが、そのグループ外のユーザーはパイプラインにアクセスできません。 アカウントのグループ設定の詳細について は、『 ID およびアクセス管理ガイド』を参照してください。

質問 : パイプライン JAR ファイル の展開に失敗する理由

A : パイプラインの展開に失敗する最も一般的な理由は、次のとおりです。

  • パイプラインの展開に必要な「権限」または「資格情報」がありません。
  • パイプライン JAR ファイル のファイルサイズが 500MB を超えているため、展開するには大きすぎます。
  • ご利用のパイプライン JAR ファイル のファイル名は最大 200 文字を超えているため、処理できません。
  • パイプラインが利用できないか、または利用可能なすべてのリソースをコミットしました。 システム管理者に連絡して問題を解決してください。
  • POST トランザクションが 50 分以内に完了しない場合、リモートホストによって接続が閉じられ、エラーが返されます。

質問 : パイプラインエラーが発生する直前にログに記録されたイベントを見つける方法を教えてください。

A : ログファイルを使用して、イベントの履歴全体を参照できます。 まず、失敗イベントのログエントリを探します。 次に、前のログエントリを確認して、エラーが発生する前に発生した内容を確認します。 これは 3 つのステップから成るプロセスです。

  1. Splunk でログを参照する場合 は、i列のログエントリを展開します。

    Splunk ログ表示のスクリーンキャプチャ。
    図 1. Splunk ログの表示
  2. 下にスクロールし Event Actions て、ドロップダウンメニューを探して開きます。 「ソースを表示」を選択します。

    イベントアクションのドロップダウンメニューのスクリーンキャプチャ。
    図 2. [ イベントアクション ] ドロップダウンメニュー
  3. ログソースが開き、エラーイベントの前に発生したイベントが表示されます。

    障害イベントに関する Splunk ログエントリのスクリーンキャプチャ。
    図 3. 失敗イベントに関するエントリをログに記録します。

質問 : パイプラインの実行中にこのエラーメッセージが表示された場合のパイプラインの修正方法 : java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.recordStats()Lcom/google/common/cache/CacheBuilder;

A : com.google.common パッケージの再配置を追加して プラグイン 設定をシェードします (FAT JAR ファイル をビルドします ) 。 例 :

<project>
...
  <profiles>
    <profile>
      <id>platform</id>
      <build>
        <pluginManagement>
          <plugins>
            <plugin>
              <artifactId>maven-shade-plugin</artifactId>
              <executions>
                <execution>
                  <configuration>
                    <relocations combine.children="append">
                      <relocation>
                        <pattern>com.google.common</pattern>
                        <shadedPattern>${project.groupId}.shaded.com.google.common</shadedPattern>
                        <!-- WORKAROUND: until pipeline provided guava gets in-sync with environment pom -->
                      </relocation>
                    </relocations>
                  </configuration>
                </execution>
              </executions>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </profile>
  </profiles>
...
</project>

質問 : パイプラインバージョンで使用されている入力カタログを変更する方法を教えてください。

A : パイプラインバージョンに関連付けられている入力カタログを直接変更する方法はありませんが、同じ結果を得る方法があります。 これを行うには、 1 つ以上の指定された入力カタログを除き、同じテンプレートおよび設定値を使用する新しいパイプラインバージョンでパイプラインバージョンをアップグレードする必要があります。 たとえば、次の手順で CLI を使用できます。

  1. 同じパイプライン テンプレートおよび pipeline-config.conf 新しい入力カタログを指定する新しいファイルを使用して、新しいパイプラインバージョンを作成します。
  2. パイプラインアップグレード手順を使用 して、既存のパイプラインバージョンを、新しい入力カタログをターゲットとする新しいパイプラインバージョンに置き換えます。

質問 : エラーコード MSG1000 の意味を教えてください。また、解決できますか ?

A : Spark または Flink クラスタが新しいジョブを初期化するまで待機するタイムアウトロジックがあります。 Spark または Flink クラスタが予定された時間枠(現在は 1 時間)内に初期化されていない場合 、パイプラインジョブは失敗としてマークされ、そのリソースが削除されます。

この問題の一般的な原因は、 Flink 内に、指定した数のワーカー、 CPU などを使用して Spark またはプラットフォームクラスタを作成するためのリソースが不足していることです。

救済

短期的な問題に対する解決策 :

  • このパイプラインバージョンに相当量のリソースが設定されている場合は、クラスタが正常に開始される可能性を高めるために、作業者数および要求された合計 CPU 数を約半分に削減してください。
  • それ以外の場合、プラットフォームは 5 分後に自動的にジョブの再実行を試みます。
  • ただし、これが 1 回だけ実行されるように設定されたバッチ パイプラインバージョンの場合、パイプラインは、プラットフォームバージョンが明示的にユーザーによって再アクティブ化されていない限り、障害発生後に自動的に再実行を試行しません。

問題が解決しない場合は、サポートチケットを提出してください。

質問 : エラーコード MSG2000 の意味

A : Spark または Flink のパイプラインの実行が開始されるまで待機するタイムアウトロジックがあります。 このタイムアウトは、 Spark または Flink クラスタが正常に作成された後に発生します。

予定されている時間枠(現在は 3 分)内にジョブの実行が開始されない場合 、パイプラインジョブは失敗としてマークされ、そのリソースが削除されます。

詳細については、 Splunk ログを確認するか ( つまり、パイプラインバージョンの loggingUrl を使用して ) 、ログレベル をデバッグに設定してパイプラインを実行 して詳細を取得してください。

対策 :

"MSG2000" というメッセージは、 Spark ジョブがパイプラインの Spark クラスタに送信されなかったためにタイムアウトになったことを示します。 これは、さまざまな理由で発生する可能性があります。 ただし、一般的な原因の 1 つは、ユーザーが Spark masterプロパティを、local[*] 表示されている HERE のようにハードコードしたことです。

SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
context = new JavaSparkContext(conf);

これにより、コードがプラットフォームによって設定されたマスター設定を上書きし、 Spark クラスタリソースを利用しなくなります。 パイプライン管理がジョブのステータスを監視できないため、タイムアウト後にエラーが発生します。

この種の設定ミスには、いくつかの症状があります。

  • タイムアウト時間が経過する と、パイプラインに障害が発生したことが報告され、 MSG2000 エラーが報告されます。
  • Splunk にログインしているすべてのログが「 source=driver 」の下に表示され、「 source=executor 」の下にはログが表示されません。
  • Splunk ログを参照すると、タスクが実行されていることを示すログメッセージが表示されるため( 情報 またはデバッグのログレベルが必要)、 Spark ジョブが実行されているようです。
  • パイプラインは、出力カタログでデータを生成することもあります。
  • すべての実行がドライバーで実行されるため、 JVM がをスローすること OutOfMemoryErrorがあります。

local[*] ローカルでのテストを目的としてマスター設定をに設定することは一般的ですが、コードをプラットフォームに展開する場合は、この設定を無効にする必要があります。 以下に示す方法の 1 つです。

SparkConf conf = new SparkConf();
  if (!conf.contains("spark.master")) {
    LOGGER.warn("No master set, using local[*]");
    conf.setMaster("local[*]");
  }
  context = new JavaSparkContext(conf);

これにより、spark-submitmaster 提供していない場合にのみ、 Spark が local[*]にフォールバックされます。

質問 : エラーコード MSG3000 の意味を教えてください。

A : この問題は、 Spark ジョブにのみ適用されます。 これは、 Spark コンテキストが閉じられているが、 Spark ドライバーが終了時にハングアップするときに発生する可能性があるタイムアウト ( 現在 5 分 ) に基づいています。

この問題が発生した場合、 Spark ドライバーからのリターンコードはまだ利用できません。その理由は次のとおりです。

  • Spark ドライバーが追加処理を行っているか、または
  • Spark コンテキストが閉じられた後のクリーンアップ、または
  • JVM が終了しないように、ドライバーに追加のスレッドが作成されている可能性があります。 詳細については 、 https://docs.oracle.com/javase/7/docs/api/java/lang/Thread.htmlを参照してください。

メモ

プラットフォームでジョブが「失敗」とマークされていますが、 Spark ジョブは実際には正常に完了している可能性があります。

対策 :

  • 出口ロジックを確認します。 たとえば、 Spark コンテキストを閉じた後に、無限ループまたは長時間の実行ループがないかを確認します。
  • カスタムスレッドが固着していないか確認してください。 正しく廃棄されていることを確認してください。
  • Spark ジョブコードから Spark コンテキストが明示的に閉じる操作を削除します。この操作は終了時に自動的に終了します。

質問 : エラーコード MSG4000 の意味を教えてください。

A : この問題は、 Flink ジョブにのみ適用されます。 障害が発生した場合、実行中の Flink ジョブがまず失敗状態に切り替わり、次に失敗状態に切り替わります。 予想された時間枠(現在 20 分)内に失敗から失敗に切り替えられなかっ た場合、パイプラインジョブはこのメッセージで失敗とマークされ、そのリソースが削除されます。

対策 :

詳細については、 Splunk ログを確認するか ( つまり、パイプラインバージョンの loggingUrl を使用して ) 、ログレベル をデバッグに設定してパイプラインを実行 して詳細を取得してください。

質問 : エラーコード MSG5000 の意味

A : MSG5000 エラーは、パイプラインのアカウンティングおよび監視メトリクスシステムの初期化に関する問題を示しています。 その結果、請求データを含むメトリクスの収集とレポートに失敗します。 この初期化では、タイムアウトが 2 分になります。 システムがその期間内に初期化されなかった場合、パイプラインは、監視データおよび請求データを収集せずに無期限に実行されないようにするために失敗したと報告されます。

アカウンティングおよび監視の初期化は、パイプラインジョブ自体の起動に影響を与えません。 そのため、パイプライン自体が正常に開始され、データの処理が開始される可能性があります。 バッチパイプラインの場合、タイムアウトになる前にパイプラインジョブが正常に完了することがあります。

対策 :

これはプラットフォームインフラストラクチャのエラーです。 ほとんどの場合、パイプラインを再起動するだけで十分です。

  • ストリームおよび未スケジュールのバッチ パイプライン: パイプラインを再度アクティブ化します
  • Scheduled バッチ パイプライン(スケジュール済みスケジュール):パイプラインはスケジュールの次のイテレーションで再起動されます。

問題が解決しない場合は、 HERE サポートチームに連絡してください。

質問 : エラーコード MSG6000 の意味を教えてください。

A : MSG6000 のエラーにより、新しいレルムのプロビジョニングに関する問題が強調表示されます。 新しいレルムがプロビジョニングされた瞬間と、そのレルムでパイプラインを実行できるタイミングの間に遅延が発生します。 この遅延時間は 1 時間を超えることはできません。

対策 :

ほとんどの場合、しばらくしてからパイプラインを再起動するだけで十分です。 1 時間待っても問題が解決しない場合は、 HERE サポートチームに連絡してください。

質問 : HERE platform で実行されているパイプラインからサードパーティのサービスを利用する適切な方法は何ですか。

A : パイプラインは、パイプラインからの HTTPS コールを使用して、サードパーティサービスにアクセスできます。 ただし、サードパーティのサービスは、 HERE プラットフォームのパイプラインコンポーネントのいずれにもアクセスできません。 サードパーティのサービスに接続するには 、 Connecting Pipeline to Third-party Services を参照してください

質問 : ブロックの取得に失敗したか、またはノードへの接続に失敗したために、パイプラインで繰り返しエラーが発生します。 問題なく、小規模なジョブが実行されました。 原因

A : ワーカー ( ノード ) が失われたために、大量のデータがあるジョブで発生することがあります。 OutOfMemory 例外が一般的な原因です。 クラスタが小さすぎるか、 JVM に割り当てられているメモリに問題がある可能性があります。 OutOfMemory ログファイルにメッセージがない場合は、 Grafana を使用して JVM メトリクスを確認し、ワーカーが削除されたときに実際にメモリ不足になっていないかどうかを確認します。

↑ページトップへ


Spark のトラブルシューティング

質問 : Spark UI を使用する理由

A : Spark フレームワークには、 Running その状態のすべての Spark ジョブでアクティブな Web コンソールが含まれています。 これは Spark UI と呼ば れ、プラットフォーム内から直接アクセスできます。 Spark UI では、ジョブ、ステージ、実行グラフ、実行者からのログなど、バッチ パイプラインの処理に関する情報を提供します。 データ プロセッシング ライブラリコンポーネントは、 読み取るメタデータパーティションの数、ダウンロードまたはアップロードされたデータのバイト数など、さまざまな統計情報(「 Spark AccumulatorV2 」を参照)も公開します。 このデータは、操作が実行された段階で参照できます。

ローカルに実行されたパイプラインの場合、ドライバーはドライバープロセスの一部として UI Web サーバーを起動します。 ドライバーの実行中 http://127.0.0.1:4040/jobsに、開発者はから Web サーバーにアクセスできます。 に PipelineRunner は、 --no-quit 開発者が Enter キーを押してから最終コミット後に終了するまで待機する便利なオプションがあります。

プラットフォームで実行されているバッチパイプラインの場合は、 CLI または Web ポータルを使用して、パイプラインジョブの詳細から Spark UI にアクセスできます。 プラットフォームポータルで Open Spark UI は、ジョブがデータの処理を開始するとリンクが表示されます。 実行中のジョブの Spark UI に移動します。

batch-2.1.0 ランタイム環境から、パイプラインジョブの実行が完了した後で Spark UI にアクセスすることもできます。 完了したジョブのランタイムデータには、完了後 30 日間、 Sparkk UI からアクセスできます。 この期間が経過 Open Spark UI すると、ジョブのランタイムデータが削除され、リンクは Web ポータルで利用できなくなります。

Spark UI でのトラブルシューティングの詳細について は、「 Spark UI 」を参照してください。

質問 : 以前に完了したパイプライン ジョブの Spark UI がロードされないのはなぜですか ?

A : パイプライン は、その実行過程でイベントログデータを生成します。このデータは、このイベント後に Spark UI を生成するために使用されます。 イベントログデータの量が約 1.5 GB を超えると、 Spark UI が正常にロードされないことがあります。

Spark は、実行された各タスクについて「開始」および「終了」イベントを生成するため、イベントログのサイズはタスクの数に直接比例します。 ステージ数もイベント数に影響しません。 最後に、実行ステップあたりのタスク数は、処理する必要があるパーティションの数に比例します。

その結果、パイプライン のパーティション数とステージ数が増加すると、タスク数が増加し、その結果、イベントログのサイズが大きくなります。

データセットが大きいほど、パーティションの数が多くなることが予想されます。 これは通常、よい練習です。 ただし、データのパーティション分割は過剰に行うことができます。 過剰なパーティション分割は、実際には処理時間とドライバーのメモリの問題の増加につながります。 同時に、パーティション分割不足によってメモリの問題が発生する可能性があります。 正しいバランスをとることは難しい。

パイプラインの特定のデータまたは処理ロジックを把握していない場合、パーティション数を減らしたときに改善があるかどうかを確認することをお勧めします。 パーティションを何らかの要因で削減すると、履歴サーバーの機能範囲でイベントログのサイズを取得できる場合があります。 正しい要因は使用事例によって異なります。

質問 : が表示される理由 Task Not Serializable Exception

A : 「タスクがシリアル化できません」例外は、特に複雑なクラス階層を使用する場合に、 Spark 開発で最も一般的に使用されます。 関数が Spark のラムダ関数として実行されると、その関数が参照するすべての変数 ( 終了 ) が作業者にシリアル化されます。 ほとんどの場合、最も簡単な修正は、クラスまたはインラインではなくオブジェクトで関数を宣言し、必要なすべての状態情報をパラメーターとして関数に渡すことです。

lambda 関数が、キャッシュなどのシリアライズできない状態を必要とする場合、共通のパターンは オブジェクト内の遅延値であり、最初にアクセスしたときにすべてのワーカーで初期化されます。 参照@transient によってシリアル化されないように、 val もマークする必要があります。

パフォーマンス上の理由 から、データ プロセッシング ライブラリでは Kryo シリアル化フレームワークが頻繁に使用されます。 このフレームワークは、 Spark が RDD に存在するオブジェクトをシリアライズおよびデシリアライズするために使用します。 これには、パーティションキーやメタデータなどの広く使用されている概念に加えて、コンパイラーパターンで T で識別された開発者が使用するカスタムタイプも含まれます。 また、 RDD ベースのパターンでは、開発者は任意のカスタムタイプを自由に導入し、そのようなタイプの DDs を宣言して使用できます。

処理中のライブラリは、アプリケーションで使用されているすべてのカスタムタイプを把握できませんが、 Kryo フレームワークにはこの情報が必要です。 そのため、開発者はカスタムの CryoRegistrator を提供する必要があります。

例 :


    package com.mycompany.myproject

    class MyKryoRegistrator extends com.here.platform.data.processing.spark.KryoRegistrator {

        override def userClasses: Seq[Class[_]] = Seq(
        classOf[MyClass1],
        classOf[MyClass2]
        )
    }

クラスの名前は、 application.conf を使用してライブラリ設定に指定する必要があります。


    spark {
        kryo.registrationRequired = true
        kryo.registrator = "com.mycompany.myproject.MyKryoRegistrator"
    }

↑ページトップへ


質問 : Flink ホームを使用する理由

A : Flink フレームワークには、実行中のすべての Flink ジョブで使用できる Web インターフェイスが含まれています。 これは Flink ホームと呼ばれ、プラットフォーム内から直接アクセスできます。 Flink ホームは、ストリーム パイプラインクラスタの概要や、メトリクス、チェックポイント、バックプレッシャーなどのジョブの詳細を含む、実行中の Flink に関する情報を提供します Flink ホームには、プラットフォームポータルのパイプラインジョブリスト表示から直接アクセスできます。

Flink ホームのトラブルシューティングの詳細については 、 Flink ホームを参照してください。

質問 : データの読み取りおよび書き込みにおける致命的なエラーの処理方法

A : パイプラインバージョンがストリーミングレイヤーに長期間( 1 週間以上など)書き込みを行うと、発信 HTTPS 接続でまれな TLS ハンドシェイクの失敗が発生することがあります。 これは 、ストリーミング Flink パイプライン内でデータを公開するために FlinkWriteEngine::publish使用されているSinkFunctionを返すことによってトリガーされます。 sink 関数の呼び出し中に例外がスローされた場合、その例外は自動的に処理され、ログに記録されます。 それ以外の場合 、データをデータクライアント経由で Flink シンクに送信する際に致命的な例外が発生します。 致命的な例外 が発生すると、パイプラインバージョンでエラーが発生し、状態が RUNNINGからFAILEDに変更されます。

解決策 :

この問題は SinkFunction 、致命的な例外をキャッチしてログに記録するようにパイプラインコードのを拡張することで解決されます。 パイプラインバージョンは引き続きメッセージの実行と処理を行います。 次のサンプルコードは、この新機能を示し SinkFunctionています。

import com.here.cvs.ss.hrs.vss.logger.TraceLogger;
import com.here.platform.data.client.model.PendingPartition;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class VSSFlinkSinkFunction implements SinkFunction<PendingPartition> {

    private static final long serialVersionUID = 6118402368186172504L;

    private static final TraceLogger TRACE_LOGGER = new TraceLogger(VSSFlinkSinkFunction.class);

    private final SinkFunction<PendingPartition> writeEngineSinkFunction;

    public VSSFlinkSinkFunction(SinkFunction<PendingPartition> writeEngineSinkFunction) {
        this.writeEngineSinkFunction = writeEngineSinkFunction;
    }

    @Override
    public void invoke(PendingPartition pendingPartition) {
        try {
            TRACE_LOGGER.setTraceId(pendingPartition.getPartition());
            writeEngineSinkFunction.invoke(pendingPartition);
            TRACE_LOGGER.info("SEND_MESSAGE_SUCCESSFUL");
        } catch (Exception exception) {
            TRACE_LOGGER.error("SEND_MESSAGE_FAILED | cause=" + exception, exception);
        }
    }
}

質問 : Flink でアキュムレータまたはカウンタを使用できますか ?

A : はい。 詳細について は、「アキュムレータとカウンタ」を参照してください。

質問 : Flink パイプラインの実行可能ファイル JAR ファイル 内で指定されたログは、 Splunk に表示されません。

A : これは Flink の既知の問題です。 この問題を回避するには、メインメソッドの最初の行として次の行を追加して、標準の出力およびエラーを System.out および System.err にリダイレクトします。

System.setOut(new PrintStream(new FileOutputStream(FileDescriptor.out)));
System.setErr(new PrintStream(new FileOutputStream(FileDescriptor.err)));

詳細については、次のバグを参照してください。 Flink-15504.

質問 : 「 JAR ファイル does not exist 」というメッセージが表示されてパイプラインに失敗しましたが、テンプレートは正常に作成されました。

A : このエラーメッセージは、 JAR ファイル に、制限のないメモリ使用量につながるエラーが含まれている場合に表示されます。 このメッセージは、内部 Flink エラーであり、上書きできないため、問題の根本原因を示していません。 ローカルの Flink インスタンスで JAR ファイル をテストしてください。

質問 : TaskManager のディスク容量が不足しているため、ストリーム パイプラインでエラーが発生しました。

A : Flink には既知のバグ(FLINK-15068) があり、デフォルトの RocksDB ログが INFO に設定されていますが、ログの記録は制限されていません。 そのため、 RocksDB ログによって、 TaskManager に割り当てられているディスク領域がいっぱいになる可能性があります。 この問題は、この例外の形式で発生する可能性があります。

"Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-"

回避 RockDBStateBackend 策は、 Flink ジョブ内で設定することです。 これには、 RocksDB が使用するチェックポイント URL およびデータベースストレージパスを指定する必要があります。 これらの 2 つの値は、 Java システムプロパティとしてパイプラインランタイム環境に渡されます。 次のコードスニペットは、システムプロパティからこれらの値を読み取り、 RocksDB オプションをオーバーライドしてログを最小化する例を示しています。

  1. pom.xml 以下の依存関係をファイルに追加します。 この依存関係は、パイプラインのランタイム環境で利用できるため、 Fat JAR ファイル から除外できます。

     <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
       <version>${flink.version}</version>
       <scope>provided<scope>
     </dependency>
    
  2. CustomOptionsFactory クラスを作成します。

    public class CustomOptionsFactory implements OptionsFactory {
    
     @Override
     public DBOptions createDBOptions(DBOptions dbOptions) {
       // Refer to https://javadoc.io/static/org.rocksdb/rocksdbjni/5.7.5/org/rocksdb/Options.html
       // Minimal logging
       dbOptions.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL);
       // to stop dumping rocksdb.stats to LOG
       dbOptions.setStatsDumpPeriodSec(0);
       return dbOptions;
     }
    
     @Override
     public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions columnFamilyOptions) {
       return columnFamilyOptions;
     }
    }
    
  3. RocksDBStateBackend メインクラス内での設定 :

    StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    ParameterTool systemParameters = ParameterTool.fromSystemProperties();
    // Use the Checkpoint URL provided by platform environment
    String checkpointUrl = systemParameters.get("checkpointUrl");
    // Use the disk space provided by platform environment for RocksDB local files
    String dbStoragePath = systemParameters.get("dbStoragePath");
    RocksDBStateBackend stateBackend = new RocksDBStateBackend(checkpointUrl, true);
    stateBackend.setDbStoragePath(dbStoragePath);
    
    stateBackend.setOptions(new CustomOptionsFactory());
    
    streamExecutionEnvironment.setStateBackend(stateBackend);
    

質問 : タスクがストリーム パイプラインのすべてのタスクマネージャに均等に分散されるわけではありません。

A : Flink (Stream-2.0.0(非推奨) 、 Stream-3.0.0 、および Stream-4.0) のデフォルトの動作では、別の TaskManager を使用する前に、 TaskManager のすべてのスロットを利用します。 そのため、 Flink ジョブの並列処理を超えるスロット数を持つストリームパイプラインの場合、一部の TaskManager からのスロットが完全に使用され、他の TaskManager にスロットが割り当てられます。 ユーザーは cluster.evenly-spread-out-slots: true、ストリーム設定で Flink 設定を設定することで、この動作を制御できます。 false ストリームパイプラインの場合、このプロパティはデフォルトでに設定されています。 詳細について は、 Flink チケット Flink-12122 を参照してください。

メモ

この設定は、 Stream-3.0.0 ( Flink 1.10.1 )および Stream-4.0 ( Flink 1.10.3 )ランタイム環境でのみ使用できます。

質問 : JobManager またはストリーム パイプラインのタスク管理者の CPU 使用率を確認する方法を教えてください。

A : 次のクエリは、 taskmanager コンテナの基盤となるインフラストラクチャによって報告されたメトリクスを使用して、 CPU 使用率を示します。

sum(rate(container_cpu_usage_seconds_total{pod=~"job-$deploymentId-tm-.*", container="taskmanager"}[5m])) by (pod) 
/ sum(container_spec_cpu_quota{pod=~"job-$deploymentId-tm-.*", container="taskmanager"}
/ container_spec_cpu_period{pod=~"job-$deploymentId-tm-.*", container="taskmanager"}) by (pod)

メモ

  • $deploymentId は、展開されたジョブの UUID 値です。 CLI 、ポータル、およびパイプライン API では、ジョブ ID とも呼ばれます。
  • $deploymentId は、クエリの deploymentId|jobId の値で置き換えることができます。または、変数 $deploymentId を Grafana ホームで設定できます。
  • 左の Y 軸の Grafana の単位はパーセント (0.0 ~ 1.0) にする必要があります。

JobManager の場合と同様に、クエリーは次のようになります。

sum(rate(container_cpu_usage_seconds_total{pod=~"job-$deploymentId-jm-.*", container="jobmanager"}[5m])) by (pod) 
/ sum(container_spec_cpu_quota{pod=~"job-$deploymentId-jm-.*", container="jobmanager"}
/ container_spec_cpu_period{pod=~"job-$deploymentId-jm-.*", container="jobmanager"}) by (pod)

以下の Grafana ホームのスクリーンショットは、 taskmanager の CPU 使用量の例を示しています

タスク管理者の CPU 使用量に関する Grafana ホームのスクリーンキャプチャ。
図 4. CPU 使用率に関するエントリをログに記録します。

質問 : 「セーブポイントに時間がかかりすぎました」というメッセージが表示された場合、どのような意味になりますか ?

ポータルディスプレイのスクリーンキャプチャ。
図 5. セーブポイントのエラーメッセージ

A : Flink パイプラインバージョンの一時停止またはアップグレード操作では、セーブポイントが取得され、パイプラインバージョンが中断した場所から再起動されます。 まれに、タイムアウトのためにセーブポイントを取得するプロセスが失敗することがあります。 このような場合 Savepoint took too long は、エラーメッセージが表示されます。 上の図に示されている例は、 2 分または 120,000 ミリ秒でタイムアウトしました。

メモ

Flink パイプラインのセーブポイントタイムアウトが 10 分に増加しました。 この変更により、このエラーの可能性が低減され、ストリームパイプラインの信頼性が向上します。

それでも Flink パイプラインでこの問題が発生する場合 は、操作を再試行することをお勧めします。 セーブポイントで継続的な障害が発生した場合は、パイプラインに Cancel報告してから再度Activate 報告することをお勧めします。 申し訳ありませんが、セーブポイントが成功しなかった場合、再開に使用できるパイプラインバージョンの保存済み状態は存在せず、データのゼロからの処理が開始されます。

Flink セーブポイントの詳細について は、「 Flink セーブポイント」を参照してください。

質問 : ジョブのデプロイ時にランタイム設定の一部のプロパティが使用できない理由

A : パイプライン バージョンのランタイム設定の一部として提供されているプロパティは、という名前のファイルのクラスパスで利用できます application.properties。 ストリーム ランタイムでは、Uber jar がapplication.properties を含んでいる場合、ランタイムによって提供された application.properties を超えるクラスパスで優先されます。 この問題は application.properties 、ローカル開発中にファイルが存在し、誤ってシェイディングされた jar に含まれている場合に発生する可能性があります。 解決 application.properties 策は、 uber jar にを含めることを除外することです。

<filter>
    <artifact>*:*</artifact>
    <excludes>
        <!-- This is to make sure that shaded jar doesn't have an application.properties -->
        <exclude>application.properties</exclude>
    </excludes>
</filter>

uber-jar の内容の制御の詳細については 'Maven shaded プラグイン のドキュメントを参照してください

↑ページトップへ

」に一致する結果は 件です

    」に一致する結果はありません