PipelineRunnerメソッドを実装 main し、コマンド ラインとインターフェイスするためのパイプライン API およびシステムプロパティの解析を提供する実行可能なクラスです。 パイプラインの設定とジョブの説明が検出され、開発者のコードに提供されます。 開発者は、コードをローカルで実行する場合と、パイプライン API を介してパイプラインを HERE platform に展開する場合の両方のエントリポイントとしてこのクラスを使用する必要があります。
Driver分散処理を制御するメインクラスで DriverContext、 Apache Spark へのアクセスを提供し、呼び出された内容の入力および出力カタログの Data API への事前設定された Spark フレンドリーなアクセスを提供します。 Driverはコンテキストを設定し、Driverに添付された 1 つ以上のDriverTask形式でモデル化され、順次実行されるデータ処理コードにコンテキストを提供します。
タスクが処理する入力カタログのレイヤーのサブセット。タスクによって、入力カタログで使用可能なレイヤー間で 1 つ以上のレイヤーが入力として宣言されます。 各レイヤーは、カタログのシンボリック ID およびレイヤー ID で指定されます。 複数のタスクが同じ入力レイヤーを入力として宣言することがあります。複数のタスクの入力レイヤーのセットがオーバーラップしても安全です。
def main(args: Array[String]):Unit={// the application version, when it changes incremental compilation// is disabledval applicationVersion ="1.0.0"// build a driver context with the default configurationval driverContext =new DriverContext.Builder(applicationVersion).build()// create a driver builder and add a driver taskval driverBuilder =new DriverBuilder(driverContext)
driverBuilder.addTask(new MyTask(driverContext))// build the driver and run the processing jobval driver = driverBuilder.build()
driver.run()}
publicstaticvoidmain(String args[]){// the application version, when it changes incremental compilation// is disabledString applicationVersion ="1.0.0";// build a driver context with the default configurationDriverContext driverContext =newDriverContext.Builder(applicationVersion).build();// create a driver builder and add a driver taskDriverBuilder driverBuilder =newDriverBuilder(driverContext);
driverBuilder.addTask(newMyTask(driverContext));// build the driver and run the processing jobDriver driver = driverBuilder.build();
driver.run();}
def main(args: Array[String]):Unit={// the application version, when it changes incremental compilation// is disabledval applicationVersion ="1.0.0"// custom configurationval completeConfig = CompleteConfig(
Seq("here.platform.data-processing.executors.compilein.threads=20","here.platform.data-processing.executors.compileout.threads=10"))// build a driver context with a customized configurationval driverContext =new DriverContext.Builder(applicationVersion).setCompleteConfig(completeConfig).build()// create a driver builder and add a driver taskval driverBuilder =new DriverBuilder(driverContext)
driverBuilder.addTask(new MyTask(driverContext))// build the driver and run the processing jobval driver = driverBuilder.build()
driver.run()}
publicstaticvoidmain(String args[]){// the application version, when it changes incremental compilation// is disabledString applicationVersion ="1.0.0";// custom configurationCompleteConfig completeConfig =CompleteConfig.load(Arrays.asList("here.platform.data-processing.executors.compilein.threads=20","here.platform.data-processing.executors.compileout.threads=10"));// build a driver context with a customized configurationDriverContext driverContext =newDriverContext.Builder(applicationVersion).setCompleteConfig(completeConfig).build();// create a driver builder and add a driver taskDriverBuilder driverBuilder =newDriverBuilder(driverContext);
driverBuilder.addTask(newMyTask(driverContext));// build the driver and run the processing jobDriver driver = driverBuilder.build();
driver.run();}
ローカル開発中に、 Spark マスター URL を明示的に指定する必要があります。 パイプライン API でパイプラインを実行する場合、この操作は不要です。
Scala
Java
def main(args: Array[String]):Unit={// the application version, when it changes incremental compilation// is disabledval applicationVersion ="1.0.0"// build a driver context with support for local developmentval driverContextBuilder =new DriverContext.Builder(applicationVersion)if(isLocalRun){
driverContextBuilder.setSparkMaster("local[*]")}val driverContext = driverContextBuilder.build()// create a driver builder and add a driver taskval driverBuilder =new DriverBuilder(driverContext)
driverBuilder.addTask(new MyTask(driverContext))// build the driver and run the processing jobval driver = driverBuilder.build()
driver.run()}
publicstaticvoidmain(String args[]){// the application version, when it changes incremental compilation// is disabledString applicationVersion ="1.0.0";// build a driver context with support for local developmentDriverContext.Builder driverContextBuilder =newDriverContext.Builder(applicationVersion);if(isLocalRun){
driverContextBuilder.setSparkMaster("local[*]");}DriverContext driverContext = driverContextBuilder.build();// create a driver builder and add a driver taskDriverBuilder driverBuilder =newDriverBuilder(driverContext);
driverBuilder.addTask(newMyTask(driverContext));// build the driver and run the processing jobDriver driver = driverBuilder.build();
driver.run();}
インクリメンタルパブリッシャ は、出力カタログメタデータにアクセスして、生成されたペイロードをチェックサム経由ですでに利用可能なペイロードと比較できます。 その後、増分パブリッシャが Data Blob API とは異なるペイロードをアップロードします。同じペイロードは廃棄されます。 このプロセスは、 Spark ジョブの一部として、常に分散した方法で実装されます。 インクリメンタルパブリッシャーは、データ公開 API で使用されるコミットメタデータも作成します。