// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)

// list of dependencies for this publication
val dependencies = Seq.empty[VersionDependency]

// pending partitions: one new partition to upload and one partition to delete
val partitions: Source[PendingPartition, NotUsed] =
  Source(
    List(
      NewPartition(
        partition = newPartitionId1,
        layer = versionedLayerId,
        data = NewPartition.ByteArrayData(blobData)),
      DeletedPartition(
        partition = deletedPartitionId,
        layer = versionedLayerId
      )))

// upload data and publish the batch as a single new version
writeEngine.publishBatch2(parallelism = 10,
                          layerIds = Some(Seq(versionedLayerId)),
                          dependencies = dependencies,
                          partitions = partitions)
// create writeEngine for source catalog
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// parallelism defines how many parallel requests would be made to fetch the data
int parallelism = 10;

// list of dependencies for this publication
List<VersionDependency> dependencies = Collections.emptyList();

// one new partition to upload and one partition to delete
NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .build();

DeletedPartition deletedPartition =
    new DeletedPartition.Builder()
        .withPartition(deletedPartitionId)
        .withLayer(layer)
        .build();

ArrayList<PendingPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition);
partitionList.add(deletedPartition);

Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

// upload data and publish the batch as a single new version
CompletableFuture<Done> futurePublish =
    writeEngine
        .publishBatch2(parallelism, Optional.of(Arrays.asList(layer)), dependencies, partitions)
        .toCompletableFuture();
// create writeEngine and queryApi for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)
val queryApi = DataClient().queryApi(catalogHrn)

// subscribe to receive new publications from stream layer
queryApi.subscribe("stream-layer",
                   ConsumerSettings("test-consumer"),
                   partition => println("Received " + new String(partition.partition)))

// single new partition to publish to the stream layer
val partitions =
  Source.single(
    NewPartition(
      partition = newPartitionId1,
      layer = streamingLayerId,
      data = NewPartition.ByteArrayData(blobData),
      dataSize = dataSize,
      checksum = checksum,
      compressedDataSize = compressedDataSize,
      timestamp = Some(timestamp) // optional, see explanation below
    ))

writeEngine.publish(partitions)
// create writeEngine and queryApi for a catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// subscribe to receive new publications from stream layer
queryApi.subscribe(
    "stream-layer",
    new ConsumerSettings.Builder().withGroupName("test-consumer").build(),
    partition -> processPartition(partition));

// single new partition to publish to the stream layer
NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .withDataSize(dataSize)
        .withCompressedDataSize(compressedDataSize)
        .withChecksum(checksum)
        .withTimestamp(OptionalLong.of(timestamp)) // optional, see explanation below
        .build();

Source<PendingPartition, NotUsed> partitions = Source.single(newPartition);
writeEngine.publish(partitions);
新しいバージョンをボラタイル レイヤーに公開する必要がある場合は、version dependencies を指定したバッチ公開を使用してパーティションをアップロードします。
以下のスニペットは、version dependencies の使用方法を示しています。
Scala
Java
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in Md5BlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  // given a list of partitions to commit
  val partitions: Source[PendingPartition, NotUsed] =
    Source(
      List(
        NewPartition(
          partition = newPartitionId1,
          layer = volatileLayerId,
          data = maybeEmptyData
        ),
        NewPartition(
          partition = newPartitionId2,
          layer = volatileLayerId,
          data = maybeEmptyData
        )))

  // upload data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    partitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), dependencies, commitPartitions)
}
// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in Md5BlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();
          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          // list of dependencies for this publication
          List<VersionDependency> dependencies = Collections.emptyList();

          // given a list of partitions to commit
          NewPartition newPartition1 =
              new NewPartition.Builder()
                  .withPartition(partitionId1)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();
          NewPartition newPartition2 =
              new NewPartition.Builder()
                  .withPartition(partitionId2)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();

          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          partitionList.add(newPartition1);
          partitionList.add(newPartition2);
          Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

          int parallelism = 10;

          // upload data
          Source<CommitPartition, NotUsed> commitPartitions =
              partitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion, Optional.of(Arrays.asList(layer)), dependencies, commitPartitions);
          return done;
        });
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in Md5BlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  // look up the partitions to be deleted by their ids
  val queryApi = DataClient().queryApi(catalogHrn)
  val filter = VolatilePartitionsFilter.byIds(Set(deletePartitionId1, deletePartitionId2))
  val partitions: Seq[Partition] = Await
    .result(queryApi.getVolatilePartitionsAsIterator(volatileLayerId, filter), Duration.Inf)
    .toSeq

  // prepare list of partitions to be deleted
  val deletePartitions: Source[PendingPartition, NotUsed] =
    Source(
      partitions.map {
        case referencePartition: ReferencePartition =>
          val dataHandle = referencePartition.getDataHandle
          val partitionId = referencePartition.partition
          DeletedPartition(
            partition = partitionId,
            layer = volatileLayerId,
            dataHandle = Some(dataHandle))
      }.toList
    )

  // delete data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    deletePartitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), dependencies, commitPartitions)
}
// get the partitions from partitionIds
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
VolatilePartitionsFilter filter =
    new VolatilePartitionsFilter.Builder()
        .withIds(new HashSet<String>(Arrays.asList(partitionId1, partitionId2)))
        .build();

