パイプラインプロセスとのインターフェース
HERE Platform Pipeline は、実行可能パイプラインバージョンに関連付けられている( Flink または Spark フレームワークを使用して)そのパイプライン用に選択されたランタイム環境のユーザーパイプラインを管理します。
ここでは、パイプラインによってジョブとして送信されたパイプラインバージョンに渡されるパラメータおよび設定ファイルについて説明します。
各パイプラインバージョンの詳細をパイプライン REST API 、 OLP CLI 、またはプラットフォームのポータル GUI を使用してインタラクティブに設定することで、設定ファイルのパラメータおよび内容を制御できます。
同じインターフェイスでローカルに実行されています
パイプラインをローカルで実行する場合、開発マシンの場合と同様に、パラメータおよびファイルを手動で渡すことによって、このドキュメントで説明しているインターフェイスを模倣する必要があります。
HERE platform SDK では、 Java および Scala の Maven アーキタイプを使用し て基本的なパイプラインプロジェクトを作成します。 このプロジェクトには、パイプラインのローカルでの開発および実行を簡素化するサンプルファイルおよびスクリプトが含まれています。
ローカル開発で は、pipeline-config.conf
およびpipeline-job.conf
を 開発マシンのプロセスクラスパスまたはその場所 ( パス ) に含めることができます。このパスは、pipeline-config.file
およびpipeline-job.file
システムプロパティで指定します。 これらのファイルの詳細は、本番環境に移行すると変更されます。
入口および出口地点
パイプライン テンプレートで、パイプラインのエントリポイントを表すクラスを指定します。 このクラスは、ジョブが Flink of Spark に送信されたときのエントリポイントの選択に使用されます。 JVM によって新しいオブジェクトが作成 main
され、メソッドで実行が開始されます。
ストリーム処理パイプラインアプリケーションは通常終了しませんが、終了することがあります。 バッチ処理パイプラインは、処理が完了すると自然に終了します。 パイプラインアプリケーションが終了すると、エラーコードが返されます。 エラーコードが 0 の場合、アプリケーションは正常に終了したと見なされます。 エラーコードがゼロ以外の場合、エラーが原因でアプリケーションが終了したと見なされ、パイプラインサービスがこのイベントをジョブの説明で報告するか、またはジョブの再送信を試みる可能性があります。
システムのプロパティ
次の JVM システムプロパティは、パイプラインが新しいジョブとして送信されたときにパイプライン API によって設定されます。 メソッドまたは System.getProperties()
同等のメソッドを使用して取得できます。
-
olp.pipeline.id
: パイプライン API で定義されているパイプラインの識別子 -
olp.pipeline.version.id
: パイプライン API で定義されているパイプラインバージョンの識別子 -
olp.deployment.id
: パイプライン API で定義されているジョブの識別子 -
olp.realm
: 顧客レルム
プラットフォームで使用される追加のプロパティパスを次に示します。
env.api.lookup.host
akka.*
here.platform.*
com.here.*
これらに加えて、ランタイム環境を設定するために他のプロパティがシステムによって設定されます。 選択したパイプラインバージョン設定に関連付けられている Spark または Flink の設定パラメータが含まれます。 実際の詳細情報は、選択した環境とそのバージョンに固有です。 このような詳細情報は変更される可能性があるため、実装固有のものと見なされ、決定に任せられます。
このセクションで指定したシステムプロパティは、メインユーザープロセスからのみ表示されます。 これらのシステムプロパティは、必ずしもクラスタのワーカーノードで実行されている JVM に複製されるわけではありません。
パイプラインの設定
対応するパイプラインバージョンで指定されているパイプラインの設定が、という名前のファイルを介して渡さ pipeline-config.conf
れます。 このファイルは、メインユーザープロセスのクラスパスに追加されます。
注
ファイルの形式は HOCONで、 JSON および Java のプロパティのスーパーセットです。 Lightbend のオープンソースの typesafe 設定ライブラリで解析 できます。
データ プロセッシング ライブラリがパイプラインを使用して実装されている場合、解析は pipeline-runner パッケージによって自動的に処理されます。 このパッケージでは、 パイプラインサービスとのインターフェイスを容易にし、パイプラインを実装するためのメインアプリケーションも提供されます。
内容の例 pipeline-config.conf
:
pipeline.config {
billing-tag = "test-billing-tag"
output-catalog { hrn = "hrn:here:data:::example-output" }
input-catalogs {
test-input-1 { hrn = "hrn:here:data:::example1" }
test-input-2 { hrn = "hrn:here:data:::example2" }
test-input-3 { hrn = "hrn:here:data:::example3" }
}
}
ここで、
-
billing-tag
パイプラインの請求エントリをグループ化するオプションのタグを指定します。 -
output-catalog
パイプラインの出力カタログを識別する HERE リソースネーム を指定します。 -
input-catalogs
パイプラインの 1 つ以上の入力カタログを指定します。 各入力カタログについて、その固定 ID が実際のカタログの HERE リソースネーム とともに提供されます。
パイプラインの実装では、固定 ID を使用して複数の入力カタログをバインドし、区別できます。 固定 ID はパイプライン テンプレートで定義されます。 各パイプラインバージョンに HERE リソースネーム が定義され、同じパイプライン テンプレートを複数の設定で再利用できます。
バッチ パイプラインジョブ
バッチパイプラインは特定のジョブを実行して終了します。 ストリームパイプラインは、時間的制約のある特定のジョブを実行するものではなく、継続的に実行されます。 このセクションは、バッチパイプラインにのみ適用されます。
バッチパイプラインは 、 1 つ以上の入力カタログの 1 つ以上のバージョン付レイヤーを処理 して 、 1 つの出力カタログで 1 つ以上のバージョン付レイヤーを生成します。 バージョン管理されたレイヤーは、データセットのパーティション分割された整合性のあるスナップショットを パイプライン がスケジュール済み状態の場合、パイプライン API スケジューラはパイプラインの実行を管理します。 設定された入力カタログを監視し、新しいデータが入力カタログのいずれかに公開されたことを検出した場合に、 Spark へのジョブ送信をトリガーします。
パイプラインに渡されたジョブには 、処理する各入力カタログのバージョンが記述されています。 これらは通常、各カタログのより新しいバージョンです。
簡単ですが、正解 です。バッチ パイプラインは、ジョブで記述されているバージョンのすべての関連する入力レイヤーに保存されているすべてのデータを取得し、処理を実行して結果を独自の出力レイヤーに公開し、出力カタログの新しいバージョンを生成します。 このイベントによって、そのカタログを入力として使用する他のバッチパイプラインがトリガされることがあります。 ジョブの説明には、カタログが変更されたかどうかにかかわらず、各入力カタログのバージョン番号が常に含まれています。
入力の完全な再処理は、通常、リソースを大量に消費する操作です。 スケジューラは、以前のジョブおよびその時点で使用されていた各入力カタログのバージョンも追跡します。 この追加情報は、ジョブの説明にも含ま since-version
れています ( 「パラメータ」を参照 ) 。 入力全体を再処理する代わりに、より高度なパイプラインでは、前回の実行以降に変更された内容のみを処理し、出力を最初から再生成するのではなく、単に更新することができます。 このような増分処理の実装は簡単ではありません。 HERE platform に含まれているデータ プロセッシング ライブラリは、この問題を解決するための重要なサポートを提供します。そのため、ライブラリが増分処理の複雑さを処理しながら、ユーザーはビジネスロジックに集中できます。
ジョブの説明は、という名前のファイルを介して渡さ pipeline-job.conf
れます。 このファイルは HOCON 形式でもあり、メインユーザープロセスのクラスパスに追加されます。 このファイルはオプションで、バッチ処理パイプラインにのみ存在します。
内容の例 pipeline-job.conf
:
pipeline.job.catalog-versions {
output-catalog { base-version = 42 }
input-catalogs {
test-input-1 {
processing-type = "no_changes"
version = 19
}
test-input-2 {
processing-type = "changes"
since-version = 70
version = 75
}
test-input-3 {
processing-type = "reprocess"
version = 314159
}
}
}
ここで、
注
HERE platform リリース 2.3 では pipeline-job.conf
、ファイルのすべてのパラメータは任意です。 詳細が指定されていない場合、パイプラインはデフォルトで最新の入力カタログバージョンを選択して再処理します。
ユーザー設定
pipeline-config.conf
との両方 pipeline-job.conf
がパイプラインによって提供され、コンテンツも定義されます。 ユーザーは、パイプラインでカスタム設定を利用できるようにすることができます。 詳細については、設定ファイル参照ページのapplication.properties
を参照してください。
も参照してください