import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.SparkSupport._
/** Payload for one upload: the target partition id, the target layer id and the raw bytes.
  *
  * NOTE: `data` is an `Array[Byte]`, so the generated `equals`/`hashCode` compare the array
  * by reference, not by content — do not rely on instances as map/set keys.
  */
final case class CustomData(partition: String, layer: String, data: Array[Byte])
/** Publishes `partitions` to `catalog` by fanning the upload work out over Spark workers.
  *
  * Driver side: opens a publication batch against the catalog's current base version.
  * Worker side: each Spark partition uploads its data and commits the result to the batch.
  * Driver side again: once every Spark partition has reported back, the batch is sealed.
  *
  * @param sc         Spark context used to distribute the work
  * @param catalog    HRN of the catalog to publish into
  * @param layerIds   ids of the layers the batch may write to
  * @param partitions payloads to upload (partition id, layer id, raw bytes)
  */
def publishUsingMultipleWorkers(sc: SparkContext,
                                catalog: HRN,
                                layerIds: Seq[String],
                                partitions: Seq[CustomData]): Unit = {
  // Driver side: resolve the current base version and open a publication batch.
  val driverSystem = DataClientSparkContextUtils.context.actorSystem
  val driverPublishApi = DataClient(driverSystem).publishApi(catalog)
  val baseVersion = driverPublishApi.getBaseVersion().awaitResult()
  val batchToken: BatchToken =
    driverPublishApi
      .startBatch2(baseVersion, Some(layerIds), dependencies = Nil)
      .awaitResult()

  // Worker side: the batch token is captured by the closure and shipped to each worker;
  // every Spark partition uploads its payloads and commits them to the shared batch.
  val uploadResults: RDD[Done] = sc
    .parallelize(partitions)
    .mapPartitions { customData =>
      val workerSystem = DataClientSparkContextUtils.context.actorSystem
      val publishApi = DataClient(workerSystem).publishApi(catalog)
      val writeEngine = DataEngine(workerSystem).writeEngine(catalog)
      val commits: Iterator[CommitPartition] = customData.map { cd =>
        writeEngine
          .put(
            NewPartition(
              partition = cd.partition,
              layer = cd.layer,
              data = NewPartition.ByteArrayData(cd.data)
            ))
          .awaitResult()
      }
      publishApi.publishToBatch(batchToken, commits).awaitResult()
      Iterator.single(Done)
    }

  // Force evaluation of every Spark partition, then seal the batch on the driver.
  uploadResults.collect()
  driverPublishApi.completeBatch(batchToken).awaitResult()
}
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
/**
 * Serializable value holder pairing a partition id and a layer id with the raw payload bytes
 * to upload. Instances are created on the Spark driver and shipped to the workers, hence
 * {@code Serializable}. Fields are {@code final}: they are assigned once in the constructor
 * and only ever read afterwards.
 */
@SuppressWarnings("serial")
public static class CustomData implements Serializable {
  /** Partition identifier the payload is published under. */
  final String partition;
  /** Id of the catalog layer the payload belongs to. */
  final String layerId;
  /** Raw payload bytes to publish. */
  final byte[] data;

  CustomData(String partition, String layerId, byte[] data) {
    this.partition = partition;
    this.layerId = layerId;
    this.data = data;
  }
}
// Number of Spark slices (and hence parallel uploaders) the input list is split into.
private static final int PARALLELISM = 3;
/**
 * Publishes {@code partitions} to {@code catalog} by fanning the upload work out over Spark
 * workers.
 *
 * <p>Driver side: opens a publication batch against the catalog's current base version.
 * Worker side: each Spark partition uploads its records and commits them to the batch.
 * Driver side again: once every Spark partition has reported back, the batch is sealed.
 *
 * @param jsc Spark context used to distribute the work
 * @param catalog HRN of the catalog to publish into
 * @param layerIds ids of the layers the batch may write to
 * @param partitions payloads to upload (partition id, layer id, raw bytes)
 */
public static void publishUsingMultipleWorkers(
    JavaSparkContext jsc, HRN catalog, List<String> layerIds, List<CustomData> partitions) {
  // Driver side: resolve the current base version and open a publication batch.
  ActorSystem driverSystem = DataClientSparkContextUtils.context().actorSystem();
  PublishApi driverPublishApi = DataClient.get(driverSystem).publishApi(catalog);
  OptionalLong baseVersion = driverPublishApi.getBaseVersion().toCompletableFuture().join();
  BatchToken batchToken =
      driverPublishApi
          .startBatch2(baseVersion, Optional.of(layerIds), Collections.emptyList())
          .toCompletableFuture()
          .join();

  // Worker side: the batch token is captured by the lambda and shipped to each worker;
  // every Spark partition uploads its records and commits them to the shared batch.
  JavaRDD<Done> uploadResults =
      jsc.parallelize(partitions, PARALLELISM)
          .mapPartitions(
              records -> {
                ActorSystem workerSystem =
                    DataClientSparkContextUtils.context().actorSystem();
                PublishApi publishApi = DataClient.get(workerSystem).publishApi(catalog);
                WriteEngine writeEngine = DataEngine.get(workerSystem).writeEngine(catalog);

                List<CommitPartition> commits = new ArrayList<>();
                records.forEachRemaining(
                    record -> {
                      com.here.platform.data.client.javadsl.NewPartition newPartition =
                          new com.here.platform.data.client.javadsl.NewPartition.Builder()
                              .withPartition(record.partition)
                              .withData(record.data)
                              .withLayer(record.layerId)
                              .build();
                      // Block until the upload finishes and collect the resulting commit.
                      commits.add(writeEngine.put(newPartition).toCompletableFuture().join());
                    });

                Done done =
                    publishApi
                        .publishToBatch(batchToken, commits.iterator())
                        .toCompletableFuture()
                        .join();
                return Collections.singletonList(done).iterator();
              });

  // Force evaluation of every Spark partition, then seal the batch on the driver.
  uploadResults.collect();
  driverPublishApi.completeBatch(batchToken).toCompletableFuture().join();
}