final List<Partition> partitions = new ArrayList<Partition>();
try {
  queryApi
      .getVolatilePartitionsAsIterator(layerId, filter, Collections.emptySet())
      .toCompletableFuture()
      .get()
      .forEachRemaining(partitions::add);
} catch (Exception exp) {
  // best-effort lookup: fall back to an empty partition list on failure
  partitions.clear();
}

// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in Md5BlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();
          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          // prepare list of partitions to be deleted
          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          for (Partition p : partitions) {
            if (p instanceof ReferencePartition) {
              ReferencePartition referencePartition = (ReferencePartition) p;
              partitionList.add(
                  new DeletedPartition.Builder()
                      .withLayer(layerId)
                      .withPartition(referencePartition.getPartition())
                      .withDataHandle(referencePartition.getDataHandle())
                      .build());
            }
          }
          Source<PendingPartition, NotUsed> pendingPartitions = Source.from(partitionList);

          int parallelism = 10;

          // delete data
          Source<CommitPartition, NotUsed> commitPartitions =
              pendingPartitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion,
                  Optional.of(Arrays.asList(layerId)),
                  Collections.emptyList(),
                  commitPartitions);
          return done;
        });
// How to define NewPartition for Index layer
val newPartition = NewPartition(
  partition = "",
  layer = indexLayerId,
  data = ByteArrayData(bytes),
  // index field values, one entry per index attribute defined on the layer
  fields = Some(
    Map(
      "someIntKey" -> IntIndexValue(42),
      "someStringKey" -> StringIndexValue("abc"),
      "someBooleanKey" -> BooleanIndexValue(true),
      "someTimeWindowKey" -> TimeWindowIndexValue(123456789L),
      "someHereTileKey" -> HereTileIndexValue(91956L)
    )),
  // free-form user metadata attached to the partition
  metadata = Some(
    Map("someKey1" -> "someValue1", "someKey2" -> "someValue2")),
  checksum = Some(checksum),
  crc = Some(crc),
  dataSize = Some(dataSize))
