メタデータを入手します

カタログ、レイヤー、およびパーティションに関する情報を取得するに data-client は、プロジェクトへの依存関係としてモジュールを追加します。

カタログのバージョンを検索します

カタログのバージョンまたはバージョン情報を取得するために使用されるメソッドが含まれています。

注 : バージョンバージョンレイヤーで使用されます

バージョン付レイヤーのみがバージョン管理を行います。 以下に示す例では、メソッド as getPartitionsgetPartitionsByIdgetPartitionsAsIteratorgetChangesgetChangesByIdgetChangesAsIteratorを使用してメタデータを取得するために使用できる、特殊なバージョンまたは一連のバージョンについての情報を提供 します。QueryApi インスタンス内。

カタログの最新バージョンを検索するには、次のものを追加します。

Scala
Java
// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// fetch latest version of catalog
queryApi.getLatestVersion(None).flatMap {
  case Some(version) =>
    println(s"Query catalog for $version")
    Future.successful(Done)

  case None =>
    println(s"Catalog has no versions")
    Future.successful(Done)
}
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// fetch latest version of catalog
queryApi
    .getLatestVersion(OptionalLong.empty())
    .thenAccept(
        version -> {
          if (version.isPresent())
            System.out.println("Catalog latest version is: " + version.getAsLong());
          else System.out.println("Catalog has no versions");
        });

特定のカタログバージョンに関する情報を取得するには、次のものを追加します。

Scala
Java
// fetch version information for specific version of catalog
queryApi.getVersion(0).map { versionInfo =>
  println(s"Query catalog for $versionInfo")
}
// fetch version information for specific version of catalog
queryApi
    .getVersion(0L)
    .thenAccept(
        versionInfo -> {
          System.out.println("Query catalog for " + versionInfo);
        });

カタログの一連のバージョンに関する情報を取得するには、次のものを追加します。

Scala
Java
// fetch version information for version range (0, 10)
queryApi.getVersions(0, 10).map { versionInfos =>
  versionInfos.foreach(println)
}
// fetch version details for version range (0, 10)
queryApi
    .getVersions(0L, 10L)
    .thenAccept(versionInfos -> System.out.println("Catalog versions: " + versionInfos));

注 : 明示バージョンを使用しています

1 つの処理ジョブが頻繁に HERE platform を複数回呼び出しているため、データを取得するときに特定のカタログバージョンを明示的に定義します。 特定のカタログバージョンを使用すると、カタログ内で発生する可能性のある変更によって、以降のリクエストに対する応答で情報が予期せず変更されないようにできます。

他のカタログの一連のバージョンと互換性のあるカタログのバージョンを取得する API もありdependenciesます () 。

次のいずれかのプロパティがある場合、カタログバージョンに互換性があります。

  • (直接または間接的に)では、のカタログの少なくとも 1 つに依存 dependencies し、これらのすべての依存関係はで指定された正確なカタログバージョンにあります dependencies
  • のどのカタログにも依存しません dependencies

互換性のあるすべてのバージョンを最新バージョンから古いバージョンに注文するには、次のものを追加します。

Scala
Java
// Get all dependencies of a specific version of our catalog
val dependencies: Future[Set[CatalogVersion]] = queryApi
  .getVersion(0)
  .map(_.dependencies.map(d => CatalogVersion(d.hrn, d.version)).toSet)

val otherQueryApi = DataClient().queryApi(otherHrn)
// Find all versions of `otherHrn` that are compatible with our catalog version
dependencies
  .flatMap(otherQueryApi.getCompatibleVersionsAsIterator)
  .map { compatibleVersions: Iterator[CompatibleVersion] =>
    compatibleVersions.foreach(println)
  }
// get all dependencies of a specific version of our catalog
CompletionStage<Set<CatalogVersion>> dependencies =
    queryApi
        .getVersion(0L)
        .thenApply(
            versionInfo ->
                versionInfo.getDependencies().stream()
                    .map(d -> new CatalogVersion(d.getHrn(), d.getVersion()))
                    .collect(Collectors.toSet()));

// create queryApi for another catalog
final QueryApi otherQueryApi = DataClient.get(myActorSystem).queryApi(otherHrn);

