メタデータを公開します

CommitPartition 新しいパーティションまたは削除されたパーティションを公開するために使用します。 同じパブリケーションで、作成と削除の両方を混在させることができます。

HERE platform では、次の種類のレイヤーがサポートされています。

  • バージョン付き
  • 揮発性
  • インデックス
  • ストリーム
  • オブジェクトストア

ストリームレイヤー / 揮発性レイヤー

プラットフォームストリームおよび揮発性レイヤーには、そのメタデータよりも小さいデータが含まれている場合があります。そのため、メタデータをデータとは別に公開する理由はありません。 ストリーム / 揮発性レイヤーの公開について詳しく は、データの公開を参照してください。

バージョニングレイヤー

カタログは、最初に作成したときに空になります。 すべてのバージョンレイヤーのバージョンが∅になります。

最初のパブリケーションをバージョン付レイヤーに公開するには、次のものを追加します。

Scala
Java
// create publishApi for a catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)

// prepare a list of partitions to publish
val partitions =
  Source(
    List(
      CommitPartition.newCommitPartition(
        partition = partitionId1,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      ),
      CommitPartition.newCommitPartition(
        partition = partitionId2,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      )
    )
  )

// publish initial version
val firstPublish: Future[Done] =
  publishApi.publishBatch2(
    baseVersion = None, // publication to an empty catalog can be done with `baseVersion = None`.
    Some(Seq(versionedLayerId)),
    dependencies = Seq.empty,
    partitions = partitions
  )
// create publishApi for a catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// prepare a list of partitions to publish
CommitPartition newPartition1 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId1)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

CommitPartition newPartition2 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId2)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);

Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);

// publish initial version
CompletableFuture<Done> futurePublish =
    publishApi
        .publishBatch2(
            emptyVersion,
            Optional.of(Arrays.asList(layer)),
            Collections.emptyList(),
            partitions)
        .toCompletableFuture();

バージョン付レイヤーに更新を公開する場合は、発行時のカタログの最新バージョンである基本バージョンを提供する必要があります。 基本バージョンは、パブリケーションの処理中にカタログバージョンが変更されていないことを確認するために使用されます。変更すると、データの不整合が発生する可能性があります。

バージョン付レイヤーに更新を公開するには、次のものを追加します。

Scala
Java
val nextPublishPartitions =
  Source(
    List(
      CommitPartition.newCommitPartition(
        partition = partitionId1,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      ),
      CommitPartition.deletedPartition(
        partition = partitionId2,
        layer = versionedLayerId
      )
    )
  )

// for subsequent publications catalog base version needs to be provided
val nextPublish: Future[Done] =
  for {
    baseVersion <- publishApi.getBaseVersion()
    _ <- publishApi.publishBatch2(
      baseVersion = baseVersion,
      Some(Seq(versionedLayerId)),
      dependencies = Seq.empty,
      partitions = nextPublishPartitions
    )
  } yield Done
// prepare a list of partitions to publish
CommitPartition nextPublishPartition1 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId1)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

CommitPartition deletePartition2 =
    new CommitPartition.Builder()
        .deletePartition()
        .withPartition(partitionId2)
        .withLayer(layer)
        .build();

ArrayList<CommitPartition> nextPublishPartitionList = new ArrayList<>();
partitionList.add(nextPublishPartition1);
partitionList.add(deletePartition2);

Source<CommitPartition, NotUsed> nextPublishPartitions = Source.from(nextPublishPartitionList);

// for subsequent publications catalog base version needs to be provided
CompletableFuture<Done> futureNextPublish =
    publishApi
        .getBaseVersion()
        .thenCompose(
            baseVersion ->
                publishApi.publishBatch2(
                    baseVersion,
                    Optional.of(Arrays.asList(layer)),
                    Collections.emptyList(),
                    nextPublishPartitions))
        .toCompletableFuture();

基本バージョンとバージョンの依存関係

バッチを開始する場合 base versionは、を指定する必要があります。このには、バッチによって処理されるパブリケーションが相対的になります。 は base version 、カタログの最新バージョンである必要があります。 は base version 、パブリケーションがバッチで想定されたバージョンを基準にしていること、および同じレイヤーへの同時パブリケーションが適用されていないことを確認するために使用されます。

整合性は、カタログ内だけでなく、データ処理パイプライン内のすべてのカタログ間で重要になります。 カタログ間の整合性を確保 VersionDependency するために、各バッチには、パブリケーションの作成に使用されたカタログおよびバージョンを示すオブジェクトのリストを含めることができます。

バージョンの依存関係には、そのバージョン付レイヤーを使用して特定のカタログの新しいバージョンを作成した場合にのみ、カタログを含める必要があります。 たとえば、単一のストリーム レイヤーから読み取り、バージョン付レイヤーにデータを公開する場合、依存関係は空になります。

バージョンの依存関係を追加するには、次のものを追加します。

Scala
Java
val dependencies =
  Seq(
    VersionDependency(
      hrn = upstreamCatalogHrn,
      version = 5L,
      direct = true
    )
  )

for {
  baseVersion <- publishApi.getBaseVersion()
  _ <- publishApi.publishBatch2(
    baseVersion = baseVersion,
    Some(Seq(versionedLayerId)),
    dependencies = dependencies,
    partitions = partitions
  )
} yield Done
ArrayList<VersionDependency> dependencies = new ArrayList<>();
VersionDependency dependency = new VersionDependency(upstreamCatalogHrn, 5L, true);
dependencies.add(dependency);

