CommitPartition
新しいパーティションまたは削除されたパーティションを公開するために使用します。 同じパブリケーションで、作成と削除の両方を混在させることができます。
HERE platform では、次の種類のレイヤーがサポートされています。
- バージョン付き
- 揮発性
- インデックス
- ストリーム
- オブジェクトストア
ストリームレイヤー / 揮発性レイヤー
プラットフォームストリームおよび揮発性レイヤーには、そのメタデータよりも小さいデータが含まれている場合があります。そのため、メタデータをデータとは別に公開する理由はありません。 ストリーム / 揮発性レイヤーの公開について詳しく は、データの公開を参照してください。
バージョニングレイヤー
カタログは、最初に作成したときに空になります。 すべてのバージョンレイヤーのバージョンが∅になります。
最初のパブリケーションをバージョン付レイヤーに公開するには、次のものを追加します。
val publishApi = DataClient().publishApi(catalogHrn, settings)
val partitions =
Source(
List(
CommitPartition.newCommitPartition(
partition = partitionId1,
layer = versionedLayerId,
dataHandle = "example-data-handle"
),
CommitPartition.newCommitPartition(
partition = partitionId2,
layer = versionedLayerId,
dataHandle = "example-data-handle"
)
)
)
val firstPublish: Future[Done] =
publishApi.publishBatch2(
baseVersion = None,
Some(Seq(versionedLayerId)),
dependencies = Seq.empty,
partitions = partitions
)
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
CommitPartition newPartition1 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId1)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
CommitPartition newPartition2 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId2)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);
Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);
CompletableFuture<Done> futurePublish =
publishApi
.publishBatch2(
emptyVersion,
Optional.of(Arrays.asList(layer)),
Collections.emptyList(),
partitions)
.toCompletableFuture();
バージョン付レイヤーに更新を公開する場合は、発行時のカタログの最新バージョンである基本バージョンを提供する必要があります。 基本バージョンは、パブリケーションの処理中にカタログバージョンが変更されていないことを確認するために使用されます。変更すると、データの不整合が発生する可能性があります。
バージョン付レイヤーに更新を公開するには、次のものを追加します。
val nextPublishPartitions =
Source(
List(
CommitPartition.newCommitPartition(
partition = partitionId1,
layer = versionedLayerId,
dataHandle = "example-data-handle"
),
CommitPartition.deletedPartition(
partition = partitionId2,
layer = versionedLayerId
)
)
)
val nextPublish: Future[Done] =
for {
baseVersion <- publishApi.getBaseVersion()
_ <- publishApi.publishBatch2(
baseVersion = baseVersion,
Some(Seq(versionedLayerId)),
dependencies = Seq.empty,
partitions = nextPublishPartitions
)
} yield Done
CommitPartition nextPublishPartition1 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId1)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
CommitPartition deletePartition2 =
new CommitPartition.Builder()
.deletePartition()
.withPartition(partitionId2)
.withLayer(layer)
.build();
ArrayList<CommitPartition> nextPublishPartitionList = new ArrayList<>();
partitionList.add(nextPublishPartition1);
partitionList.add(deletePartition2);
Source<CommitPartition, NotUsed> nextPublishPartitions = Source.from(nextPublishPartitionList);
CompletableFuture<Done> futureNextPublish =
publishApi
.getBaseVersion()
.thenCompose(
baseVersion ->
publishApi.publishBatch2(
baseVersion,
Optional.of(Arrays.asList(layer)),
Collections.emptyList(),
nextPublishPartitions))
.toCompletableFuture();
基本バージョンとバージョンの依存関係
バッチを開始する場合 base version
は、を指定する必要があります。このには、バッチによって処理されるパブリケーションが相対的になります。 は base version
、カタログの最新バージョンである必要があります。 は base version
、パブリケーションがバッチで想定されたバージョンを基準にしていること、および同じレイヤーへの同時パブリケーションが適用されていないことを確認するために使用されます。
整合性は、カタログ内だけでなく、データ処理パイプライン内のすべてのカタログ間で重要になります。 カタログ間の整合性を確保 VersionDependency
するために、各バッチには、パブリケーションの作成に使用されたカタログおよびバージョンを示すオブジェクトのリストを含めることができます。
注
バージョンの依存関係には、そのバージョン付レイヤーを使用して特定のカタログの新しいバージョンを作成した場合にのみ、カタログを含める必要があります。 たとえば、単一のストリーム レイヤーから読み取り、バージョン付レイヤーにデータを公開する場合、依存関係は空になります。
バージョンの依存関係を追加するには、次のものを追加します。
val dependencies =
Seq(
VersionDependency(
hrn = upstreamCatalogHrn,
version = 5L,
direct = true
)
)
for {
baseVersion <- publishApi.getBaseVersion()
_ <- publishApi.publishBatch2(
baseVersion = baseVersion,
Some(Seq(versionedLayerId)),
dependencies = dependencies,
partitions = partitions
)
} yield Done
ArrayList<VersionDependency> dependencies = new ArrayList<>();
VersionDependency dependency = new VersionDependency(upstreamCatalogHrn, 5L, true);
dependencies.add(dependency);
CompletableFuture<Done> futurePublishWithDeps =
publishApi
.getBaseVersion()
.thenCompose(
baseVersion ->
publishApi.publishBatch2(
baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions))
.toCompletableFuture();
batch token
このタイプのレイヤーへのすべてのパブリケーションには、バッチジョブが 1 つのパブリケーションリクエストで実行された場合でもが必要です。 バッチを開始し、batch token
を受信して、以降のすべてのパブリケーションbatch token
に提供する必要があります。 すべてのパブリケーションが送信されたら、バッチを完了する必要があります。完了すると、サーバーが送信済みのパブリケーションの処理を開始します。 すべて completed
のパブリケーションが処理され、新しいバージョンがカタログにパブリッシュされると、バッチ処理されたパブリケーションが考慮されます。
データ クライアント ライブラリでは、を 1 batched publication
つの手順で実行できます。 ヘルパーが自動的にバッチを開始し、そのバッチを使用してデータをパブリッシュして、バッチをファイナライズします。 データが処理され、カタログで利用できるようになると、呼び出しが完了します。
バッチのパブリッシュを追加するには、次のものを追加します。
val publishApi = DataClient().publishApi(catalogHrn)
val dependencies = Seq.empty[VersionDependency]
val partitions: Source[CommitPartition, NotUsed] =
Source(
List(
CommitPartition.newCommitPartition(
partition = partitionId1,
layer = versionedLayerId,
dataHandle = "example-data-handle"
),
CommitPartition.newCommitPartition(
partition = partitionId2,
layer = versionedLayerId,
dataHandle = "example-data-handle"
)
)
)
publishApi.publishBatch2(
baseVersion = None,
Some(Seq(versionedLayerId)),
dependencies = dependencies,
partitions = partitions
)
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
List<VersionDependency> dependencies = Collections.emptyList();
CommitPartition newPartition1 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId1)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
CommitPartition newPartition2 =
new CommitPartition.Builder()
.newPartition()
.withPartition(partitionId2)
.withDataHandle("<example-data-handle>")
.withLayer(layer)
.build();
ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);
Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);
CompletableFuture<Done> futurePublish =
publishApi
.publishBatch2(baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions)
.toCompletableFuture();
出版物の配布
HERE platform を使用すると、多数のパーティションを分散して処理および公開できます。
バージョン管理されたレイヤーの場合、これは 3 つのステップから成るプロセスです。
- 新しいバッチパブリケーションを開始し、を受信することで、パブリケーションプロセスを開始
batch token
します。通常、この操作はクラスタ内のマスターノードまたはドライバノードで実行されます。 - 複数のワーカーがパーティションをアップロードして、それらを同じに添付します
batch token
- すべてのデータがサーバーに送信されたら、バッチアップロードを完了する必要があります。通常、この操作はクラスタ内のマスターノードまたはドライバーノードで行われます。
完全なバッチ要求を受け取ると、サーバーはパブリケーションの処理を開始して、次のカタログバージョンを作成します。
複数のリクエストをバージョン付レイヤーに公開するには、次のものを追加します。
val publishApi = DataClient().publishApi(catalogHrn, settings)
val writeEngine = DataEngine().writeEngine(catalogHrn)
publishApi
.startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
.flatMap { batchToken =>
val worker1 =
publishApi.publishToBatch(batchToken, partitions1.mapAsync(parallelism = 2) {
partition =>
writeEngine.put(partition)
})
val worker2 =
publishApi.publishToBatch(batchToken, partitions2.mapAsync(parallelism = 2) {
partition =>
writeEngine.put(partition)
})
for {
_ <- worker1
_ <- worker2
} yield batchToken
}
.flatMap { batchToken =>
publishApi.completeBatch(batchToken)
}
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);
CompletableFuture<Done> futurePublish =
publishApi
.startBatch2(baseVersion, Optional.empty(), Collections.emptyList())
.thenCompose(
batchToken -> {
Source<PendingPartition, NotUsed> partitionsOnWorker1 =
arbitraryPendingPartitions1;
CompletableFuture<Done> worker1 =
publishApi
.publishToBatch(
batchToken,
partitionsOnWorker1.mapAsync(
2, partition -> writeEngine.put(partition)))
.toCompletableFuture();
Source<PendingPartition, NotUsed> partitionsOnWorker2 =
arbitraryPendingPartitions2;
CompletableFuture<Done> worker2 =
publishApi
.publishToBatch(
batchToken,
partitionsOnWorker2.mapAsync(
2, partition -> writeEngine.put(partition)))
.toCompletableFuture();
return worker1.thenCombine(worker2, (done, done2) -> batchToken);
})
.thenCompose(
batchToken -> {
return publishApi.completeBatch(batchToken);
})
.toCompletableFuture();
ストリームおよび揮発性レイヤーの場合、分散されたパブリケーションは通常のパブリケーションプロセスとは異なりません。
インデックス レイヤー
パーティション データがすでに Blobstore にアップロードされていて、このアップロードされたデータの dataHandle がある場合は、指定されたキーを使用してインデックス レイヤーのパーティションをインデックス化できます。
インデックス レイヤーでパーティションのインデックスを作成するには、次のものを追加します。
publishApi.publishIndex(indexLayerId, commitPartitions)
publishApi.publishIndex(indexLayerId, commitPartitions);