// find all versions of `otherHrn` that are compatible with our catalog version
dependencies
    .thenCompose(otherQueryApi::getCompatibleVersionsAsIterator)
    .thenAccept(
        compatibleVersions ->
            compatibleVersions.forEachRemaining(
                compatibleVersion -> System.out.println(compatibleVersion.toString())));

ボラタイル レイヤーからパーティションを取得します

ボラタイル レイヤーから特定のパーティションを取得するには、を使用 VolatilePartitionsFilterします。

特定の時刻以降に変更されたボラタイル レイヤーからパーティションを取得するには、次の手順を実行します。

Scala
Java
// create queryApi for source catalog
  import com.here.platform.data.client.common.VolatilePartitionsFilter._
  val queryApi = DataClient().queryApi(catalogHrn, settings)
  val layerName = ""
  val timestamp = 0L // specific timestamp
  val partition1 = ""
  val partition2 = ""
  // define a function how to process partitions
  def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
    partitions.runForeach(println).andThen {
      case Success(_) => println("Done")
      case Failure(exception) => println(s"Failed with $exception")
    }

  //  get changes of volatile layer since specific time and for specific partitions
  queryApi
    .getVolatilePartitions(layerName, since(timestamp) and byIds(Set(partition1, partition2)))
    .flatMap(processPartitions)
}
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
long sinceTime = 0;
String layerName = "volatileLayer";
String partition1 = "partition1";
String partition2 = "partition2";
Set<String> partitions = new HashSet<String>();
partitions.add(partition1);
partitions.add(partition2);

VolatilePartitionsFilter.Builder builder = new VolatilePartitionsFilter.Builder();
builder.withIds(partitions).withSinceTimestamp(sinceTime);
//  get changes of volatile layer since specific time and for specific partitions
Source<Partition, NotUsed> changesSource =
    queryApi
        .getVolatilePartitions(layerName, builder.build(), AdditionalFields.AllFields())
        .toCompletableFuture()
        .get();
changesSource
    .runForeach(
        partition -> {
          // define a function how to process partitions
          System.out.println(partition);
        },
        myMaterializer)
    .whenCompleteAsync(
        (result, e) -> {
          if (e != null) {
            e.printStackTrace();
          } else {
            System.out.println("DONE!");
          }
        });

VolatilePartitionsFilter ボラタイル レイヤーからデータを取得するための単純なフィルタの組み合わせを作成するために使用されるフィルタ。 これは、パーティション ID およびそれ以降の時間によってフィルタリングできます。 メソッド getVolatilePartitions には、パーティションの反復を返すツインがあり getVolatilePartitionsAsIteratorます。

バージョンレイヤーからパーティションを読み取ります

カタログ HERE リソースネーム および初期セットアップ環境の準備ができたら、 HERE platform からメタデータを取得できます。

特定のレイヤーのカタログ内のパーティションの最新バージョンを読み取るには、次のものを追加します。

Scala
Java
// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// define a function how to process partitions
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
  partitions.runForeach(println).andThen {
    case Success(_) => println("Done")
    case Failure(exception) => println(s"Failed with $exception")
  }

// fetches partitions for a specific version of a versioned layer in catalog
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
  queryApi.getPartitions(catalogVersion, layer, AdditionalFields.All)

// It is possible to fetch partitions only if catalog has some versions
// It is not possible to fetch partitions for empty catalog
queryApi.getLatestVersion(None).flatMap {
  case Some(version) =>
    println(s"Query catalog for $version")

    for {
      partitions <- fetchPartitions(version, layer)
      _ <- processPartitions(partitions)
    } yield Done

  case None =>
    println(s"Catalog has no versions")
    Future.successful(Done)
}
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// get latest version
Long catalogVersion =
    queryApi
        .getLatestVersion(OptionalLong.empty())
        .toCompletableFuture()
        .get()
        .orElseThrow(() -> new Exception("Catalog has no version"));

// fetches partitions for a specific version of a versioned layer in catalog
queryApi
    .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
    .thenCompose(
        partitions ->
            partitions
                .runForeach(System.out::println, myMaterializer)
                .whenCompleteAsync(
                    (result, e) -> {
                      if (e != null) {
                        e.printStackTrace();
                      } else {
                        System.out.println("DONE!");
                      }
                    }));

カタログ内のすべてのレイヤーのすべてのパーティションをダウンロードするには fetchPartitions 、次のように機能を使用します。

