データを公開します

次の図は、データパブリケーションモデルを示しています。 同じパブリケーション内のすべてのレイヤータイプを混在させることができます。

DataService ビュー
図 1. DataService ビュー

HERE platform では、次の種類のレイヤーがサポートされています。 それぞれの章を参照してください。

  • バージョン付き
  • 揮発性
  • インデックス
  • ストリーム
  • オブジェクトストア

バージョン付レイヤーに公開します

簡略化された公開プロセス

バージョン付レイヤーの簡易化されたメタデータ公開と同様に、データとメタデータの両方を 1 つの手順で公開できます。 以下のスニペットは、バッチパブリケーションを自動的に開始し、そのバッチパブリケーションを使用してデータとメタデータをパブリッシュして、バッチをファイナライズします。 データが処理され、カタログで利用できるようになると、コールが終了します。

Scala
Java
// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)

// list of dependencies for this publication
val dependencies = Seq.empty[VersionDependency]

val partitions: Source[PendingPartition, NotUsed] =
  Source(
    List(
      NewPartition(
        partition = newPartitionId1,
        layer = versionedLayerId,
        data = NewPartition.ByteArrayData(blobData)
      ),
      DeletedPartition(
        partition = deletedPartitionId,
        layer = versionedLayerId
      )
    )
  )

writeEngine.publishBatch2(parallelism = 10,
                          layerIds = Some(Seq(versionedLayerId)),
                          dependencies = dependencies,
                          partitions = partitions)
// create writeEngine for source catalog
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// parallelism defines how many parallel requests would be made to fetch the data
int parallelism = 10;

// list of dependencies for this publication
List<VersionDependency> dependencies = Collections.emptyList();

NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .build();
DeletedPartition deletedPartition =
    new DeletedPartition.Builder().withPartition(deletedPartitionId).withLayer(layer).build();

ArrayList<PendingPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition);
partitionList.add(deletedPartition);

Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

CompletableFuture<Done> futurePublish =
    writeEngine
        .publishBatch2(parallelism, Optional.of(Arrays.asList(layer)), dependencies, partitions)
        .toCompletableFuture();

出版物の配布

HERE platform を使用すると、多数のパーティションを分散して処理および公開できます。

バージョン管理されたレイヤーの場合、これは 3 つのステップから成るプロセスです。

  • パブリケーションプロセスを開始して、新しいバッチパブリケーションを開始 batch tokenします。このとき、を受け取ると、通常、この操作はクラスタ内のマスタまたはドライバノードで行われます。
  • データやメタデータをアップロードして同じに添付する作業者もいます batch token
  • すべてのデータがサーバーに送信されたら、バッチパブリケーションのアップロードを完了する必要があります。通常、この操作は、クラスタ内のマスターまたはドライバーノードで実行されます。

完全なバッチ要求を受信すると、サーバーはパブリケーションの処理を開始して、次のカタログバージョンを作成します。

次のスニペットは、複数のリクエストをバージョン付レイヤーに公開する方法を示しています。

Scala
Java
// create publishApi and writeEngine for source catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)
val writeEngine = DataEngine().writeEngine(catalogHrn)