// How to define NewPartition for Index layerNewPartition newPartition =newNewPartition.Builder().withPartition("").withLayer(indexLayerId).withData(bytes).addIntField("someIntKey",42).addStringField("someStringKey","abc").addBooleanField("someBooleanKey",true).addTimeWindowField("someTimeWindowKey",123456789L).addHereTileField("someHereTileKey",91956L).addMetadata("someKey1","someValue1").addMetadata("someKey2","someValue2").withChecksum(Optional.of(checksum)).withDataSize(OptionalLong.of(dataSize)).build();
// The example illustrates how to upload data and to index the partition
// with the single method WriteEngine.uploadAndIndex
writeEngine.uploadAndIndex(Iterator(newPartition))
// The example illustrated how to upload data and to index partition// with single method WriteEngine.uploadAndIndexIterator<NewPartition> partitions =Arrays.asList(newPartition).iterator();CompletionStage<Done> publish = writeEngine.uploadAndIndex(partitions);
// How to upload data with WriteEngine.put and
// index the partition with PublishApi.publishIndex
val putAndIndex: Future[Done] = for {
  commitPartition <- writeEngine.put(newPartition)
  _ <- publishApi.publishIndex(indexLayerId, Iterator(commitPartition))
} yield Done
// How to upload data with WriteEngine.put and// index the partition with PublishApi.publishIndexCompletionStage<Done> putAndIndex =
writeEngine
.put(newPartition).thenCompose(
commitPartition ->{Iterator<CommitPartition> commitPartitions =Arrays.asList(commitPartition).iterator();return publishApi.publishIndex(indexLayerId, commitPartitions);});
インデックス レイヤーを更新します
インデックス レイヤーのデータを変更する必要がある場合は、PublishApi.updateIndex API コールを使用できます。このメソッドは、次の 3 つの引数を取ります。
次のスニペットは、PublishApi.updateIndex API の使用方法を示しています。
Scala
Java
val updateIndex: Future[Done] = {
  // partitions to add
  // see above how to define a new partition for an index layer
  val additions = Seq(newPartition)

  // partitions to remove
  // use CommitPartition.deletedIndexPartition to define a partition and its data handle
  // that you plan to remove
  val removals = Seq(CommitPartition.deletedIndexPartition(dataHandle, indexLayerId))

  for {
    // first you have to upload corresponding blobs of the new partitions to the Blob Store
    committedAdditions <- Future.sequence(additions.map(p => writeEngine.put(p)))
    _ <- publishApi.updateIndex(indexLayerId,
                                committedAdditions.toIterator,
                                removals.toIterator)
  } yield Done
}
CompletionStage<Done> updateIndex =
writeEngine
// first you have to upload corresponding blobs of the new partitions// to the Blob Store.put(newPartition).thenCompose(
commitPartition ->{Iterator<CommitPartition> additions =Arrays.asList(commitPartition).iterator();// use DeleteIndexPartitionBuilder to define partitions that you plan to removeCommitPartition deletePartition =newCommitPartition.Builder().deleteIndexPartition().withLayer(indexLayerId).withDataHandle(dataHandle).build();Iterator<CommitPartition> removals =Arrays.asList(deletePartition).iterator();return publishApi.updateIndex(indexLayerId, additions, removals);});
インデックス レイヤーから削除します
インデックス レイヤーでメタデータおよびデータを削除する必要がある場合は、PublishApi.deleteIndex API コールを使用できます。PublishApi.deleteIndex API の呼び出しが成功すると、削除操作がスケジュールされます。
以下のスニペットでは、PublishApi.deleteIndex API および QueryApi.queryIndexDeleteStatus API の使用方法について説明します。
Scala
Java
import scala.concurrent.Await
import scala.concurrent.duration._

val queryString = "someIntKey>42;someStringKey!=abc"

val deleteId =
  Await.result(publishApi.deleteIndex(indexLayerId, queryString), 45.seconds)

// Note that the delete operation for deleting records in index layer is an async operation
// This example will return the current status of the delete request
// If user wants to wait for the delete status to be in Succeeded state, user may have to
// perform multiple delete status calls
// It is recommended to use exponential backoff policy to reduce the rate of delete status
// calls to the server
val deleteStatusResponse =
  Await.result(queryApi.queryIndexDeleteStatus(indexLayerId, deleteId), 5.seconds)

println("Current state of index delete request is " + deleteStatusResponse.state)
String queryString = "someIntKey>42;someStringKey!=abc";

String deleteId =
    publishApi.deleteIndex(indexLayerId, queryString).toCompletableFuture().join();

// Note that the delete operation for deleting records in index layer is an async operation
// This example will return the current status of the delete request
// If user wants to wait for the delete status to be in Succeeded state, user may have to
// perform multiple delete status calls
// It is recommended to use exponential backoff policy to reduce the rate of delete status
// calls to the server
DeleteIndexesStatusResponse deleteStatusResponse =
    queryApi.queryIndexDeleteStatus(indexLayerId, deleteId).toCompletableFuture().join();

System.out.println("Current state of index delete request is " + deleteStatusResponse.state());