Scala
def fetchPartitions(catalogVersion: Long): Future[Source[Partition, NotUsed]] =
  queryApi.getConfiguration().map { config =>
    // for each a versioned layer in list of layers in catalog configuration fetch all partitions from server
    Source(config.layers.toList)
      .mapAsync(1) { layer =>
        queryApi.getPartitions(catalogVersion, layer.name, AdditionalFields.All)
      }
      .flatMapConcat(identity)
  }

バージョンレイヤーから変更を取得します

データを段階的に処理するには、メタデータをフェッチして、カタログ内の指定したバージョン間でどのパーティションが変更されるかを判断します。

カタログ内のすべてのレイヤーの変更を取得するには、次の項目を追加します。

Scala
Java
// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// define a function how to process partitions
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
  partitions.runForeach(println)

// fetches changes partitions for a specific version range of a versioned layer in catalog
def fetchPartitions(startVersion: Long,
                    endVersion: Long): Future[Source[Partition, NotUsed]] =
  queryApi.getConfiguration().map { config =>
    // for each a versioned layer in list of layers in catalog configuration fetch all partitions from server
    Source(config.layers.toList)
      .mapAsync(1) { layer =>
        val parts = queryApi.getChangesParts(layer.name, 2)
        val source = Source
          .unfoldAsync[Future[Seq[String]], Source[Partition, NotUsed]](parts.map(_.parts)) {
            input =>
              input.flatMap {
                case Nil => Future.successful(None)
                case head :: tail =>
                  queryApi
                    .getChanges(startVersion,
                                endVersion,
                                layer.name,
                                AdditionalFields.All,
                                Some(head))
                    .map { source =>
                      Some(Future.successful(tail) -> source)
                    }
              }
          }
          .flatMapConcat(identity)
        Future(source)
      }
      .flatMapConcat(identity)
  }

// latest version that was already processed
val startVersion = 1L

queryApi.getLatestVersion(None).flatMap {
  case Some(endVersion) =>
    println(s"Query catalog for changes for ($startVersion, $endVersion]")

    for {
      partitions <- fetchPartitions(startVersion, endVersion)
      _ <- processPartitions(partitions)
    } yield Done

  case None =>
    println(s"Catalog has no versions")
    Future.successful(Done)
}
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// latest version that was already processed
Long startVersion = 1L;

queryApi
    .getLatestVersion(OptionalLong.empty())
    .thenCompose(
        maybeEndVersion -> {
          if (maybeEndVersion.isPresent()) {
            Long endVersion = maybeEndVersion.getAsLong();
            System.out.println(
                "Query catalog for changes for (" + startVersion + ", " + endVersion + ")");

            return queryApi
                .getConfiguration()
                .thenCompose(
                    config -> {
                      // for each a versioned layer in list of layers in catalog configuration
                      // fetch all partitions from server
                      Source<Partition, NotUsed> changesSource =
                          Source.from(config.getLayers())
                              .map(Layer::getName)
                              .mapAsync(
                                  1,
                                  layer -> {
                                    // fetches changes partitions for a specific version range
                                    // in catalog
                                    return queryApi.getChanges(
                                        startVersion,
                                        endVersion,
                                        layer,
                                        AdditionalFields.AllFields());
                                  })
                              .flatMapConcat(i -> i);

                      return changesSource
                          .runForeach(
                              partition -> {
                                // define a function how to process partitions
                                System.out.println(partition);
                              },
                              myMaterializer)
                          .whenCompleteAsync(
                              (result, e) -> {
                                if (e != null) {
                                  e.printStackTrace();
                                } else {
                                  System.out.println("DONE!");
                                }
                              });
                    });
          } else {
            System.out.println("Catalog has no versions");
            return CompletableFuture.completedFuture(Done.getInstance());
          }
        });

バージョンレイヤーのパーティションの追加フィールドを取得します

デフォルトでは、 HERE platform は応答にパーティションの基本情報のみを含めます。 基本情報に layer は、およびが含ま partitionれています。 dataSize やなどの追加のフィールドを取得する checksumには、明示的に要求する必要があります。

回答には、カタログの公開時にこれらのフィールドが含まれていた場合にのみ、追加のフィールドに関する情報が含まれます。 これらのフィールドが公開中に含まれていない場合、リクエストしてもこれらのフィールドは応答に含まれません。

追加のフィールドの情報を要求するには、次の情報を追加します。

