パイプラインプロセスとのインターフェース

HERE Platform Pipeline は、実行可能パイプラインバージョンに関連付けられている( Flink または Spark フレームワークを使用して)そのパイプライン用に選択されたランタイム環境のユーザーパイプラインを管理します。

ここでは、パイプラインによってジョブとして送信されたパイプラインバージョンに渡されるパラメータおよび設定ファイルについて説明します。

各パイプラインバージョンの詳細をパイプライン REST API 、 OLP CLI 、またはプラットフォームのポータル GUI を使用してインタラクティブに設定することで、設定ファイルのパラメータおよび内容を制御できます。

同じインターフェイスでローカルに実行されています

パイプラインをローカルで実行する場合、開発マシンの場合と同様に、パラメータおよびファイルを手動で渡すことによって、このドキュメントで説明しているインターフェイスを模倣する必要があります。

HERE platform SDK では、 Java および Scala の Maven アーキタイプを使用し て基本的なパイプラインプロジェクトを作成します。 このプロジェクトには、パイプラインのローカルでの開発および実行を簡素化するサンプルファイルおよびスクリプトが含まれています。

ローカル開発で は、pipeline-config.confおよびpipeline-job.confを 開発マシンのプロセスクラスパスまたはその場所 ( パス ) に含めることができます。このパスは、pipeline-config.fileおよびpipeline-job.fileシステムプロパティで指定します。 これらのファイルの詳細は、本番環境に移行すると変更されます。

入口および出口地点

パイプライン テンプレートで、パイプラインのエントリポイントを表すクラスを指定します。 このクラスは、ジョブが Flink of Spark に送信されたときのエントリポイントの選択に使用されます。 JVM によって新しいオブジェクトが作成 main され、メソッドで実行が開始されます。

ストリーム処理パイプラインアプリケーションは通常終了しませんが、終了することがあります。 バッチ処理パイプラインは、処理が完了すると自然に終了します。 パイプラインアプリケーションが終了すると、エラーコードが返されます。 エラーコードが 0 の場合、アプリケーションは正常に終了したと見なされます。 エラーコードがゼロ以外の場合、エラーが原因でアプリケーションが終了したと見なされ、パイプラインサービスがこのイベントをジョブの説明で報告するか、またはジョブの再送信を試みる可能性があります。

システムのプロパティ

次の JVM システムプロパティは、パイプラインが新しいジョブとして送信されたときにパイプライン API によって設定されます。 メソッドまたは System.getProperties() 同等のメソッドを使用して取得できます。

  • olp.pipeline.id: パイプライン API で定義されているパイプラインの識別子
  • olp.pipeline.version.id: パイプライン API で定義されているパイプラインバージョンの識別子
  • olp.deployment.id: パイプライン API で定義されているジョブの識別子
  • olp.realm: 顧客レルム

プラットフォームで使用される追加のプロパティパスを次に示します。

  • env.api.lookup.host
  • akka.*
  • here.platform.*
  • com.here.*

これらに加えて、ランタイム環境を設定するために他のプロパティがシステムによって設定されます。 選択したパイプラインバージョン設定に関連付けられている Spark または Flink の設定パラメータが含まれます。 実際の詳細情報は、選択した環境とそのバージョンに固有です。 このような詳細情報は変更される可能性があるため、実装固有のものと見なされ、決定に任せられます。

このセクションで指定したシステムプロパティは、メインユーザープロセスからのみ表示されます。 これらのシステムプロパティは、必ずしもクラスタのワーカーノードで実行されている JVM に複製されるわけではありません。

パイプラインの設定

対応するパイプラインバージョンで指定されているパイプラインの設定が、という名前のファイルを介して渡さ pipeline-config.confれます。 このファイルは、メインユーザープロセスのクラスパスに追加されます。

ファイルの形式は HOCONで、 JSON および Java のプロパティのスーパーセットです。 Lightbend のオープンソースの typesafe 設定ライブラリで解析 できます。

データ プロセッシング ライブラリがパイプラインを使用して実装されている場合、解析は pipeline-runner パッケージによって自動的に処理されます。 このパッケージでは、 パイプラインサービスとのインターフェイスを容易にし、パイプラインを実装するためのメインアプリケーションも提供されます。

内容の例 pipeline-config.conf:


    pipeline.config {
         billing-tag = "test-billing-tag"
         output-catalog { hrn = "hrn:here:data:::example-output" }
         input-catalogs {
             test-input-1 { hrn = "hrn:here:data:::example1" }
             test-input-2 { hrn = "hrn:here:data:::example2" }
             test-input-3 { hrn = "hrn:here:data:::example3" }
         }
     }

ここで、

  • billing-tag パイプラインの請求エントリをグループ化するオプションのタグを指定します。
  • output-catalog パイプラインの出力カタログを識別する HERE リソースネーム を指定します。
  • input-catalogs パイプラインの 1 つ以上の入力カタログを指定します。 各入力カタログについて、その固定 ID が実際のカタログの HERE リソースネーム とともに提供されます。