CompletableFuture<Done> futurePublishWithDeps =
    publishApi
        .getBaseVersion()
        .thenCompose(
            baseVersion ->
                publishApi.publishBatch2(
                    baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions))
        .toCompletableFuture();

メタデータ公開プロセスの簡素化

batch token このタイプのレイヤーへのすべてのパブリケーションには、バッチジョブが 1 つのパブリケーションリクエストで実行された場合でもが必要です。 バッチを開始し、batch tokenを受信して、以降のすべてのパブリケーションbatch tokenに提供する必要があります。 すべてのパブリケーションが送信されたら、バッチを完了する必要があります。完了すると、サーバーが送信済みのパブリケーションの処理を開始します。 すべて completed のパブリケーションが処理され、新しいバージョンがカタログにパブリッシュされると、バッチ処理されたパブリケーションが考慮されます。

データ クライアント ライブラリでは、を 1 batched publication つの手順で実行できます。 ヘルパーが自動的にバッチを開始し、そのバッチを使用してデータをパブリッシュして、バッチをファイナライズします。 データが処理され、カタログで利用できるようになると、呼び出しが完了します。

バッチのパブリッシュを追加するには、次のものを追加します。

Scala
Java
// create publishApi for a catalog
val publishApi = DataClient().publishApi(catalogHrn)

// list of dependencies for this publication
val dependencies = Seq.empty[VersionDependency]

val partitions: Source[CommitPartition, NotUsed] =
  Source(
    List(
      CommitPartition.newCommitPartition(
        partition = partitionId1,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      ),
      CommitPartition.newCommitPartition(
        partition = partitionId2,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      )
    )
  )

publishApi.publishBatch2(
  baseVersion = None,
  Some(Seq(versionedLayerId)),
  dependencies = dependencies,
  partitions = partitions
)
// create publishApi for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// list of dependencies for this publication
List<VersionDependency> dependencies = Collections.emptyList();

CommitPartition newPartition1 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId1)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

CommitPartition newPartition2 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId2)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);

Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);

CompletableFuture<Done> futurePublish =
    publishApi
        .publishBatch2(baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions)
        .toCompletableFuture();

出版物の配布

HERE platform を使用すると、多数のパーティションを分散して処理および公開できます。

バージョン管理されたレイヤーの場合、これは 3 つのステップから成るプロセスです。

  1. 新しいバッチパブリケーションを開始し、を受信することで、パブリケーションプロセスを開始 batch tokenします。通常、この操作はクラスタ内のマスターノードまたはドライバノードで実行されます。
  2. 複数のワーカーがパーティションをアップロードして、それらを同じに添付します batch token
  3. すべてのデータがサーバーに送信されたら、バッチアップロードを完了する必要があります。通常、この操作はクラスタ内のマスターノードまたはドライバーノードで行われます。

完全なバッチ要求を受け取ると、サーバーはパブリケーションの処理を開始して、次のカタログバージョンを作成します。

複数のリクエストをバージョン付レイヤーに公開するには、次のものを追加します。

Scala
Java
// create publishApi and writeEngine for source catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)
val writeEngine = DataEngine().writeEngine(catalogHrn)

// start batch publication
publishApi
  .startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
  .flatMap { batchToken =>
    //start worker 1 with upload data and publishing metadata
    val worker1 =
      publishApi.publishToBatch(batchToken, partitions1.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    //start worker 2 with upload data and publishing metadata
    val worker2 =
      publishApi.publishToBatch(batchToken, partitions2.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    // wait until workers are done uploading data/metadata
    for {
      _ <- worker1
      _ <- worker2
    } yield batchToken

  }
  .flatMap { batchToken =>
    //signal to server complete batch publication
    publishApi.completeBatch(batchToken)
  }
// create publishApi and writeEngine for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// start a batch, publish partitions, complete batch
CompletableFuture<Done> futurePublish =
    publishApi
        .startBatch2(baseVersion, Optional.empty(), Collections.emptyList())
        .thenCompose(
            batchToken -> {
              // start worker 1 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker1 =
                  arbitraryPendingPartitions1;
              CompletableFuture<Done> worker1 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker1.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // start worker 2 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker2 =
                  arbitraryPendingPartitions2;
              CompletableFuture<Done> worker2 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker2.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // wait until workers are done upload
              return worker1.thenCombine(worker2, (done, done2) -> batchToken);
            })
        .thenCompose(
            batchToken -> {
              return publishApi.completeBatch(batchToken);
            })
        .toCompletableFuture();

ストリームおよび揮発性レイヤーの場合、分散されたパブリケーションは通常のパブリケーションプロセスとは異なりません。

インデックス レイヤー

パーティション データがすでに Blobstore にアップロードされていて、このアップロードされたデータの dataHandle がある場合は、指定されたキーを使用してインデックス レイヤーのパーティションをインデックス化できます。

インデックス レイヤーでパーティションのインデックスを作成するには、次のものを追加します。

Scala
Java
// How to index partition with PublishApi.publishIndex
publishApi.publishIndex(indexLayerId, commitPartitions)
// How to index partition with PublishApi.publishIndex
publishApi.publishIndex(indexLayerId, commitPartitions);

」に一致する結果は 件です

    」に一致する結果はありません