Broadcast Input Layers to Executor Nodes

The library supports pre-calculating content from specific layers of the input catalogs. This content can be distributed and made available to all worker nodes efficiently, without having to declare it as an input layer of the various compilers and tasks.

This feature is implemented using Spark broadcast variables.

The BroadcastCompiler interface helps implement this process and avoids the need to query the input catalogs manually. After you implement the abstract class, its compileBroadcast method returns a Spark broadcast object of a developer-defined type that can be passed to compilers, usually as a parameter of their constructors.

The provided API also ensures that broadcasting does not interfere with incremental compilation. If a broadcast object created via this mechanism changes compared to the previous run, incremental compilation is disabled, because it is no longer considered safe.

This API is located in the com.here.platform.data.processing.broadcast package.
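
The types and helpers used in the examples below, such as BroadcastCompiler, queryInputMeta, and toBroadcast, can be brought into scope with a single wildcard import:

import com.here.platform.data.processing.broadcast._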

The Broadcast Compiler

The BroadcastCompiler is currently the most abstract way to create a broadcast variable in the processing library. For example:

def configureCompiler(completeConfig: CompleteConfig,
                      context: DriverContext,
                      builder: DriverBuilder): builder.type = {

  // Create the broadcast data compiler and execute it
  val myBroadcastCompiler = new MyBroadcastCompiler(context)
  val broadcastVariable = myBroadcastCompiler.compileBroadcast(context.spark.defaultParallelism)

  // Create the compiler
  val myCompiler = new Utils.MyCompiler(context, broadcastVariable)

  // Add the compiler to the Driver as new DriverTask
  builder.addTask(
    builder.newTaskBuilder("myCompiler").withNonIncrementalCompiler(myCompiler).build())
}
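
MyBroadcastCompiler and Utils.MyCompiler above stand for application code. The following is a minimal sketch of the consuming side, assuming a hypothetical RoadAttributes payload type: the compiler takes the broadcast variable as a constructor parameter and reads it on the executors through Spark's standard .value accessor. A real implementation would also extend one of the library's compiler traits, which is omitted here.

import com.here.platform.data.processing.driver.DriverContext
import org.apache.spark.broadcast.Broadcast

// Hypothetical type holding the pre-calculated content of the input layers
case class RoadAttributes(speedLimits: Map[Long, Int])

class MyCompiler(context: DriverContext, attributes: Broadcast[RoadAttributes]) {

  // Runs on the executors: .value returns the node-local copy of the
  // broadcast data, which Spark transfers to each worker at most once
  def speedLimitOf(roadId: Long): Option[Int] =
    attributes.value.speedLimits.get(roadId)
}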

Helpers to Use the Broadcast Feature Safely

The broadcast package object offers low-level helpers to create broadcast variables. Developers can use these functions to query the input metadata and then reuse the resulting RDD multiple times to calculate and create the objects to broadcast.

For example:

def configureCompiler(completeConfig: CompleteConfig,
                      context: DriverContext,
                      builder: DriverBuilder): builder.type = {

  // An InputOptPartitioner that returns None, letting the processing library choose the partitioner
  val inputOptPartitioner = new InputOptPartitioner {
    def inPartitioner(parallelism: Int) = None
  }

  // Query the input metadata once; the resulting RDD is suitable to be reused
  // multiple times. inLayers (not defined in this snippet) is the
  // developer-provided map from catalog IDs to the sets of layer IDs to query
  val inputMeta =
    queryInputMeta(context, inLayers, inputOptPartitioner, context.spark.defaultParallelism)

  // Create the broadcast data to be shared and its related broadcast variable
  val broadcastData = createDataToBroadCast(inputMeta)
  val broadcastVariable = toBroadcast(context, broadcastData)

  // Create the compiler
  val myCompiler = new Utils.MyCompiler(context, broadcastVariable)

  // Add the compiler to the Driver as new DriverTask
  builder.addTask(
    builder.newTaskBuilder("myCompiler").withNonIncrementalCompiler(myCompiler).build())
}
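
Here queryInputMeta and toBroadcast are the helpers from the broadcast package object, while createDataToBroadCast stands for developer code that reduces the queried metadata to a plain object worth broadcasting. The sketch below shows one possible shape for it, assuming the metadata RDD holds (Key, Meta) pairs and that the import paths match the catalog package; the per-layer partition count it computes is purely illustrative:

import com.here.platform.data.processing.catalog.Layer
import com.here.platform.data.processing.catalog.Partition.{Key, Meta}
import org.apache.spark.rdd.RDD

// Illustrative aggregation: count the input partitions per layer. The same
// inputMeta RDD can feed several such computations before broadcasting.
def createDataToBroadCast(inputMeta: RDD[(Key, Meta)]): Map[Layer.Id, Long] =
  inputMeta
    .map { case (key, _) => key.layer -> 1L }
    .reduceByKey(_ + _)
    .collect()
    .toMap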
