揮発性データの読み取りと書き込み

ボラタイル レイヤーからデータストリームにパーティションを読み取ります

以下のスニペットは、カタログ内のボラタイル レイヤーのパーティションを取得する方法を示しています。

Scala
Java
// create dataclient
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// subscribe to a volatile layer
import com.here.platform.data.client.common.VolatilePartitionsFilter._

// specific timestamp
val timestamp = 0L
val partition1 = "somePartition1"
val partition2 = "somePartition2"

val volatilePartitions: DataStream[Partition] =
  env.addSource(
    queryApi.getVolatilePartitions(volatileLayer,
                                   since(timestamp) and byIds(Set(partition1, partition2))))
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

Long timestamp = 0L;
String partition1 = "somePartition1";
String partition2 = "somePartition2";

Set<String> ids = new HashSet<>(Arrays.asList(partition1, partition2));
VolatilePartitionsFilter filter =
    new VolatilePartitionsFilter.Builder().withIds(ids).withSinceTimestamp(timestamp).build();

// retrieve partitions of a volatile layer
DataStream<Partition> partitions =
    env.addSource(
        queryApi.getVolatilePartitions(volatileLayer, filter, Collections.emptySet()));

メモ :

  • VolatiblePartitionsFilter が設定されていない場合、 "VolatillePartitionsFilter.empty" がデフォルトで使用され、すべてのパーティションが読み取られます。

揮発性データを公開します

BlobstoreIdGenerator データを書き換えるには、確定的な関数と組み合わせて使用します。

Scala
Java
// deterministic function that always generate same dataHandle for same partition
val idGenerator: BlobIdGenerator =
  new StableBlobIdGenerator(version = 0L)

// given a stream of PendingPartitions to be uploaded
val pendingPartitions: DataStream[PendingPartition] =
  getPendingPartitionsStream()

// add our publish sink to upload data
pendingPartitions.addSink(new FlinkDataClient().writeEngine(hrn, idGenerator).publish())
// deterministic function that always generate same dataHandle for same partition
BlobIdGenerator idGenerator = new StableBlobIdGenerator.Builder().withVersion(0L).build();

// given a stream of PendingPartitions to be uploaded
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();

// add our publish sink to upload data
pendingPartitions.addSink(new FlinkDataClient().writeEngine(hrn, idGenerator).publish());

」に一致する結果は 件です

    」に一致する結果はありません