カタログおよびレイヤーレベルの変更にサブスクライブします
目的: カタログおよびレイヤーレベルの変更に対するサブスクリプションを作成および管理します。
複雑さ: 中級者です
前提条件: 資格情報を取得し、資格情報を確認します
所要時間: 60 分
ソースコード: ダウンロード
通知によって、カタログ、レイヤー、またはパーティションが更新されたことを通知し、どのデータが変更されたかについての情報を提供します。 通知機能は versioned
、マップ データに依存するプラットフォームまたはプラットフォーム外のサービスでストリームアプリケーションを実行する場合に役立ちます。 この機能を使用すると、カタログが更新されたときにアプリケーションが自動的に通知を受け取るため、変更を継続的にポーリングする必要がなくなります。 通知を受信すると、アプリケーションは対応するデータのみを要求する必要があります。
特定の要件に基づいて、通知を受け取るカタログの一部を選択できます。
- カタログの新しいバージョンについて通知を受け取るカタログレベルの通知
- レイヤーまたはレイヤーの地理的領域の変更について通知を受け取るレイヤーレベルの通知
このチュートリアルで は、 OLP CLI を使用してカタログおよびレイヤーレベルの変更に対するサブスクリプションを作成および管理する方法について説明します。
サブスクリプションが作成されると、このチュートリアルで は、データ クライアント ライブラリを使用してコンシューマアプリケーションを実装する方法について説明します。
注
ただし、ユースケースがマップ データを処理して、情報を変換または取得している可能性があります。 このような処理は、通常、 1 つ以上のバージョン化されたパーティション化されたカタログを消費し、代わりに別のバージョン化されたパーティション化された 1 つの入力カタログが変更された場合は、通常、プロセスを実行して出力を更新する必要があります。
この場合、このチュートリアルで説明する通知の仕組みを使用する必要はありません。 代わり に、データ プロセッシング ライブラリおよびバッチ処理パイプラインを使用することをお勧めします。 これらは、入力カタログの変更時に自動的に実行されるように設計されています。 はじめにには 、データ プロセッシング ライブラリ チュートリアルを使用してカタログをコピーを使用します。
このチュートリアルを実行 するには、 CLI がインストールされ、プラットフォームとリポジトリ資格情報の両方がローカルマシンに設定されている必要があります。
カタログレベルの通知
このチュートリアルでは、カタログレベルの変更をサブスクライブして通知を使用する方法について説明します。
カタログで通知が有効化されている場合、ストリームはカタログと同じ HERE リソースネーム ( HERE リソースネーム )を使用して HERE platform によって自動的に作成されます。 カタログデータが更新されるたびに、カタログの新しいバージョンが作成され、新しいレコードがストリームに書き込まれます。
通知レコードには、変更のタイムスタンプと新しいカタログバージョン番号が含まれています。 バージョン番号は、更新されたデータの要求に使用できるため、特に便利です。
新しいカタログバージョンのパブリケーションについての通知を受け取るには、次の手順を実行します。
notifications
OLP CLI で有効化されたソースカタログを作成します。- データ クライアント ライブラリを使用して通知リスナーアプリケーションを実装します。
- OLP CLI を使用して新しいデータをカタログにアップロードし、通知をトリガーします。 実際の状況では、他のユーザーがデータを更新しますが、このチュートリアルでは、ユーザー側でデータを更新します。
通知を有効にしてカタログを作成します
以下の設定で通知を使用して変更を監視できるカタログを作成します。 カタログ内のデータが変更されると、通知がサブスクライバに送信されます。
カタログ設定に は、heretile
partitioningScheme
とtileLevel
14
ともに使用するサンプルversioned
レイヤーが含ま れています。
簡素化のために、 "contentType": "text/plain"
を使用します。
カタログ notifications
でが有効になっている必要があります。 新しいカタログの通知を有効にするには、カタログの作成に使用するカタログ設定ファイルに次のプロパティがあることを確認してください。
"notifications": {
"enabled": true
}
完全なカタログ設定 :
{
"id": "notifications-source",
"name": "Catalog Notifications Source Tutorial",
"summary": "Catalog Notifications Source",
"description": "When changed, the notifications will be sent to subscribers",
"notifications": {
"enabled": true
},
"layers": [
{
"id": "notification-source-versioned-layer",
"name": "Notifications Source Tutorial layer",
"summary": "Messages for the notifications tutorial.",
"description": "Messages for the notifications tutorial.",
"layerType": "versioned",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "heretile",
"tileLevels": [
14
]
},
"contentType": "text/plain"
}
]
}
{{YOUR_USERNAME}}
以下をユーザー名で置き換え、上記 json
のファイルを使用してコマンドを実行します。
olp catalog create {{YOUR_USERNAME}}-source-catalog "{{YOUR_USERNAME}} source catalog" --config source-catalog.json
CLI で次のメッセージが表示されます。
Catalog {{SOURCE_CATALOG_HRN}} has been created.
注
レルムで請求タグが必要な場合 は、layer
セクションにbillingTag: ["YOUR_BILLING_TAG"]
プロパティを追加して設定 ファイルを更新します。
既存のカタログの通知を有効にするに は、olp catalog update [...] --notifications true
コマンドを使用します。
Maven プロジェクトを設定します
ソースコードはチュートリアルの冒頭でダウンロードし、任意のフォルダーに配置するか、またはフォルダー構造を最初から作成できます。
catalog-notification
└── src
└── main
├── java
└── resources
└── scala
この操作は、次の bash
1 つのコマンドで実行できます。
mkdir -p catalog-notification/src/main/{java,resources,scala}
Maven POM ファイルは 'Maven 設定の確認 チュートリアルのファイルに似ています
親 POM および依存関係セクションは次のように更新されます。
親 POM :
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-stream-bom_2.12</artifactId>
<version>2.54.3</version>
<relativePath/>
</parent>
POM は sdk-stream-bom_2.12
親 POM として使用します。 Flink ライブラリのバージョンを 環境 POM から取得できます。
依存関係 :
<dependencies>
<!-- HERE dependencies -->
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>flink-support_${scala.compat.version}</artifactId>
</dependency>
<!-- External dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.compat.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>
次の依存関係が使用されます。
-
flink-support
- FlinkDataClient および FlinkQueryApi を使用 して HERE platform にサブスクライブし、データを取得します。 -
flink-streaming-scala
- StreamExecutionEnvironment を使用し て Apache Flink アプリを実行します。 -
slf4j-api
およびslf4j-log4j12
- FlinkDataClient からのログを参照します。
通知コンシューマアプリケーションを実装します
このチュートリアルでは、ソースのバージョン管理されたカタログに、特定の領域の温度と湿度の平均値のサンプルデータセットが含まれています。 データの例 :
Temperature,Humidity
18,72
データは毎月更新され、新しいデータがカタログにアップロードされます。
簡単 な Java/Scala アプリケーションを作成して、データセットを含むカタログが更新されたときに通知を受け取るようにします。 カタログのバージョンとタイムスタンプがコンソールに印刷されます。
{{SOURCE_CATALOG_HRN}}
前 の章で作成したソースカタログ HERE リソースネーム にプレースホルダーを変更します。
private static final HRN CATALOG_HRN = HRN.fromString("{{SOURCE_CATALOG_HRN}}");
private val CatalogHrn = HRN("{{SOURCE_CATALOG_HRN}}")
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.settings.NotificationConsumerSettings;
import java.util.Date;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CatalogLevelNotification {
private static final HRN CATALOG_HRN = HRN.fromString("{{SOURCE_CATALOG_HRN}}");
public static final String CONSUMER_GROUP_NAME = "catalog-level-notification";
public static void main(String[] args) throws Exception {
// Create queryApi for the source catalog
final FlinkDataClient flinkDataClient = new FlinkDataClient();
FlinkQueryApi queryApi = flinkDataClient.queryApi(CATALOG_HRN);
// Specify Kafka consumer group name
NotificationConsumerSettings consumerSettings =
new NotificationConsumerSettings.Builder().withGroupName(CONSUMER_GROUP_NAME).build();
// Create the context in which a streaming program is executed
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
// source function that emits catalog-level notifications
.addSource(queryApi.subscribeToNotifications(consumerSettings))
// map notification to a human-readable string to be displayed in the console
.map(
notification ->
String.format(
"===> BatchPublicationNotification: New catalog version %s, date/time: %s",
notification.getCatalogVersion(), new Date(notification.timestamp())))
// Write to standard output
.print();
// Trigger the program execution
env.execute("Catalog-level notification listener");
}
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.FlinkDataClient
import com.here.platform.data.client.settings.NotificationConsumerSettings
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.Date
object CatalogLevelNotificationScala {
private val CatalogHrn = HRN("{{SOURCE_CATALOG_HRN}}")
private val ConsumerGroupName = "catalog-level-notification"
def main(args: Array[String]): Unit = {
// Create queryApi for the source catalog
val flinkDataClient = new FlinkDataClient()
val queryApi = flinkDataClient.queryApi(CatalogHrn)
// Specify Kafka consumer group name
val consumerSettings = NotificationConsumerSettings(ConsumerGroupName)
// Create the context in which a streaming program is executed
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env
// source function that emits catalog-level notifications
.addSource(queryApi.subscribeToNotifications(consumerSettings))
// map BatchPublicationNotification to a human-readable string to be displayed in the console
.map { notification =>
s"===> BatchPublicationNotification: New catalog version ${notification.version}, date/time: ${new Date(notification.timestamp)}"
}
// Write to standard output
.print()
// Trigger the program execution
env.execute("Catalog-level notification listener")
}
}
アプリケーションは 、 Flink データ クライアント ライブラリsubscribeToNotifications
メソッドを使用して CATALOG_HRN
、変数によって定義されたカタログにサブスクライブします。
コンシューマグループ名は Kafka Consumer Group 名です。 catalog-level-notification
このチュートリアルでは、に設定されています。
アプリケーションをローカルで実行します
カタログレベルの通知コンシューマアプリケーションを実行するには、次のコマンドを実行します。
mvn compile exec:java -D"exec.mainClass"="CatalogLevelNotification"
mvn compile exec:java -D"exec.mainClass"="CatalogLevelNotificationScala"
次のようなコンソール出力が表示されたら、
[...] Subscribe to notifications with group name: catalog-level-notifications-consumer-group, offset: LatestOffset and consumer id: c458fd12-c525-4c58-ac71-11a149c8234c
通知にサブスクライブされました。
ソースカタログに新しいデータが追加されると、コンソールに通知が表示されます。 現在のコンソールは通知をリッスンしているため、別のコンソールインスタンスを開いてデータのアップロードを続行することができます。
このチュートリアルでは 、 src/main/resources/data フォルダーにサンプルデータが含まれています。 このデータセットは、 2021 年 8 月のベルリンとミュンヘンの天気予報を表しています。
パーティション 377894444
のベルリン市内中心を表すズーム レベル14
にデータをアップロードしてみ ましょう。 パーティションタイル ID およびズーム レベルの計算について詳しく は、「パーティションタイル ID の計算」チュートリアルを参照してください。
データをカタログにアップロードするには 、 OLP CLI を使用します。
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv
コマンドはカタログにデータをアップロードし、カタログの新しいバージョンが作成されます。 結果の出力には、カタログバージョンと変更日時の通知が含まれます。
===> BatchPublicationNotification: New catalog version 0, date/time: Mon Nov 15 14:46:10 EET 2021
別の変更通知を取得するには、より多くのデータをアップロードしてください。
このアプリケーションは一般的なストリーミングプログラムであるため、無期限に実行されます。 Ctrl + C
コンソールでを押して、プログラムを停止します。
カタログに別のversioned
レイヤーがある場合 は、そのレイヤーの変更に関する通知が表示されます。
レイヤーレベルの通知
レイヤーレベルの通知を使用すると、特定のレイヤーで変更が発生したときに通知を受け取ることができます。
注
現在、レイヤーレベルの通知は 、HERE Tile
パーティション分割されたversioned
レイヤーでのみ利用できます。
通知は基本的にデータストリームです。 通知は、ストリーム レイヤーへのメッセージとして書き込まれます。 レイヤーにサブスクライブし、 HERE platform の任意のストリーム レイヤーからデータを消費するのと同じようにメッセージを消費することで、通知を受信します。
特定のパーティションまたは特定の地理的領域が更新されたときに通知を受信するには、次の手順を実行します。
stream
OLP CLI で通知が公開されるレイヤーを使用して、通知カタログを作成します。- データ クライアント ライブラリを使用して通知リスナーアプリケーションを実装します。 このアプリケーションは、
notification-stream-layer
前の章で作成した通知カタログのレイヤーにパブリッシュされたデータのコンシューマにすぎません。 - OLP CLI を使用してソースカタログのレイヤーにサブスクライブします。
- OLP CLI でサブスクライブしているレイヤーに新しいデータをアップロードします。実際の状況では、他のユーザーがデータを更新しますが、このチュートリアルでは、ユーザー側でデータを更新します。
通知カタログを作成します
このチュートリアルでは、次の 2 つのカタログが必要です。
- 前 のチャプタで作成されたソースカタログ
- 変更通知を受け取るストリーム レイヤーを含む通知カタログ
今回 stream
は versioned
、カタログレベルの通知ではなく、ソースカタログのレイヤーの変更に関する通知を受け取るレイヤーが存在する必要があります。
以下の設定を使用して、通知カタログを作成します。 ソースカタログが変更されると、通知メッセージが stream
通知カタログのレイヤーに送信されます。
カタログ設定には 、generic
partitioningScheme
のサンプルstream
レイヤーが含ま れています。
簡素化のために、 "contentType": "text/plain"
を使用します。
完全なカタログ設定 :
{
"id": "notifications-sink",
"name": "Catalog Notifications Stream Tutorial",
"summary": "Catalog for Notifications Stream",
"description": "The catalog containing the notification stream",
"layers": [
{
"id": "notification-stream-layer",
"name": "Notification Stream",
"summary": "Stream for notification",
"description": "Stream for notification",
"layerType": "stream",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
},
"contentType": "text/plain"
}
]
}
{{YOUR_USERNAME}}
以下をユーザー名に置き換えて、次のコマンドを実行します。
olp catalog create {{YOUR_USERNAME}}-notification-catalog "{{YOUR_USERNAME}} notification catalog" --config notification-catalog.json
CLI で次のメッセージが表示されます。
Catalog {{NOTIFICATION_CATALOG_HRN}} has been created.
注
レルムで請求タグが必要な場合 は、layer
セクションにbillingTags: ["YOUR_BILLING_TAG"]
プロパティを追加して設定 ファイルを更新します。
通知コンシューマアプリケーションを実装します
このチュートリアルでは、ソースバージョンのカタログに、異なるパーティションに保存されている温度と湿度の平均値の 2 つのデータセットが含まれています。 ベルリンとミュンヘン。 データセットの構造は 、カタログレベルの通知コンシューマアプリケーションと同じです。
新しい メッセージが stream
レイヤーに公開されたときに通知を受け取る簡単な Java/Scala アプリケーションを作成してみましょう。 変更されたパーティションに関する情報を含む通知メッセージが印刷されます。
{{NOTIFICATION_CATALOG_HRN}}
前 の章で作成したソースカタログ HERE リソースネーム にプレースホルダーを変更します。
private static final HRN NOTIFICATION_CATALOG_HRN = HRN.fromString("{{NOTIFICATION_CATALOG_HRN}}");
private val NotificationCatalogHrn = HRN("{{NOTIFICATION_CATALOG_HRN}}")
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.flink.javadsl.FlinkReadEngine;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.settings.ConsumerSettings;
import com.here.platform.data.client.settings.ConsumerSettings.Builder;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class LayerLevelNotification {
private static final HRN NOTIFICATION_CATALOG_HRN =
HRN.fromString("{{NOTIFICATION_CATALOG_HRN}}");
private static final String NOTIFICATION_STREAM_LAYER_ID = "notification-stream-layer";
public static final String CONSUMER_GROUP_NAME = "layer-level-notification";
public static void main(String[] args) throws Exception {
// Create queryApi for the source catalog
final FlinkDataClient flinkDataClient = new FlinkDataClient();
FlinkQueryApi queryApi = flinkDataClient.queryApi(NOTIFICATION_CATALOG_HRN);
// Specify Kafka consumer group name
ConsumerSettings consumerSettings =
new Builder().withLatestOffset().withGroupName(CONSUMER_GROUP_NAME).build();
// Create the context in which a streaming program is executed
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
// source function that emits catalog-level notifications
.addSource(queryApi.subscribe(NOTIFICATION_STREAM_LAYER_ID, consumerSettings))
// read partition bytes and map to human-readable string
.map(new SampleMapper())
// Write to standard output
.print();
// Trigger the program execution
env.execute("Layer-level notification listener");
}
private static class SampleMapper extends RichMapFunction<Partition, String> {
private transient FlinkDataClient flinkDataClient;
private transient FlinkReadEngine flinkReadEngine;
@Override
public void open(Configuration parameters) {
// Create read engine to read the partition bytes
flinkDataClient = new FlinkDataClient();
flinkReadEngine = flinkDataClient.readEngine(NOTIFICATION_CATALOG_HRN);
}
@Override
public String map(Partition partition) {
byte[] dataAsBytes = flinkReadEngine.getDataAsBytes(partition);
return "===> Received " + new String(dataAsBytes);
}
@Override
public void close() {
// FlinkDataClient is a heavyweight object that needs to be created once, reused and
// terminated.
flinkDataClient.terminate();
}
}
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.{FlinkDataClient, FlinkReadEngine}
import com.here.platform.data.client.scaladsl.Partition
import com.here.platform.data.client.settings.{ConsumerSettings, LatestOffset}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object LayerLevelNotificationScala {
private val NotificationCatalogHrn = HRN("{{NOTIFICATION_CATALOG_HRN}}")
private val NotificationStreamLayerId = "notification-stream-layer"
val ConsumerGroupName = "layer-level-notification"
def main(args: Array[String]): Unit = {
// Create queryApi for the source catalog
val flinkDataClient = new FlinkDataClient()
val queryApi = flinkDataClient.queryApi(NotificationCatalogHrn)
// Specify Kafka consumer group name
val consumerSettings = ConsumerSettings(ConsumerGroupName, LatestOffset)
// Create the context in which a streaming program is executed
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env
// source function that emits catalog-level notifications
.addSource(queryApi.subscribe(NotificationStreamLayerId, consumerSettings))
// read partition bytes and map to human-readable string
.map(new SampleMapper(NotificationCatalogHrn))
// Write to standard output
.print()
// Trigger the program execution
env.execute("Layer-level notification listener")
}
class SampleMapper(hrn: HRN) extends RichMapFunction[Partition, String] with Serializable {
@transient
private lazy val flinkDataClient: FlinkDataClient = new FlinkDataClient()
@transient
private lazy val flinkReadEngine: FlinkReadEngine =
// Create read engine to read the partition bytes
flinkDataClient.readEngine(hrn)
override def map(partition: Partition): String =
s"===> Received ${new String(flinkReadEngine.getDataAsBytes(partition))}"
override def close(): Unit =
// FlinkDataClient is a heavyweight object that needs to be created once, reused and terminated.
flinkDataClient.terminate()
}
}
アプリケーションは 、 Flink データ クライアント ライブラリsubscribe
メソッドを使用して 、NOTIFICATION_CATALOG_HRN
変数によって定義されたカタログのnotification-stream-layer
レイヤーにサブスクライブします。
コンシューマグループ名は Kafka Consumer Group 名です。 layer-level-notification
この時間に設定されています。
アプリケーションをローカルで実行します
レイヤーレベルの通知コンシューマアプリケーションを実行するには、次のコマンドを実行します。
mvn compile exec:java -D"exec.mainClass"="LayerLevelNotification"
mvn compile exec:java -D"exec.mainClass"="LayerLevelNotificationScala"
次のようなコンソール出力が表示されたら、
ConsumerCoordinator:799 - [Consumer clientId=[...], groupId=[...].layer-level-notification] Setting offset for partition [...]
通知にサブスクライブされました。
ソースカタログに新しいデータがあると notification-source-versioned-layer
、コンソールに通知が表示されます。 現在のコンソールは通知をリッスンしているため、別のコンソールインスタンスを開いてデータのアップロードを続行することができます。
OLP CLI を使用して通知にサブスクライブします
通知を受け取る変更を選択できます。
- レイヤー内のすべての変更
- 地理的領域の変更
- 特定のパーティションへの変更
1 つのカタログに複数のサブスクリプションを作成できますが、サブスクリプションは常に 1 つのカタログに関連します。 カタログ内のいくつかのバージョン管理されたレイヤー、またはそのすべてのレイヤーにサブスクライブできます。 サブスクリプション領域と通知タイプを、サブスクライブしているすべてのレイヤーで同じにするか、または異なるように設定することもできます。
サブスクリプションを作成するときに、通知を受け取る変更を指定できます。
OLP CLI を使用して、レイヤー内のすべての変更をサブスクライブします
レイヤー内のすべての変更をサブスクライブするには olp catalog layer notification create [...]
、次のコマンドを使用します。
olp catalog layer notification create {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer \
--source-catalog {{SOURCE_CATALOG_HRN}} --source-layers notification-source-versioned-layer
CLI で次のメッセージが表示されます。
Subscription {{ALL_CHANGES_SUBSCRIPTION_ID}} has been created.
パーティション 377894444
のベルリン市内中心を表すズーム レベルにデータをアップロードしてみ 14
ましょう。 パーティションタイル ID およびズーム レベルの計算について詳しく は、「パーティションタイル ID の計算」チュートリアルを参照してください。
8 月の天気予報の概要データを含む最初のエントリを作成して、天気予報の概要データセットを更新しましょう。 データをカタログにアップロードするには 、 OLP CLI を使用します。
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv
パーティションがアップロードされると、コンソールに結果が表示されます。
===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":0,"layerId":"notification-source-versioned-layer","partitions":[{"version":0,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637532678413,"subscriptionResultType":"FULL"}
ミュンヘンの 8 月の天気予報の概要データを使用して、ミュンヘンの天気予報の概要データセットを更新し、 通知がまだ送信されていることを確認しましょう。
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377782524:src/main/resources/data/august-munich.csv
パーティションがアップロードされると、コンソールに結果が表示されます。
===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":1,"layerId":"notification-source-versioned-layer","partitions":[{"version":1,"partition":"377782524","layer":"notification-source-versioned-layer","dataHandle":"cf101de8-20f8-4dc0-a9a9-b754af6ad3ed","deleted":false}],"timestamp":1637532827487,"subscriptionResultType":"FULL"}
ベルリン、ミュンヘン、その他の国々など、アップロードされたパーティションの通知が受信されます。
続行する前にサブスクリプションをキャンセルしてみましょう。 サブスクリプションをキャンセルするには olp catalog layer notification delete ALL_CHANGES_SUBSCRIPTION_ID
、次のコマンドを使用します。
olp catalog layer notification delete {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer --subscription-ids {{ALL_CHANGES_SUBSCRIPTION_ID}}
CLI で次のメッセージが表示されます。
Subscription {{ALL_CHANGES_SUBSCRIPTION_ID}} has been deleted.
OLP CLI を使用して、地理的領域の変更をサブスクライブします
地理的領域の変更をサブスクライブするには olp catalog layer notification create [...] --subscription-area wkt:WKT
、次のコマンドを使用します。
この場合 WKT
、つまり、よく知られているジオメトリのテキスト表現は POINT(13.409419 52.520817)
、ベルリンのテレビ塔の座標と同じです。
olp catalog layer notification create {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer \
--source-catalog {{SOURCE_CATALOG_HRN}} --source-layers notification-source-versioned-layer \
--subscription-area wkt:"POINT(13.409419 52.520817)"
CLI で次のメッセージが表示されます。
Subscription {{BERLIN_TELEVISION_TOWER_CHANGES_SUBSCRIPTION_ID}} has been created.
ベルリンの 8 月の天気予報の概要データを使用して、ベルリンの天気予報の概要データセットを更新し、 通知が送信されたことを確認しましょう。
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv
ベルリンのテレビ塔は 377894444
、同じパーティション on ズーム レベルにあり 14
ます。そのため、パーティションをアップロードすると、コンソールに結果が表示されます。
===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":2,"layerId":"notification-source-versioned-layer","partitions":[{"version":2,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637593998401,"subscriptionResultType":"FULL"}
ミュンヘンの 8 月の天気予報の概要データを使用して、ミュンヘンの天気予報の概要データセットを更新して、 今回の通知が送信されていないことを確認してみましょう。
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377782524:src/main/resources/data/august-munich.csv
パーティションがアップロードされると 、コンソールに新しいエントリが表示されなくなります。
続行する前にサブスクリプションをキャンセルしてみましょう。 サブスクリプションをキャンセルするには olp catalog layer notification delete
、次のコマンドを使用します。
olp catalog layer notification delete {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer --subscription-ids {{BERLIN_TELEVISION_TOWER_CHANGES_SUBSCRIPTION_ID}}
CLI で次のメッセージが表示されます。
Subscription {{BERLIN_TELEVISION_TOWER_CHANGES_SUBSCRIPTION_ID}} has been deleted.
OLP CLI を使用して、特定のパーティションの変更をサブスクライブします
特定のパーティションの通知にサブスクライブすると、すべての子パーティション(タイル)を含む、指定したパーティションへの変更によって通知がトリガーされます。 地理的領域の変更をサブスクライブするには olp catalog layer notification create [...] --subscription-area partitions:PARTITIONS
、次のコマンドを使用します。
次のことを試してください。
377894444
ベルリンでパーティションのサブスクリプションを作成します- ベルリンとミュンヘンにデータをアップロードします
その結果、通知は 1 つだけ受信する必要があります。
結論
このチュートリアルでは、 OLP CLI を使用して、カタログおよびレイヤーレベルの変更に対するサブスクリプションを作成および管理する方法を学習しました。また、 Data Client を使用してサブスクリプション通知を使用する方法についても学習しました。
詳細情報
詳細については、次の追加リソースを参照してください。
- データ クライアント ライブラリでのデータの書き込みおよび読み取り方法 : データ クライアント ライブラリ開発者ガイド 。
- Data API の使用方法 : Data API 開発者ガイド 。
- カタログレベルの通知の設定方法 : Data API カタログレベル通知ガイド
- レイヤーレベルの通知の設定方法 : Data API レイヤーレベル通知ガイド
- OLP CLI を使用して サブスクリプションを管理する方法 : OLP CLI 通知コマンド。