バッチパイプラインのアーキテクチャ

バッチパイプラインは、データをバッチで処理するために使用される特定の種類のパイプラインです。

パイプライン SCHEDULED がステートにある場合、パイプライン API は、入力カタログの変更など、一部の条件が満たされたとき、または外部イベントが検出されたときに、その実行をトリガーします。 ジョブの説明 には、この変更の詳細が保持され、パイプラインに渡されます。 パイプラインがトリガーされると RUNNING 、状態に切り替わり、データが処理されて結果が出力カタログにコミットされます。 このプロセスが終了 SCHEDULED し、パイプラインが状態に戻ります。

パイプライン、その状態、および状態の遷移の詳細については、『パイプライン API 開発者ガイド』を参照してください。

データ プロセッシング ライブラリは、 Apache Spark でのバッチパイプラインの開発をサポートしています。

内部構造

ドライバーの概要、ドライバータスクの例、入力レイヤーと出力レイヤーの接続方法
ドライバーの概要、ドライバータスクの例、入力レイヤーと出力レイヤーの接続方法

バッチ パイプラインを開発するには、次の 2 つのクラスから開始します。

  1. PipelineRunnerメソッドを実装 main し、コマンド ラインとインターフェイスするためのパイプライン API およびシステムプロパティの解析を提供する実行可能なクラスです。 パイプラインの設定とジョブの説明が検出され、開発者のコードに提供されます。 開発者は、コードをローカルで実行する場合と、パイプライン API を介してパイプラインを HERE platform に展開する場合の両方のエントリポイントとしてこのクラスを使用する必要があります。

  2. Driver分散処理を制御するメインクラスで DriverContext、 Apache Spark へのアクセスを提供し、呼び出された内容の入力および出力カタログの Data API への事前設定された Spark フレンドリーなアクセスを提供します。 Driverはコンテキストを設定し、Driverに添付された 1 つ以上のDriverTask形式でモデル化され、順次実行されるデータ処理コードにコンテキストを提供します。

Driver を手動で作成するか DriverBuilder 、またはパッケージにある推奨のヘルパークラスを使用 com.here.platform.data.processing.driverして作成できます。

ドライバを設定します

Driverは、手動またはDriverBuilder経由で設定して、 1 つ以上のデータ処理DriverTasksを添付できます。 各タスクは、一連の入力レイヤーのデータを消費することで、出力レイヤーのセットのデータを生成するエンドツーエンドのバッチ処理ロジックを表します。

各タスクで指定される内容は次

  • タスクが処理する入力カタログのレイヤーのサブセット。タスクによって、入力カタログで使用可能なレイヤー間で 1 つ以上のレイヤーが入力として宣言されます。 各レイヤーは、カタログのシンボリック ID およびレイヤー ID で指定されます。 複数のタスクが同じ入力レイヤーを入力として宣言することがあります。複数のタスクの入力レイヤーのセットがオーバーラップしても安全です。
  • 出力カタログで生成するタスクの出力レイヤー。各タスクは、 1 つ以上の出力レイヤーを排他的に作成します。つまり、各出力レイヤーは 1 つのタスクでのみ作成できます。 2 つ以上のタスクで同じ出力レイヤーを宣言することはできません。これは、設定がサポートされていないためです。

データ プロセッシング ライブラリでは、入力レイヤーに変更が検出されない限りタスクが実行されないように、インクリメンタル処理がサポートされています。 ただし、他のタスクがまだ実行されている場合があります。

PipelineRunner なしでドライバーを設定します

PipelineRunner このクラスは DriverContext 、構築に関するすべての詳細情報を抽象化し、ローカル開発中に役立つコマンド ラインオプションの組み込みサポートを提供します。 ビジネスロジックをパイプラインに接続する方法をお勧めします。 または、を手動でインスタンス化 DriverContextDriver 、を作成して、任意 main の方法で処理ジョブを実行することもできます。