// start batch publication
publishApi
  .startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
  .flatMap { batchToken =>
    //start worker 1 with upload data and publishing metadata
    val worker1 =
      publishApi.publishToBatch(batchToken, partitions1.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    //start worker 2 with upload data and publishing metadata
    val worker2 =
      publishApi.publishToBatch(batchToken, partitions2.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    // wait until workers are done uploading data/metadata
    for {
      _ <- worker1
      _ <- worker2
    } yield batchToken

  }
  .flatMap { batchToken =>
    //signal to server complete batch publication
    publishApi.completeBatch(batchToken)
  }
// create publishApi and writeEngine for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// start a batch, publish partitions, complete batch
CompletableFuture<Done> futurePublish =
    publishApi
        .startBatch2(baseVersion, Optional.empty(), Collections.emptyList())
        .thenCompose(
            batchToken -> {
              // start worker 1 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker1 =
                  arbitraryPendingPartitions1;
              CompletableFuture<Done> worker1 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker1.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // start worker 2 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker2 =
                  arbitraryPendingPartitions2;
              CompletableFuture<Done> worker2 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker2.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // wait until workers are done upload
              return worker1.thenCombine(worker2, (done, done2) -> batchToken);
            })
        .thenCompose(
            batchToken -> {
              return publishApi.completeBatch(batchToken);
            })
        .toCompletableFuture();

ストリーム レイヤーに公開します

ストリーム レイヤーに公開されたデータのバージョンが更新されていません。 消費者がただちに処理できるようになります。 データは、ストリーム レイヤーにサブスクライブすることで取得できます。

以下のスニペットは、ストリーム レイヤーに公開する方法を示しています

Scala
Java
// create writeEngine and queryApi for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)
val queryApi = DataClient().queryApi(catalogHrn)

// subscribe to receive new publications from stream layer
queryApi.subscribe("stream-layer",
                   ConsumerSettings("test-consumer"),
                   partition => println("Received " + new String(partition.partition)))

val partitions =
  Source.single(
    NewPartition(
      partition = newPartitionId1,
      layer = streamingLayerId,
      data = NewPartition.ByteArrayData(blobData),
      dataSize = dataSize,
      checksum = checksum,
      compressedDataSize = compressedDataSize,
      timestamp = Some(timestamp) // optional, see explation below
    )
  )

writeEngine.publish(partitions)
// create writeEngine and queryApi for a catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// subscribe to receive new publications from stream layer
queryApi.subscribe(
    "stream-layer",
    new ConsumerSettings.Builder().withGroupName("test-consumer").build(),
    partition -> processPartition(partition));

NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .withDataSize(dataSize)
        .withCompressedDataSize(compressedDataSize)
        .withChecksum(checksum)
        .withTimestamp(OptionalLong.of(timestamp)) // optional, see explation below
        .build();

Source<PendingPartition, NotUsed> partitions = Source.single(newPartition);

writeEngine.publish(partitions);

オプションのパラメータ timestamp を指定すると、そのまま使用されます。 このパラメータが指定されていない場合System.currentTimeMillis()、現在の時刻がデフォルトで使用されます () 。 ワークフローで「イベント」時間のキャプチャが必要な場合は、タイムスタンプを提供すると便利です。そうでない場合は、現在の時間が「インジェスト」時間になります。

注 : 重要ですので、ご注意ください

ストリーム レイヤー TTL (Time to Live) 設定で設定された Kafka データ / レコードの削除は timestamp 、パラメータによってトリガーされます。 したがって、タイムスタンプを提供すると、そのタイムスタンプ + 設定された TTL (存続可能時間)に基づいてストリーム レイヤーデータが削除されます。 デフォルトでは、「インジェスト」タイムスタンプ + 設定されている TTL を使用して、データがカフカから消去されます。 この差別化要因は、インジェスト後一定の時間内にユースケースでストリームデータの削除が必要になる場合に考慮することが重要です。 Kafka レコードの TTL ( 存続可能時間 ) は、 Kafka ブローカで定義されます。

ボラタイル レイヤーに公開します

ボラタイル レイヤーはキー / 値ストアで、特定のキーの値を変更でき、最新の値のみを取得できます。 新しいデータが公開されると、古いデータが上書きされます。 コンシューマーがレイヤーで読み取ることを期待しているデータの変更内容を破壊する場合は、メタデータの新しいバージョンを公開する必要があります。

新しいバージョンをボラタイル レイヤーに公開する必要がある場合 version dependencies は、を使用して、バッチ公開を使用してパーティションをアップロードします。

以下のスニペットは、の使用方法を示し version dependenciesています。

Scala
Java
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in Md5BlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  // given a list partitions to commit
  val partitions: Source[PendingPartition, NotUsed] =
    Source(
      List(
        NewPartition(
          partition = newPartitionId1,
          layer = volatileLayerId,
          data = maybeEmptyData
        ),
        NewPartition(
          partition = newPartitionId2,
          layer = volatileLayerId,
          data = maybeEmptyData
        )
      )
    )

  // upload data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    partitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), dependencies, commitPartitions)
}
// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in Md5BlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();

          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          // list of dependencies for this publication
          List<VersionDependency> dependencies = Collections.emptyList();

          NewPartition newPartition1 =
              new NewPartition.Builder()
                  .withPartition(partitionId1)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();

          NewPartition newPartition2 =
              new NewPartition.Builder()
                  .withPartition(partitionId2)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();

          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          partitionList.add(newPartition1);
          partitionList.add(newPartition2);

          Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

          int parallelism = 10;

          // upload data
          Source<CommitPartition, NotUsed> commitPartitions =
              partitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion,
                  Optional.of(Arrays.asList(layer)),
                  dependencies,
                  commitPartitions);
          return done;
        });

ボラタイル レイヤー内のデータの更新のみが必要な場合 publish は、汎用的な方法を使用します。

以下のスニペットは、の使用方法を示し publishています。

Scala
Java
// create queryApi for a catalog to find latest version
val queryApi = DataClient().queryApi(catalogHrn)