パイプラインの実装では、固定 ID を使用して複数の入力カタログをバインドし、区別できます。 固定 ID はパイプライン テンプレートで定義されます。 各パイプラインバージョンに HERE リソースネーム が定義され、同じパイプライン テンプレートを複数の設定で再利用できます。

バッチ パイプラインジョブ

バッチパイプラインは特定のジョブを実行して終了します。 ストリームパイプラインは、時間的制約のある特定のジョブを実行するものではなく、継続的に実行されます。 このセクションは、バッチパイプラインにのみ適用されます。

バッチパイプラインは 、 1 つ以上の入力カタログの 1 つ以上のバージョン付レイヤーを処理 して 、 1 つの出力カタログで 1 つ以上のバージョン付レイヤーを生成します。 バージョン管理されたレイヤーは、データセットのパーティション分割された整合性のあるスナップショットを パイプライン がスケジュール済み状態の場合、パイプライン API スケジューラはパイプラインの実行を管理します。 設定された入力カタログを監視し、新しいデータが入力カタログのいずれかに公開されたことを検出した場合に、 Spark へのジョブ送信をトリガーします。

パイプラインに渡されたジョブには 、処理する各入力カタログのバージョンが記述されています。 これらは通常、各カタログのより新しいバージョンです。

簡単ですが、正解 です。バッチ パイプラインは、ジョブで記述されているバージョンのすべての関連する入力レイヤーに保存されているすべてのデータを取得し、処理を実行して結果を独自の出力レイヤーに公開し、出力カタログの新しいバージョンを生成します。 このイベントによって、そのカタログを入力として使用する他のバッチパイプラインがトリガされることがあります。 ジョブの説明には、カタログが変更されたかどうかにかかわらず、各入力カタログのバージョン番号が常に含まれています。

入力の完全な再処理は、通常、リソースを大量に消費する操作です。 スケジューラは、以前のジョブおよびその時点で使用されていた各入力カタログのバージョンも追跡します。 この追加情報は、ジョブの説明にも含ま since-version れています ( 「パラメータ」を参照 ) 。 入力全体を再処理する代わりに、より高度なパイプラインでは、前回の実行以降に変更された内容のみを処理し、出力を最初から再生成するのではなく、単に更新することができます。 このような増分処理の実装は簡単ではありません。 HERE platform に含まれているデータ プロセッシング ライブラリは、この問題を解決するための重要なサポートを提供します。そのため、ライブラリが増分処理の複雑さを処理しながら、ユーザーはビジネスロジックに集中できます。

ジョブの説明は、という名前のファイルを介して渡さ pipeline-job.confれます。 このファイルは HOCON 形式でもあり、メインユーザープロセスのクラスパスに追加されます。 このファイルはオプションで、バッチ処理パイプラインにのみ存在します。

内容の例 pipeline-job.conf:

    pipeline.job.catalog-versions {
        output-catalog { base-version = 42 }
        input-catalogs {
            test-input-1 {
                processing-type = "no_changes"
                version = 19
                }
            test-input-2 {
                processing-type = "changes"
                since-version = 70
                version = 75
                }
            test-input-3 {
                processing-type = "reprocess"
                version = 314159
            }
        }
    }

ここで、

  • base-version のは output-catalog 、新しいデータをパブリッシュする、既存のカタログのバージョンを示します。 このパラメータは今後必要になることはなく、削除されます。 現在、出力カタログの更新済みデータにコミットする必要があります。
  • input-catalogs 各入力には version 、その入力のうち最新のものが含まれています。 これは処理するバージョンです。 また、ジョブの最後の実行以降に変更された内容を指定する情報も含まれます。 カタログは、パイプライン設定ファイルにあるものと同じ識別子を使用して識別できます。
  • processing-type 最後に正常に実行されてから各入力で変更された内容について説明します。 値は no_changes、、 changes 、および reprocessです。

    • no_changes 最後の実行以降に入力カタログが変更されていないことを示します。
    • changes 入力カタログが変更されたことを示します。 2 番目のパラメーター since-version が含まれ、そのカタログのどのバージョンが最後の実行で処理されたかを示します。
    • reprocess 入力カタログが変更されたかどうかを指定しません。 パイプラインは、何らかの増分処理を試みるのではなく、そのカタログ全体を再処理するよう要求されます。 これは、明示的なユーザー要求、またはパイプラインの初回実行などのシステム条件が原因である可能性があります。

HERE platform リリース 2.3 では pipeline-job.conf 、ファイルのすべてのパラメータは任意です。 詳細が指定されていない場合、パイプラインはデフォルトで最新の入力カタログバージョンを選択して再処理します。

ユーザー設定

pipeline-config.conf との両方 pipeline-job.conf がパイプラインによって提供され、コンテンツも定義されます。 ユーザーは、パイプラインでカスタム設定を利用できるようにすることができます。 詳細については、設定ファイル参照ページのapplication.propertiesを参照してください。

も参照してください

」に一致する結果は 件です

    」に一致する結果はありません