Scala
Java
def main(args: Array[String]): Unit = {

  // the application version, when it changes incremental compilation
  // is disabled
  val applicationVersion = "1.0.0"

  // build a driver context with the default configuration
  val driverContext = new DriverContext.Builder(applicationVersion).build()

  // create a driver builder and add a driver task
  val driverBuilder = new DriverBuilder(driverContext)

  driverBuilder.addTask(new MyTask(driverContext))

  // build the driver and run the processing job
  val driver = driverBuilder.build()

  driver.run()
}
public static void main(String args[]) {

  // the application version, when it changes incremental compilation
  // is disabled
  String applicationVersion = "1.0.0";

  // build a driver context with the default configuration
  DriverContext driverContext = new DriverContext.Builder(applicationVersion).build();

  // create a driver builder and add a driver task
  DriverBuilder driverBuilder = new DriverBuilder(driverContext);

  driverBuilder.addTask(new MyTask(driverContext));

  // build the driver and run the processing job
  Driver driver = driverBuilder.build();

  driver.run();
}

パイプラインの設定、ジョブの説明、および完全な設定は、 DriverContext の構築時に自動的に検出されますが DriverContext.Builder 、クラスの対応する設定値を使用して上書きできます。

Scala
Java
def main(args: Array[String]): Unit = {

  // the application version, when it changes incremental compilation
  // is disabled
  val applicationVersion = "1.0.0"

  // custom configuration
  val completeConfig = CompleteConfig(
    Seq("here.platform.data-processing.executors.compilein.threads=20",
        "here.platform.data-processing.executors.compileout.threads=10"))

  // build a driver context with a customized configuration
  val driverContext =
    new DriverContext.Builder(applicationVersion).setCompleteConfig(completeConfig).build()

  // create a driver builder and add a driver task
  val driverBuilder = new DriverBuilder(driverContext)

  driverBuilder.addTask(new MyTask(driverContext))

  // build the driver and run the processing job
  val driver = driverBuilder.build()

  driver.run()
}
public static void main(String args[]) {

  // the application version, when it changes incremental compilation
  // is disabled
  String applicationVersion = "1.0.0";

  // custom configuration
  CompleteConfig completeConfig =
      CompleteConfig.load(
          Arrays.asList(
              "here.platform.data-processing.executors.compilein.threads=20",
              "here.platform.data-processing.executors.compileout.threads=10"));

  // build a driver context with a customized configuration
  DriverContext driverContext =
      new DriverContext.Builder(applicationVersion).setCompleteConfig(completeConfig).build();

  // create a driver builder and add a driver task
  DriverBuilder driverBuilder = new DriverBuilder(driverContext);

  driverBuilder.addTask(new MyTask(driverContext));

  // build the driver and run the processing job
  Driver driver = driverBuilder.build();

  driver.run();
}

ローカル開発中に、 Spark マスター URL を明示的に指定する必要があります。 パイプライン API でパイプラインを実行する場合、この操作は不要です。

Scala
Java
def main(args: Array[String]): Unit = {

  // the application version, when it changes incremental compilation
  // is disabled
  val applicationVersion = "1.0.0"

  // build a driver context with support for local development
  val driverContextBuilder = new DriverContext.Builder(applicationVersion)

  if (isLocalRun) {
    driverContextBuilder.setSparkMaster("local[*]")
  }

  val driverContext = driverContextBuilder.build()

  // create a driver builder and add a driver task
  val driverBuilder = new DriverBuilder(driverContext)

  driverBuilder.addTask(new MyTask(driverContext))

  // build the driver and run the processing job
  val driver = driverBuilder.build()

  driver.run()
}
public static void main(String args[]) {

  // the application version, when it changes incremental compilation
  // is disabled
  String applicationVersion = "1.0.0";

  // build a driver context with support for local development
  DriverContext.Builder driverContextBuilder = new DriverContext.Builder(applicationVersion);

  if (isLocalRun) {
    driverContextBuilder.setSparkMaster("local[*]");
  }

  DriverContext driverContext = driverContextBuilder.build();

  // create a driver builder and add a driver task
  DriverBuilder driverBuilder = new DriverBuilder(driverContext);

  driverBuilder.addTask(new MyTask(driverContext));

  // build the driver and run the processing job
  Driver driver = driverBuilder.build();

  driver.run();
}

データ処理の形式でコンパイルします