queryApi.getLatestVersion().flatMap { maybeLatestVersion =>
  val latestVersion =
    maybeLatestVersion.getOrElse(throw new IllegalArgumentException("No version found!"))

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(latestVersion))

  val partitions: Source[PendingPartition, NotUsed] =
    Source(
      List(
        NewPartition(
          partition = newPartitionId1,
          layer = volatileLayerId,
          data = someData
        ),
        NewPartition(
          partition = newPartitionId2,
          layer = volatileLayerId,
          data = someData
        )
      )
    )

  // publish data without batch token
  partitions
    .mapAsync(parallelism = 10) { partition =>
      writeEngine.put(partition)
    }
    .runWith(Sink.ignore)
}
// create queryApi for a catalog to find latest version
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

CompletionStage<Done> completionStage =
    queryApi
        .getLatestVersion(OptionalLong.empty())
        .thenCompose(
            maybeLatestVersion -> {
              if (!maybeLatestVersion.isPresent())
                throw new IllegalArgumentException("No version found!");

              Long latestVersion = maybeLatestVersion.getAsLong();

              // create writeEngine for a catalog with a deterministic BlobIdGenerator
              BlobIdGenerator idGenerator =
                  new StableBlobIdGenerator.Builder().withVersion(latestVersion).build();

              WriteEngine writeEngine =
                  DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

              NewPartition newPartition1 =
                  new NewPartition.Builder()
                      .withPartition(partitionId1)
                      .withLayer(layer)
                      .withData(someData)
                      .build();

              NewPartition newPartition2 =
                  new NewPartition.Builder()
                      .withPartition(partitionId2)
                      .withLayer(layer)
                      .withData(someData)
                      .build();

              ArrayList<PendingPartition> partitionList = new ArrayList<>();
              partitionList.add(newPartition1);
              partitionList.add(newPartition2);

              Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

              int parallelism = 10;

              // publish data without batch token
              CompletionStage<Done> done =
                  partitions
                      .mapAsync(parallelism, writeEngine::put)
                      .runWith(Sink.ignore(), myMaterializer);

              return done;
            });

ボラタイル レイヤーから削除します

メタデータおよびデータをボラタイル レイヤーから削除する必要がある場合は、次の 2 つの手順を実行します。

  1. まず DataEngine.writeEngine 、 with DeletePartition オブジェクトを使用してデータを削除します。 DeletePartitionNewPartitionに似ていますが、ペイロードの代わりにdataHandleが含まれています。 参照されている blob オブジェクト ( データ ) が削除されました。

  2. 次に、以前の API 呼び出しwriteEngineから取得したCommitPartitionオブジェクトでPublishApi.publishBatchを使用して、メタデータを削除します。

2 番目のステップをスキップすると、揮発性パーティションの有効期限が切れた場合と同じ結果になります。データは削除されますが、メタデータはまだ存在します。

以下のスニペットでは、ボラタイル レイヤーからデータおよびメタデータを削除する方法を示します。

Scala
Java
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in Md5BlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  val queryApi = DataClient().queryApi(catalogHrn)
  val filter = VolatilePartitionsFilter.byIds(Set(deletePartitionId1, deletePartitionId2))
  val partitions: Seq[Partition] = Await
    .result(queryApi.getVolatilePartitionsAsIterator(volatileLayerId, filter), Duration.Inf)
    .toSeq

  // prepare list of partitions to be deleted
  val deletePartitions: Source[PendingPartition, NotUsed] =
    Source(
      partitions.map {
        case referencePartition: ReferencePartition =>
          val dataHandle = referencePartition.getDataHandle
          val partitionId = referencePartition.partition
          DeletedPartition(
            partition = partitionId,
            layer = volatileLayerId,
            dataHandle = Some(dataHandle)
          )
      }.toList
    )

  // delete data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    deletePartitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), dependencies, commitPartitions)
}
// get the partitions from partitionIds
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
VolatilePartitionsFilter filter =
    new VolatilePartitionsFilter.Builder()
        .withIds(new HashSet<String>(Arrays.asList(partitionId1, partitionId2)))
        .build();

final List<Partition> partitions = new ArrayList<Partition>();
try {
  queryApi
      .getVolatilePartitionsAsIterator(layerId, filter, Collections.emptySet())
      .toCompletableFuture()
      .get()
      .forEachRemaining(partitions::add);
} catch (Exception exp) {
  partitions.clear();
}

// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in Md5BlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();

          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          for (Partition p : partitions) {
            if (p instanceof ReferencePartition) {
              ReferencePartition referencePartition = (ReferencePartition) p;
              partitionList.add(
                  new DeletedPartition.Builder()
                      .withLayer(layerId)
                      .withPartition(referencePartition.getPartition())
                      .withDataHandle(referencePartition.getDataHandle())
                      .build());
            }
          }

          Source<PendingPartition, NotUsed> pendingPartitions = Source.from(partitionList);

          int parallelism = 10;

          // upload data
          Source<CommitPartition, NotUsed> commitPartitions =
              pendingPartitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion,
                  Optional.of(Arrays.asList(layerId)),
                  Collections.emptyList(),
                  commitPartitions);
          return done;
        });