Scala
Java
// fetches partitions for a specific version of a versioned layer in catalog
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
  queryApi.getPartitions(catalogVersion,
                         layer,
                         Set(AdditionalField.Checksum, AdditionalField.DataSize))
// fetches partitions for a specific version of a versioned layer in catalog
Set<AdditionalField> fields = new HashSet<AdditionalField>();
fields.add(AdditionalField.Checksum);
queryApi.getPartitions(catalogVersion, layer, fields);

オブジェクト ストア レイヤー内のオブジェクトのメタデータを取得します

オブジェクト ストア レイヤー内のオブジェクトの場合、メタデータ情報は経由で利用 ObjectMetadataできます。 メタデータには lastModified 、との 2 つのフィールドがあります size

オブジェクト ストア レイヤーでオブジェクトのメタデータを取得するには、次のものを追加します。

Scala
Java
// create readEngine
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// get object metadata
val objectMetadata =
  readEngine
    .getObjectMetadata(layer, key)
// create readEngine
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

// get object metadata
CompletionStage<ObjectMetadata> dataAsSource = readEngine.getObjectMetadata(layer, key);

オブジェクト ストア レイヤーのオブジェクトを一覧表示します

オブジェクト ストア レイヤー内のオブジェクトは、レイヤーのルートの下またはプレフィックスの下に一覧表示できます。 一覧表の結果、 Akka ソースが返さ ObjectStoreListItemれます。 ObjectStoreListItem には、次のフィールドがあります。

  • name: キーの名前。
  • keyType: ObjectStoreKeyType.Object またはを使用 ObjectStoreKeyType.CommonPrefixできます。
    • ObjectStoreKeyType.Object 指定したキーでアップロードされたオブジェクト。
    • ObjectStoreKeyType.CommonPrefix 指定したパスの下の commonPrefix 。 たとえば、 "test-key1" & "test-key2/test-key3" という 2 つのオブジェクトがレイヤーのルートにアップロードされ、ルートに一覧表示されると、結果にはname "test-key1」 および keyType: ObjectStoreKeyType.Objectの 2 つの要素が含まれます。 2 つ目の要素nameには、 "test-key2" およびkeyType : ObjectStoreKeyType.CommonPrefixが含まれます。
  • lastModified: このフィールドでは、オブジェクトが最後 ZonedDateTime にフォーマットで変更された日時を定義します。 このフィールドは、 keyType がの場合にのみ表示 ObjectStoreKeyType.Objectされます。
  • size: このフィールドは、オブジェクトのサイズを表します。 このフィールドは、 keyType がの場合にのみ表示 ObjectStoreKeyType.Objectされます。

オブジェクト ストア レイヤーのルートにあるオブジェクトを一覧表示するには、次のものを追加します。

Scala
Java
// list objects as Source
val objectList =
  readEngine
    .listObjects(layer, None)

// list objects as Collection
val objectListAsCollection =
  readEngine
    .listObjectsAsCollection(layer, None)
// list objects as Source
Source<ObjectStoreListItem, NotUsed> objectList = readEngine.listObjects(layer, "");

// list objects as Collection
CompletionStage<List<ObjectStoreListItem>> objectListAsCollection =
    readEngine.listObjectsAsCollection(layer, "");

オブジェクト ストア レイヤーのプレフィックスの下にあるオブジェクトを一覧表示するには、次のものを追加します。

Scala
Java
val objectListUnderPrefix =
  readEngine
    .listObjects(layer, Some(prefix))

val objectListUnderPrefixAsCollection =
  readEngine
    .listObjectsAsCollection(layer, Some(prefix))
Source<ObjectStoreListItem, NotUsed> objectListUnderPrefix =
    readEngine.listObjects(layer, prefix);

注 :

ユーザーは、 Source 常にまたはのいずれかの完全なリストを取得 Listできます。 ただし、内部的に使用されるデフォルトのページサイズは 1000 で、 ObjectStore 100 万個のオブジェクトを含むフォルダがある場合は 1000 件のリクエストが発行されます。 処理速度が遅いです。 そのため、すでに多数のオブジェクトが存在することがわかっているか、または疑わ pageSize れる場合は、パラメーターの値を大きく定義することで、ランタイムを非常に最適化できます。

」に一致する結果は 件です

    」に一致する結果はありません