実行ノードに入力レイヤーをブロードキャストします

ライブラリでは、入力カタログの特定のレイヤーからのコンテンツの事前計算がサポートされています。 このコンテンツは、さまざまなコンパイラーおよびタスクの入力レイヤーとしてコンテンツを指定することなく、すべてのワーカーノードに効率的に配布および使用できます。

この機能は、 Spark ブロードキャスト変数を使用して実装されます。

BroadcastCompiler このインターフェイスを使用すると、このプロセスを実装して、入力カタログを手動で照会する必要がなくなります。 抽象クラスを実装した compileBroadcast 後、このメソッドは開発者が定義した型の Spark ブロードキャストオブジェクトを返します。このオブジェクトは、コンパイラーにコピーできます。通常は、ブロードキャストオブジェクトをコンストラクターのパラメーターとして渡します。

提供されている API は、ブロードキャストがインクリメンタルコンパイルに干渉しないようにします。 提供されているメカニズムを使用してブロードキャストオブジェクトが作成され、そのオブジェクトの内容が前回の実行と比較して変更された場合、インクリメンタルコンパイルは安全ではないと見なされ、無効になります。

この API は com.here.platform.data.processing.broadcast パッケージに含まれています。

ブロードキャストコンパイラ

BroadcastCompiler 、処理中のライブラリでブロードキャスト変数を作成するための現在の最も抽象的な方法を提供します。 例 :

def configureCompiler(completeConfig: CompleteConfig,
                      context: DriverContext,
                      builder: DriverBuilder): builder.type = {

  // Create the broadcast data compiler and execute it
  val myBroadcastCompiler = new MyBroadcastCompiler(context)
  val broadcastVariable = myBroadcastCompiler.compileBroadcast(context.spark.defaultParallelism)

  // Create the compiler
  val myCompiler = new Utils.MyCompiler(context, broadcastVariable)

  // Add the compiler to the Driver as new DriverTask
  builder.addTask(
    builder.newTaskBuilder("myCompiler").withNonIncrementalCompiler(myCompiler).build())
}

ブロードキャスト機能を安全に使用できるように支援します

broadcast パッケージオブジェクトには、ブロードキャスト変数を作成するための低レベルのヘルパーが用意されています。 開発者は、これらの関数を使用してメタデータを照会し、その結果作成された RDD を複数回使用して、ブロードキャストするオブジェクトを計算および作成できます。

例 :

def configureCompiler(completeConfig: CompleteConfig,
                      context: DriverContext,
                      builder: DriverBuilder): builder.type = {

  // The InputOptPartitioner object, leaving the processing library to decide the partitioner
  val inputOptPartitioner = new InputOptPartitioner {
    def inPartitioner(parallelism: Int) = None
  }

  // Input metadata suitable to be reused multiple times
  val inputMeta =
    queryInputMeta(context, inLayers, inputOptPartitioner, context.spark.defaultParallelism)

  // Create the broadcast data to be shared and its related broadcast variable
  val broadcastData = createDataToBroadCast(inputMeta)
  val broadcastVariable = toBroadcast(context, broadcastData)

  // Create the compiler
  val myCompiler = new Utils.MyCompiler(context, broadcastVariable)

  // Add the compiler to the Driver as new DriverTask
  builder.addTask(
    builder.newTaskBuilder("myCompiler").withNonIncrementalCompiler(myCompiler).build())
}

」に一致する結果は 件です

    」に一致する結果はありません