注 : BlobIdGenerator

StableBlobIdGenerator 揮発性パーティションをアップロードするための書き込みエンジンの作成には、を使用することをお勧めします。 独自の BlobIdGenerator を定義する場合は、メソッドgenerateVolatileBlobId(partition)が安定していることを確認します。つまり、特定のパーティションでは、各呼び出しで同じblobIdメソッドが生成されます。 デフォルトで generateVolatileBlobId は、の結果が返さ generateBlobIdれます。 したがって、このメソッドが安定している場合は問題ありません。そうでない場合は、揮発性パーティションのための非安定な blobId を持つことで、孤立した blob が作成される可能性があるため、オーバーライドする必要があります。

以下のカスタム blobIdGenerator 例 :

Scala
Java
class CustomBlobIdGenerator extends BlobIdGenerator {

  override def generateBlobId(partition: NewPartition): String =
    UUID.randomUUID.toString

  override def generateVolatileBlobId(partition: NewPartition): String =
    "volatile-partition-" + partition.partition

}
public class JavaCustomBlobIdGenerator implements BlobIdGenerator {
  @Override
  public String generateBlobId(NewPartition partition) {
    return UUID.randomUUID().toString();
  }

  @Override
  public String generateVolatileBlobId(NewPartition partition) {
    return "volatile-partition-" + partition.partition();
  }
}

インデックス レイヤーに公開します

インデックス レイヤーに公開されたデータはバージョン管理されていませんが、インデックスが作成されます。 データを公開してインデックスを作成するには、次の 2 つの方法があります。

  • WriteEngine.put メソッドおよび PublishApi.index メソッドを呼び出して、データを個別に公開およびインデックス化できます。
  • または、パーティションのデータを公開およびインデックス化する WriteEngine.uploadAndIndex メソッドを呼び出します。

以下のスニペットは、後でインデックス レイヤーに公開できる新しいパーティションを作成する方法を示しています。

Scala
Java
// How to define NewPartition for Index layer
val newPartition = NewPartition(
  partition = "",
  layer = indexLayerId,
  data = ByteArrayData(bytes),
  fields = Some(
    Map(
      "someIntKey" -> IntIndexValue(42),
      "someStringKey" -> StringIndexValue("abc"),
      "someBooleanKey" -> BooleanIndexValue(true),
      "someTimeWindowKey" -> TimeWindowIndexValue(123456789L),
      "someHereTileKey" -> HereTileIndexValue(91956L)
    )),
  metadata = Some(
    Map(
      "someKey1" -> "someValue1",
      "someKey2" -> "someValue2"
    )),
  checksum = Some(checksum),
  crc = Some(crc),
  dataSize = Some(dataSize)
)
// How to define NewPartition for Index layer
NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition("")
        .withLayer(indexLayerId)
        .withData(bytes)
        .addIntField("someIntKey", 42)
        .addStringField("someStringKey", "abc")
        .addBooleanField("someBooleanKey", true)
        .addTimeWindowField("someTimeWindowKey", 123456789L)
        .addHereTileField("someHereTileKey", 91956L)
        .addMetadata("someKey1", "someValue1")
        .addMetadata("someKey2", "someValue2")
        .withChecksum(Optional.of(checksum))
        .withDataSize(OptionalLong.of(dataSize))
        .build();

メタデータパラメータは、インデックスキーに関連付けられていない追加のキー値コレクションです。 メタデータキーは、インジェスト時間などのレコードに関する追加情報を保存できるユーザー定義のフィールドです。 Map("ingestionTime" -> "1532018660873")

NewPartition.fields クラスメンバーは、ポータルのインデックス属性、または OLP CLI のインデックス定義とも呼ばれます。

以下のスニペットでは、単一の方法でデータをアップロードし、パーティションのインデックスを作成 WriteEngine.uploadAndIndexする方法について説明します。

Scala
Java
// The example illustrated how to upload data and to index partition
// with single method WriteEngine.uploadAndIndex
writeEngine.uploadAndIndex(Iterator(newPartition))
// The example illustrated how to upload data and to index partition
// with single method WriteEngine.uploadAndIndex
Iterator<NewPartition> partitions = Arrays.asList(newPartition).iterator();
CompletionStage<Done> publish = writeEngine.uploadAndIndex(partitions);

以下のスニペットでは、との間でデータをアップロード WriteEngine.put し、パーティションのインデックスを作成する方法を示します PublishApi.publishIndex

