カタログ、レイヤー、およびパーティションに関する情報を取得するに data-client
は、プロジェクトへの依存関係としてモジュールを追加します。
カタログのバージョンを検索します
カタログのバージョンまたはバージョン情報を取得するために使用されるメソッドが含まれています。
注 : バージョンバージョンレイヤーで使用されます
バージョン付レイヤーのみがバージョン管理を行います。 以下に示す例では、メソッド as getPartitions
、getPartitionsById
、getPartitionsAsIterator
、getChanges
、getChangesById
、getChangesAsIterator
を使用してメタデータを取得するために使用できる、特殊なバージョンまたは一連のバージョンについての情報を提供 します。QueryApi
インスタンス内。
カタログの最新バージョンを検索するには、次のものを追加します。
val queryApi = DataClient().queryApi(catalogHrn, settings)
queryApi.getLatestVersion(None).flatMap {
case Some(version) =>
println(s"Query catalog for $version")
Future.successful(Done)
case None =>
println(s"Catalog has no versions")
Future.successful(Done)
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
queryApi
.getLatestVersion(OptionalLong.empty())
.thenAccept(
version -> {
if (version.isPresent())
System.out.println("Catalog latest version is: " + version.getAsLong());
else System.out.println("Catalog has no versions");
});
特定のカタログバージョンに関する情報を取得するには、次のものを追加します。
queryApi.getVersion(0).map { versionInfo =>
println(s"Query catalog for $versionInfo")
}
queryApi
.getVersion(0L)
.thenAccept(
versionInfo -> {
System.out.println("Query catalog for " + versionInfo);
});
カタログの一連のバージョンに関する情報を取得するには、次のものを追加します。
queryApi.getVersions(0, 10).map { versionInfos =>
versionInfos.foreach(println)
}
queryApi
.getVersions(0L, 10L)
.thenAccept(versionInfos -> System.out.println("Catalog versions: " + versionInfos));
注 : 明示バージョンを使用しています
1 つの処理ジョブが頻繁に HERE platform を複数回呼び出しているため、データを取得するときに特定のカタログバージョンを明示的に定義します。 特定のカタログバージョンを使用すると、カタログ内で発生する可能性のある変更によって、以降のリクエストに対する応答で情報が予期せず変更されないようにできます。
他のカタログの一連のバージョンと互換性のあるカタログのバージョンを取得する API もありdependencies
ます () 。
次のいずれかのプロパティがある場合、カタログバージョンに互換性があります。
- (直接または間接的に)では、のカタログの少なくとも 1 つに依存
dependencies
し、これらのすべての依存関係はで指定された正確なカタログバージョンにあります dependencies
- のどのカタログにも依存しません
dependencies
互換性のあるすべてのバージョンを最新バージョンから古いバージョンに注文するには、次のものを追加します。
val dependencies: Future[Set[CatalogVersion]] = queryApi
.getVersion(0)
.map(_.dependencies.map(d => CatalogVersion(d.hrn, d.version)).toSet)
val otherQueryApi = DataClient().queryApi(otherHrn)
dependencies
.flatMap(otherQueryApi.getCompatibleVersionsAsIterator)
.map { compatibleVersions: Iterator[CompatibleVersion] =>
compatibleVersions.foreach(println)
}
CompletionStage<Set<CatalogVersion>> dependencies =
queryApi
.getVersion(0L)
.thenApply(
versionInfo ->
versionInfo.getDependencies().stream()
.map(d -> new CatalogVersion(d.getHrn(), d.getVersion()))
.collect(Collectors.toSet()));
final QueryApi otherQueryApi = DataClient.get(myActorSystem).queryApi(otherHrn);
dependencies
.thenCompose(otherQueryApi::getCompatibleVersionsAsIterator)
.thenAccept(
compatibleVersions ->
compatibleVersions.forEachRemaining(
compatibleVersion -> System.out.println(compatibleVersion.toString())));
ボラタイル レイヤーからパーティションを取得します
ボラタイル レイヤーから特定のパーティションを取得するには、を使用 VolatilePartitionsFilter
します。
特定の時刻以降に変更されたボラタイル レイヤーからパーティションを取得するには、次の手順を実行します。
import com.here.platform.data.client.common.VolatilePartitionsFilter._
val queryApi = DataClient().queryApi(catalogHrn, settings)
val layerName = ""
val timestamp = 0L
val partition1 = ""
val partition2 = ""
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
partitions.runForeach(println).andThen {
case Success(_) => println("Done")
case Failure(exception) => println(s"Failed with $exception")
}
queryApi
.getVolatilePartitions(layerName, since(timestamp) and byIds(Set(partition1, partition2)))
.flatMap(processPartitions)
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
long sinceTime = 0;
String layerName = "volatileLayer";
String partition1 = "partition1";
String partition2 = "partition2";
Set<String> partitions = new HashSet<String>();
partitions.add(partition1);
partitions.add(partition2);
VolatilePartitionsFilter.Builder builder = new VolatilePartitionsFilter.Builder();
builder.withIds(partitions).withSinceTimestamp(sinceTime);
Source<Partition, NotUsed> changesSource =
queryApi
.getVolatilePartitions(layerName, builder.build(), AdditionalFields.AllFields())
.toCompletableFuture()
.get();
changesSource
.runForeach(
partition -> {
System.out.println(partition);
},
myMaterializer)
.whenCompleteAsync(
(result, e) -> {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("DONE!");
}
});
注
VolatilePartitionsFilter
ボラタイル レイヤーからデータを取得するための単純なフィルタの組み合わせを作成するために使用されるフィルタ。 これは、パーティション ID およびそれ以降の時間によってフィルタリングできます。 メソッド getVolatilePartitions
には、パーティションの反復を返すツインがあり getVolatilePartitionsAsIterator
ます。
バージョンレイヤーからパーティションを読み取ります
カタログ HERE リソースネーム および初期セットアップ環境の準備ができたら、 HERE platform からメタデータを取得できます。
特定のレイヤーのカタログ内のパーティションの最新バージョンを読み取るには、次のものを追加します。
val queryApi = DataClient().queryApi(catalogHrn, settings)
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
partitions.runForeach(println).andThen {
case Success(_) => println("Done")
case Failure(exception) => println(s"Failed with $exception")
}
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
queryApi.getPartitions(catalogVersion, layer, AdditionalFields.All)
queryApi.getLatestVersion(None).flatMap {
case Some(version) =>
println(s"Query catalog for $version")
for {
partitions <- fetchPartitions(version, layer)
_ <- processPartitions(partitions)
} yield Done
case None =>
println(s"Catalog has no versions")
Future.successful(Done)
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
Long catalogVersion =
queryApi
.getLatestVersion(OptionalLong.empty())
.toCompletableFuture()
.get()
.orElseThrow(() -> new Exception("Catalog has no version"));
queryApi
.getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
.thenCompose(
partitions ->
partitions
.runForeach(System.out::println, myMaterializer)
.whenCompleteAsync(
(result, e) -> {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("DONE!");
}
}));
カタログ内のすべてのレイヤーのすべてのパーティションをダウンロードするには fetchPartitions
、次のように機能を使用します。
def fetchPartitions(catalogVersion: Long): Future[Source[Partition, NotUsed]] =
queryApi.getConfiguration().map { config =>
Source(config.layers.toList)
.mapAsync(1) { layer =>
queryApi.getPartitions(catalogVersion, layer.name, AdditionalFields.All)
}
.flatMapConcat(identity)
}
バージョンレイヤーから変更を取得します
データを段階的に処理するには、メタデータをフェッチして、カタログ内の指定したバージョン間でどのパーティションが変更されるかを判断します。
カタログ内のすべてのレイヤーの変更を取得するには、次の項目を追加します。
val queryApi = DataClient().queryApi(catalogHrn, settings)
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
partitions.runForeach(println)
def fetchPartitions(startVersion: Long,
endVersion: Long): Future[Source[Partition, NotUsed]] =
queryApi.getConfiguration().map { config =>
Source(config.layers.toList)
.mapAsync(1) { layer =>
val parts = queryApi.getChangesParts(layer.name, 2)
val source = Source
.unfoldAsync[Future[Seq[String]], Source[Partition, NotUsed]](parts.map(_.parts)) {
input =>
input.flatMap {
case Nil => Future.successful(None)
case head :: tail =>
queryApi
.getChanges(startVersion,
endVersion,
layer.name,
AdditionalFields.All,
Some(head))
.map { source =>
Some(Future.successful(tail) -> source)
}
}
}
.flatMapConcat(identity)
Future(source)
}
.flatMapConcat(identity)
}
val startVersion = 1L
queryApi.getLatestVersion(None).flatMap {
case Some(endVersion) =>
println(s"Query catalog for changes for ($startVersion, $endVersion]")
for {
partitions <- fetchPartitions(startVersion, endVersion)
_ <- processPartitions(partitions)
} yield Done
case None =>
println(s"Catalog has no versions")
Future.successful(Done)
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
Long startVersion = 1L;
queryApi
.getLatestVersion(OptionalLong.empty())
.thenCompose(
maybeEndVersion -> {
if (maybeEndVersion.isPresent()) {
Long endVersion = maybeEndVersion.getAsLong();
System.out.println(
"Query catalog for changes for (" + startVersion + ", " + endVersion + ")");
return queryApi
.getConfiguration()
.thenCompose(
config -> {
Source<Partition, NotUsed> changesSource =
Source.from(config.getLayers())
.map(Layer::getName)
.mapAsync(
1,
layer -> {
return queryApi.getChanges(
startVersion,
endVersion,
layer,
AdditionalFields.AllFields());
})
.flatMapConcat(i -> i);
return changesSource
.runForeach(
partition -> {
System.out.println(partition);
},
myMaterializer)
.whenCompleteAsync(
(result, e) -> {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("DONE!");
}
});
});
} else {
System.out.println("Catalog has no versions");
return CompletableFuture.completedFuture(Done.getInstance());
}
});
バージョンレイヤーのパーティションの追加フィールドを取得します
デフォルトでは、 HERE platform は応答にパーティションの基本情報のみを含めます。 基本情報に layer
は、およびが含ま partition
れています。 dataSize
やなどの追加のフィールドを取得する checksum
には、明示的に要求する必要があります。
注
回答には、カタログの公開時にこれらのフィールドが含まれていた場合にのみ、追加のフィールドに関する情報が含まれます。 これらのフィールドが公開中に含まれていない場合、リクエストしてもこれらのフィールドは応答に含まれません。
追加のフィールドの情報を要求するには、次の情報を追加します。
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
queryApi.getPartitions(catalogVersion,
layer,
Set(AdditionalField.Checksum, AdditionalField.DataSize))
Set<AdditionalField> fields = new HashSet<AdditionalField>();
fields.add(AdditionalField.Checksum);
queryApi.getPartitions(catalogVersion, layer, fields);
オブジェクト ストア レイヤー内のオブジェクトの場合、メタデータ情報は経由で利用 ObjectMetadata
できます。 メタデータには lastModified
、との 2 つのフィールドがあります size
。
オブジェクト ストア レイヤーでオブジェクトのメタデータを取得するには、次のものを追加します。
val readEngine = DataEngine().readEngine(catalogHrn, settings)
val objectMetadata =
readEngine
.getObjectMetadata(layer, key)
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);
CompletionStage<ObjectMetadata> dataAsSource = readEngine.getObjectMetadata(layer, key);
オブジェクト ストア レイヤーのオブジェクトを一覧表示します
オブジェクト ストア レイヤー内のオブジェクトは、レイヤーのルートの下またはプレフィックスの下に一覧表示できます。 一覧表の結果、 Akka ソースが返さ ObjectStoreListItem
れます。 ObjectStoreListItem
には、次のフィールドがあります。
-
name
: キーの名前。 -
keyType
: ObjectStoreKeyType.Object
またはを使用 ObjectStoreKeyType.CommonPrefix
できます。 -
ObjectStoreKeyType.Object
指定したキーでアップロードされたオブジェクト。 -
ObjectStoreKeyType.CommonPrefix
指定したパスの下の commonPrefix 。 たとえば、 "test-key1" & "test-key2/test-key3" という 2 つのオブジェクトがレイヤーのルートにアップロードされ、ルートに一覧表示されると、結果にはname
"test-key1」 および keyType
: ObjectStoreKeyType.Object
の 2 つの要素が含まれます。 2 つ目の要素name
には、 "test-key2" およびkeyType
: ObjectStoreKeyType.CommonPrefix
が含まれます。
-
lastModified
: このフィールドでは、オブジェクトが最後 ZonedDateTime
にフォーマットで変更された日時を定義します。 このフィールドは、 keyType
がの場合にのみ表示 ObjectStoreKeyType.Object
されます。 -
size
: このフィールドは、オブジェクトのサイズを表します。 このフィールドは、 keyType
がの場合にのみ表示 ObjectStoreKeyType.Object
されます。
オブジェクト ストア レイヤーのルートにあるオブジェクトを一覧表示するには、次のものを追加します。
val objectList =
readEngine
.listObjects(layer, None)
val objectListAsCollection =
readEngine
.listObjectsAsCollection(layer, None)
Source<ObjectStoreListItem, NotUsed> objectList = readEngine.listObjects(layer, "");
CompletionStage<List<ObjectStoreListItem>> objectListAsCollection =
readEngine.listObjectsAsCollection(layer, "");
オブジェクト ストア レイヤーのプレフィックスの下にあるオブジェクトを一覧表示するには、次のものを追加します。
val objectListUnderPrefix =
readEngine
.listObjects(layer, Some(prefix))
val objectListUnderPrefixAsCollection =
readEngine
.listObjectsAsCollection(layer, Some(prefix))
Source<ObjectStoreListItem, NotUsed> objectListUnderPrefix =
readEngine.listObjects(layer, prefix);
注 :
ユーザーは、 Source
常にまたはのいずれかの完全なリストを取得 List
できます。 ただし、内部的に使用されるデフォルトのページサイズは 1000 で、 ObjectStore
100 万個のオブジェクトを含むフォルダがある場合は 1000 件のリクエストが発行されます。 処理速度が遅いです。 そのため、すでに多数のオブジェクトが存在することがわかっているか、または疑わ pageSize
れる場合は、パラメーターの値を大きく定義することで、ランタイムを非常に最適化できます。