直接実装することもできますが DriverTask、代わり にコンパイラを実装することをお勧めします。これは、処理ライブラリによって提供される抽象化の上位レベルであるためです。

データ プロセッシング ライブラリのコンテキストで は、コンパイラ ーは入力レイヤーを出力レイヤーに機能的に変換するバッチパイプラインを参照します。 このタイプのパイプラインは、入力 versioned レイヤーおよび出力レイヤーでのみ動作します。

この種類の変換は 、コンパイル またはより具体的に はマップのコンパイルと呼ばれ、関連するレイヤーは標準またはカスタムの形式で入力および出力マップ データを表します。

データ プロセッシング ライブラリでは、コンパイラーの作成だけでなく、インクリメンタル・コンパイルなどの重要な機能を有効にするパターンについてもガイドを提供します。

1 つのコンパイラを実行する DriverTask
1 つのコンパイラを実行する DriverTask

必要なデータ処理ロジックを コンパイラーの形式で記述した場合DriverTask 、処理ライブラリは、そのデータ変換ロジックをタスク用に宣言された入力レイヤーに適用するためのの実装を提供します。追加のコードを記述する必要はありません。 DriverBuildernewTaskBuilderメソッドと、それが返すTaskBuilderメソッドを使用します。

コンパイラーのチェーンを実行する DriverTask
コンパイラーのチェーンを実行する DriverTask

さらに、コンパイラーチェーン化して、 1 つの コンパイラーによって生成された出力レイヤーを チェーン内の他の任意のコンパイラーの入力として使用できます。 コンパイラーをチェーン化するためのDriverTask実装を取得するには、DriverBuildernewMultiCompilerTaskBuilderメソッドとそれに対応するMultiCompilerTaskBuilderメソッドを使用します。

分散処理

Driver 1 つ以上 DriverTask のオブジェクトを割り当ててを設定すると PipelineRunner 、は処理を開始します。 は Driver 、 Data API を介して出力カタログで新しいパブリケーションを開始し、 Spark で各タスクを順次起動します。通常は、タスクごとに 1 つの Spark ジョブです。 これは、カスタム DriverTaskのと RDD ベース のコンパイルパターンでは異なる場合があります。

Driver 、バージョン情報を処理し、処理をオーケストレーションするだけです。 メタデータおよびデータのペイロードはによって処理されませ Driverん。処理されないと、ソリューションのスケーラビリティが妨げられます。

各タスクについて、順番に次の操作を行います。

  1. Driver 、タスクが必要とするレイヤーのメタデータについて Data API に問い合わせ、オプションでフィルタリングして、実際 DriverTask の実装に渡します。 このプロセスは、分散した方法で実行される Spark 変換で構成されます。 通常、タスクの実装では Retriever、にあるを使用し DriverContext て Data Blob API からペイロードを取得します。 読み込みは、 Spark ジョブの一部として並行して実行されます。 このタスクでは、 Protobuf ペイロードをデコードしてカスタム処理ロジックを適用することで、データの処理を続行します。
  2. コンパイルタスクによって、タスクが所有する出力カタログのレイヤーにコミットする必要があるペイロードが生成されます。 処理の結果は コンパイラ ーの実装によって返され、常に同じ Spark ジョブの一部として、組み込みのインクリメンタルパブリッシャーに渡されます。
  3. インクリメンタルパブリッシャ は、出力カタログメタデータにアクセスして、生成されたペイロードをチェックサム経由ですでに利用可能なペイロードと比較できます。 その後、増分パブリッシャが Data Blob API とは異なるペイロードをアップロードします。同じペイロードは廃棄されます。 このプロセスは、 Spark ジョブの一部として、常に分散した方法で実装されます。 インクリメンタルパブリッシャーは、データ公開 API で使用されるコミットメタデータも作成します。

Driver 添付 DriverTaskされているすべてののの実行を正常に終了すると、パブリケーションが完了し、新しいデータとメタデータが出力カタログにトランザクションとしてパブリッシュされます。 エラーが発生した場合、パブリケーションは中止さ FAILEDれ、ジョブはとしてマークされます。

」に一致する結果は 件です

    」に一致する結果はありません