Scala
Java
// How to upload data with WriteEngine.put and
// index the partition with PublishApi.publishIndex
val putAndIndex: Future[Done] =
  for {
    commitPartition <- writeEngine.put(newPartition)
    _ <- publishApi.publishIndex(indexLayerId, Iterator(commitPartition))
  } yield Done
// How to upload data with WriteEngine.put and
// index the partition with PublishApi.publishIndex
CompletionStage<Done> putAndIndex =
    writeEngine
        .put(newPartition)
        .thenCompose(
            commitPartition -> {
              Iterator<CommitPartition> commitPartitions =
                  Arrays.asList(commitPartition).iterator();
              return publishApi.publishIndex(indexLayerId, commitPartitions);
            });

インデックス レイヤーを更新します

インデックス レイヤーのデータを変更する必要がある場合 PublishApi.updateIndex は、 API コールを使用できます。 このメソッドは、次の 3 つの引数を取ります

  • layer - 更新するレイヤーのレイヤー ID 。
  • additions - 追加するパーティションのリスト。 対応するパーティションを追加する前に、 WriteEngine.put を使用してデータの blob をアップロードする必要があります。
  • deletions - 削除するパーティションのリスト。

次のスニペットは PublishApi.updateIndex 、 API の使用方法を示しています。

Scala
Java
val updateIndex: Future[Done] = {
  // partitions to add
  // see above how to define a new partition for an index layer
  val additions = Seq(newPartition)
  // partitions to remove
  // use CommitPartition.deletedIndexPartition to define a partition its data handle that
  // you plan to remove
  val removals = Seq(CommitPartition.deletedIndexPartition(dataHandle, indexLayerId))

  for {
    // first you have to upload corresponding blobs of the new partitions to the Blob Store
    committedAdditions <- Future.sequence(additions.map(p => writeEngine.put(p)))
    _ <- publishApi.updateIndex(indexLayerId,
                                committedAdditions.toIterator,
                                removals.toIterator)
  } yield Done
}
CompletionStage<Done> updateIndex =
    writeEngine
        // first you have to upload corresponding blobs of the new partitions
        // to the Blob Store
        .put(newPartition)
        .thenCompose(
            commitPartition -> {
              Iterator<CommitPartition> additions = Arrays.asList(commitPartition).iterator();

              // use DeleteIndexPartitionBuilder to define partitions that you plan to remove
              CommitPartition deletePartition =
                  new CommitPartition.Builder()
                      .deleteIndexPartition()
                      .withLayer(indexLayerId)
                      .withDataHandle(dataHandle)
                      .build();

              Iterator<CommitPartition> removals = Arrays.asList(deletePartition).iterator();

              return publishApi.updateIndex(indexLayerId, additions, removals);
            });

インデックス レイヤーから削除します

インデックス レイヤーでメタデータおよびデータを削除する必要がある場合 PublishApi.deleteIndex は、 API コールを使用できます。 PublishApi.deleteIndex API の呼び出しが成功すると、削除操作がスケジュールされます。

このメソッドは、次の 2 つの引数を取ります

  • layer - 一部のレコードを削除するレイヤーのレイヤー ID 。
  • queryString -RSQL クエリ言語で記述 された文字列で、インデックス レイヤーをクエリします。

このメソッドは次の値を返します

  • deleteId - 後で削除ステータスのクエリに使用できる文字列。

削除ステータスを確認するには QueryApi.queryIndexDeleteStatus 、 API コールを使用します。

このメソッドは、次の 1 つの引数を取ります

  • deleteId - PublishApi.deleteIndex API 呼び出しから削除リクエスト ID が返されました。

このメソッドは次の値を返します

  • DeleteIndexesStatusResponse - この応答は、削除ステータスリクエスト時に削除されたパーティションの状態と数に関する情報を提供します。

以下のスニペットで PublishApi.deleteIndex は、 API および QueryApi.queryIndexDeleteStatus API の使用方法について説明します。

Scala
Java
import scala.concurrent.Await
import scala.concurrent.duration._

val queryString = "someIntKey>42;someStringKey!=abc"
val deleteId =
  Await.result(publishApi.deleteIndex(indexLayerId, queryString), 45.seconds)

// Note that the delete operation for deleting records in index layer is an async operation
// This example will return the current status of the delete request
// If user wants to wait for the delete status to be in Succeeded state, user may have to
// perform multiple delete status calls
// It is recommended to use exponential backoff policy to reduce the rate of delete status
// calls to the server
val deleteStatusResponse =
  Await.result(queryApi.queryIndexDeleteStatus(indexLayerId, deleteId), 5.seconds)
println("Current state of index delete request is " + deleteStatusResponse.state)
String queryString = "someIntKey>42;someStringKey!=abc";
String deleteId =
    publishApi.deleteIndex(indexLayerId, queryString).toCompletableFuture().join();

// Note that the delete operation for deleting records in index layer is an async operation
// This example will return the current status of the delete request
// If user wants to wait for the delete status to be in Succeeded state, user may have to
// perform multiple delete status calls
// It is recommended to use exponential backoff policy to reduce the rate of delete status
// calls to the server
DeleteIndexesStatusResponse deleteStatusResponse =
    queryApi.queryIndexDeleteStatus(indexLayerId, deleteId).toCompletableFuture().join();
System.out.println(
    "Current state of index delete request is " + deleteStatusResponse.state());

オブジェクトをオブジェクト ストア レイヤーにアップロードします

オブジェクト ストア レイヤーはキー / 値ストアです。 既存のキーまたは存在しないキーにデータをアップロードできます。 データは変更可能で、並列書き込みが可能です。 でキーを公開し / て階層構造を作成し、同じプレフィックスの下にキーを一覧表示できます。

次のコード スニペットは、オブジェクト ストア レイヤーにデータをアップロードする方法を示しています。

Scala
Java
// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)

writeEngine.uploadObject2(layer,
                          key,
                          NewPartition.ByteArrayData(blobData),
                          Some(ContentTypes.`application/json`.toString()))
// create writeEngine for a catalog
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

CompletableFuture<Done> futureUploadObject =
    writeEngine
        .uploadObject2(
            layer,
            key,
            new NewPartition.ByteArrayData(blobData),
            Optional.of(ContentTypes.APPLICATION_JSON.toString()),
            Optional.empty())
        .toCompletableFuture();

オブジェクト ストア レイヤーからオブジェクトを削除します

オブジェクトはオブジェクト ストア レイヤーから削除できます。 削除リクエストを送信すると、最終的にオブジェクトがレイヤーから削除されます。

次のコード スニペットは、オブジェクト ストア レイヤーからオブジェクトを削除する方法を示しています。

Scala
Java
// create writeEngine
val writeEngine = DataEngine().writeEngine(catalogHrn)

writeEngine.deleteObject(layer, key)
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

CompletableFuture<Done> futureUploadObject =
    writeEngine.deleteObject(layer, key).toCompletableFuture();

オブジェクト ストア レイヤー内のオブジェクトをコピーします

Object Store を使用すると、同じレイヤー内のオブジェクトをサーバー側でコピーできます。

次のコード スニペットは、同じオブジェクト ストア レイヤー内のオブジェクトをコピーする方法を示しています。

Scala
Java
// create writeEngine
val writeEngine = DataEngine().writeEngine(catalogHrn)

writeEngine.copyObject(layer, destinationKey, sourceKey)
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

CompletableFuture<Done> futureUploadObject =
    writeEngine.copyObject(layer, destinationKey, sourceKey).toCompletableFuture();

Interactive マップ レイヤーに公開します

インタラクティブなマップ レイヤーにデータを公開するには、 API を公開を使用します。「インタラクティブ」 API の POST 、 PUT 、またはパッチリクエストを使用して、データをレイヤーに送信します。 対話型マップ レイヤーからデータを削除するには、削除リクエストを使用します。

Interactive マップ レイヤーにデータをアップロードします

指定した機能をインタラクティブなマップ レイヤーにアップロードする場合PublishApi.putFeature は、 API コールを使用できます。

このメソッドは、次の 3 つの引数を取ります

  • layer - 更新するレイヤーのレイヤー ID 。
  • feature - インタラクティブマップ レイヤーでアップロードする機能。
  • featureId - 機能の機能 ID 。

指定した FeatureCollection を対話型のマップ レイヤーにアップロードする場合PublishApi.putFeatureCollection は、 API 呼び出しを使用できます。

このメソッドは、次の 2 つの引数を取ります

  • layer - 更新するレイヤーのレイヤー ID 。
  • featureCollection - インタラクティブなマップ レイヤーでアップロードする FeatureCollection 。

次のスニペットは PublishApi.putFeatureCollection 、 API の使用方法を示しています。

Scala
Java
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

val featureCollection =
  FeatureCollection(
    features = immutable.Seq(
      Feature(id = Some("feature-1"),
              geometry = Some(Point(coordinates = Some(immutable.Seq(10.0, 12.0)))),
              properties = Some(Map("prop1" -> "some-value", "prop2" -> 10)))
    ))

val futureResponse = publishApi.putFeatureCollection(layerId, featureCollection)
val response = Await.result(futureResponse, timeout)
// create queryApi
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// Map of properties in the feature
Map properties = new HashMap();
properties.put("prop1", "some-value");
properties.put("prop2", 10);

Feature feature =
    new Feature.Builder()
        .withId("feature-1")
        .withGeometry(
            new Point.Builder()
                .withCoordinates(new ArrayList<>(Arrays.asList(10.0, 12.0)))
                .build())
        .withProperties(properties)
        .build();
FeatureCollection featureCollection =
    new FeatureCollection.Builder()
        .withFeatures(new ArrayList<>(Arrays.asList(feature)))
        .build();

FeatureCollection response =
    publishApi.putFeatureCollection(layerId, featureCollection).toCompletableFuture().join();

地理座標

Interactive マップ レイヤーは、地理座標を使用して動作します。 地理座標の意味は 、 GeoJSON RFC7946 で定義されています。 RFC7946 の「位置付け」の章を引用 :

" 位置は数値の配列です。 2 つ以上の要素が必要です。 最初の 2 つの要素は、経度と緯度、または東距と北距です。正確には、この順序で 10 進数を使用します。 高度または高度は、オプションの 3 番目の要素として含めることができます。 "

RFC7946 の「バウンディング ボックス」の章を引用するバウンディングボックスの場合 :

"bbox メンバーの値は、長さ 2*n の配列である必要があります。 n は、含まれているジオメトリで表される寸法の数です。最も南寄りのポイントのすべての軸の後に、より北寄りのポイントのすべての軸が続きます。 bbox の軸の順序は、形状の軸の順序に従います。 "bbox" 値は、一定の経度、緯度、および高度の線に続くエッジを持つシェイプを定義します。

インタラクティブマップ レイヤーを更新します

指定した FeatureCollection を Interactive マップ レイヤーに更新する場合PublishApi.postFeatureCollection は、 API 呼び出しを使用できます。

このメソッドには 5 つの引数があります。

  • layer - 更新するレイヤーのレイヤー ID 。
  • featureCollection - インタラクティブなマップ レイヤーで更新する FeatureCollection 。
  • ifExist - 指定した ID の機能が存在する場合に実行するアクション。 デフォルトは patch です。
  • ifNotExist - 指定した ID の機能が存在しないか、またはその機能に ID が含まれていない場合に実行するアクション。デフォルトは create です。
  • transactional - これがトランザクション操作であるかどうかを定義します。 デフォルトは true です。

またはPublishApi.postFeatureModifications 、 API コールを使用することもできます。

このメソッドは、次の 3 つの引数を取ります

  • layer - 更新するレイヤーのレイヤー ID 。
  • featureModificationList - featureModificationList には、 FeatureModification オブジェクトのリストが含まれています。 各 FeatureModification オブジェクトには、 onFeatureNotExists という名前のフィーチャのリストが含まれています。この param は、指定された ID のフィーチャが存在しない場合に実行するアクションを定義します。また、 onFeatureExists という名前の param は、指定された ID のフィーチャが存在する場合に実行するアクションを定義し
  • transactional - これがトランザクション操作であるかどうかを定義します。 デフォルトは true です。

インタラクティブマップ レイヤーで指定した機能にパッチを適用する場合PublishApi.patchFeature は、 API コールを使用できます。

このメソッドは、次の 3 つの引数を取ります

  • layer - 更新するレイヤーのレイヤー ID 。
  • feature - インタラクティブなマップ レイヤーで更新する機能。
  • featureId - 機能の機能 ID 。

次のスニペットは PublishApi.patchFeature 、 API の使用方法を示しています。

Scala
Java
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

val featureId = "feature-1"
val feature = Feature(geometry = Some(Point(coordinates = Some(immutable.Seq(10.0, 15.0)))))

val futureResponse = publishApi.patchFeature(layerId, feature, featureId)
val response = Await.result(futureResponse, timeout)
// create queryApi
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// The featureId to be updated
String featureId = "feature-1";

// Updated feature
Feature feature =
    new Feature.Builder()
        .withGeometry(
            new Point.Builder()
                .withCoordinates(new ArrayList<>(Arrays.asList(10.0, 15.0)))
                .build())
        .build();

Feature response =
    publishApi.patchFeature(layerId, feature, featureId).toCompletableFuture().join();

Interactive マップ レイヤーから削除します

インタラクティブマップ レイヤーから指定した機能を削除する場合 PublishApi.deleteFeatures は、 API コールを使用できます。

このメソッドは、次の 3 つの引数を取ります

  • layer - 更新するレイヤーのレイヤー ID 。
  • ids - 削除する機能 ID のリスト。
  • context - インタラクティブマップコンテキスト ( オプションのパラメータ )- 有効な値の一覧については、以下を参照してください。

次のスニペットは PublishApi.deleteFeatures 、 API の使用方法を示しています。

Scala
Java
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

// List of feature ids to delete
val ids = Seq("feature-1", "feature-2")

val futureResponse = publishApi.deleteFeatures(layerId, ids, Some(context))
val response = Await.result(futureResponse, timeout)

val deletedIds: Seq[String] = response.deleted.get
// create queryApi
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

List<String> ids = new ArrayList<>(Arrays.asList("feature-1", "feature-2"));

FeatureCollection response =
    publishApi.deleteFeatures(layerId, ids, Optional.of(context)).toCompletableFuture().join();

// Check the list of deleted feature Ids
List<String> deletedFeatures = response.getDeleted();

Interactive マップ レイヤーから変更セットを削除します

インタラクティブマップ レイヤーから 1 つ以上の変更セットを削除する場合 PublishApi.deleteFeatureChanges は、 API 呼び出しを使用できます。

このメソッドは、次の 2 つの引数を取ります

  • layer - 更新するレイヤーのレイヤー ID 。
  • versionQuery - 削除するバージョンを指定するために使用されるクエリ パラメーター

次のスニペットは PublishApi.deleteFeatureChanges 、 API の使用方法を示しています。

Scala
Java
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

// delete all changesets bellow version=8L
val response = publishApi.deleteFeatureChanges(
  layerId,
  VersionQuery.lessThan(8L)
)
// create publishApi
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// delete all changesets bellow version=8L
publishApi.deleteFeatureChanges(layerId, VersionQuery.lessThan(8L));

データエンジンを使用して Interactive マップ レイヤーに機能をアップロードします

インタラクティブなマップ レイヤーに機能をアップロードする場合 writeEngine.uploadIMLFeaturesAsSource は、 API コールを使用できます。

このメソッドは、次の 3 つの引数を取ります

  • layer - 更新するレイヤーのレイヤー ID 。
  • features - アップロードする機能のソース。
  • batchsize - アップロードバッチ内の機能の数。

次のスニペットは writeEngine.uploadIMLFeaturesAsSource 、 API の使用方法を示しています。

Scala
Java
// create writeEngine
val writeEngine = DataEngine().writeEngine(catalogHrn)

val batchSize = 100

// Source of Feature to upload
val source: Source[Feature, NotUsed] = Source(featureCollection.features)

val response = writeEngine.uploadIMLFeaturesAsSource(layerId, source, Some(batchSize))
Await.result(response, timeout)
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

int batchSize = 100;

// Source of feature to upload
Source<Feature, NotUsed> source = Source.from(featureCollection.getFeatures());

Done done =
    writeEngine
        .uploadIMLFeaturesAsSource(layerId, source, OptionalInt.of(batchSize))
        .toCompletableFuture()
        .join();

Data-Engine を使用して、 FeatureCollection を Interactive マップ レイヤーにアップロードします

大きい FeatureCollection をインタラクティブなマップ レイヤーにアップロードする場合 writeEngine.uploadIMLFeatureCollection は、 API 呼び出しを使用できます。

このメソッドは、次の 3 つの引数を取ります

  • layer - 更新するレイヤーのレイヤー ID 。
  • featureCollection- アップロードする FeatureCollection 。
  • batchsize - アップロードバッチ内の機能の数。

次のスニペットは writeEngine.uploadIMLFeatureCollection 、 API の使用方法を示しています。

Scala
Java
// create publishApi
val writeEngine = DataEngine().writeEngine(catalogHrn)

// Large FeatureCollection to upload
val featureCollection = new FeatureCollection.JsonBuilder(json).build
val batchSize = 100

val response =
  writeEngine.uploadIMLFeatureCollection(layerId, featureCollection, Some(batchSize))
Await.result(response, timeout)
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

int batchSize = 100;

// Large feature collection to upload
FeatureCollection featureCollection = new FeatureCollection.JsonBuilder(json).build();

Done done =
    writeEngine
        .uploadIMLFeatureCollection(layerId, featureCollection, OptionalInt.of(batchSize))
        .toCompletableFuture()
        .join();

Interactive Map リクエストの有効な値

コンテキスト

  • DEFAULT = 指定されていない場合のデフォルト値。 複合レイヤーの場合、拡張ルールに基づいて操作が行われます。 通常のレイヤーでは、これが唯一の有効なコンテキストです。
  • EXTENSION = 操作は拡張内でのみ実行され、拡張レイヤーでは実行されません。
  • SUPER = 読み取り操作にのみ適用されます。 操作は、拡張するレイヤー(スーパーレイヤー)でのみ実行されます。

VersionQuery

インタラクティブマップ レイヤーから変更セットを削除する場合、現在サポートされているフィルタは lessThan のみです。 version 特定のののフィルタ version は、ファクトリメソッドを使用して作成できます。 VersionQuery.lessThan(version)

」に一致する結果は 件です

    」に一致する結果はありません