カタログおよびレイヤーレベルの変更にサブスクライブします

目的: カタログおよびレイヤーレベルの変更に対するサブスクリプションを作成および管理します。

複雑さ: 中級者です

前提条件: 資格情報を取得し、資格情報を確認します

所要時間: 60 分

ソースコード: ダウンロード

通知によって、カタログ、レイヤー、またはパーティションが更新されたことを通知し、どのデータが変更されたかについての情報を提供します。 通知機能は versioned 、マップ データに依存するプラットフォームまたはプラットフォーム外のサービスでストリームアプリケーションを実行する場合に役立ちます。 この機能を使用すると、カタログが更新されたときにアプリケーションが自動的に通知を受け取るため、変更を継続的にポーリングする必要がなくなります。 通知を受信すると、アプリケーションは対応するデータのみを要求する必要があります。

特定の要件に基づいて、通知を受け取るカタログの一部を選択できます。

このチュートリアルで は、 OLP CLI を使用してカタログおよびレイヤーレベルの変更に対するサブスクリプションを作成および管理する方法について説明します。

サブスクリプションが作成されると、このチュートリアルで は、データ クライアント ライブラリを使用してコンシューマアプリケーションを実装する方法について説明します。

ただし、ユースケースがマップ データを処理して、情報を変換または取得している可能性があります。 このような処理は、通常、 1 つ以上のバージョン化されたパーティション化されたカタログを消費し、代わりに別のバージョン化されたパーティション化された 1 つの入力カタログが変更された場合は、通常、プロセスを実行して出力を更新する必要があります。

この場合、このチュートリアルで説明する通知の仕組みを使用する必要はありません。 代わり に、データ プロセッシング ライブラリおよびバッチ処理パイプラインを使用することをお勧めします。 これらは、入力カタログの変更時に自動的に実行されるように設計されています。 はじめにには 、データ プロセッシング ライブラリ チュートリアルを使用してカタログをコピーを使用します。

このチュートリアルを実行 するには、 CLI がインストールされプラットフォームとリポジトリ資格情報の両方がローカルマシンに設定されている必要があります。

カタログレベルの通知

このチュートリアルでは、カタログレベルの変更をサブスクライブして通知を使用する方法について説明します。

カタログで通知が有効化されている場合、ストリームはカタログと同じ HERE リソースネーム ( HERE リソースネーム )を使用して HERE platform によって自動的に作成されます。 カタログデータが更新されるたびに、カタログの新しいバージョンが作成され、新しいレコードがストリームに書き込まれます。

通知レコードには、変更のタイムスタンプと新しいカタログバージョン番号が含まれています。 バージョン番号は、更新されたデータの要求に使用できるため、特に便利です。

新しいカタログバージョンのパブリケーションについての通知を受け取るには、次の手順を実行します。

  • notifications OLP CLI で有効化されたソースカタログを作成します。
  • データ クライアント ライブラリを使用して通知リスナーアプリケーションを実装します。
  • OLP CLI を使用して新しいデータをカタログにアップロードし、通知をトリガーします。 実際の状況では、他のユーザーがデータを更新しますが、このチュートリアルでは、ユーザー側でデータを更新します。
カタログレベルの通知
図 1. カタログレベルの通知

通知を有効にしてカタログを作成します

以下の設定で通知を使用して変更を監視できるカタログを作成します。 カタログ内のデータが変更されると、通知がサブスクライバに送信されます。

カタログ設定に は、heretilepartitioningSchemetileLevel14ともに使用するサンプルversionedレイヤーが含ま れています。

簡素化のために、 "contentType": "text/plain" を使用します。

カタログ notifications でが有効になっている必要があります。 新しいカタログの通知を有効にするには、カタログの作成に使用するカタログ設定ファイルに次のプロパティがあることを確認してください。

  "notifications": {
    "enabled": true
  }

完全なカタログ設定 :

Source-catalog.json
{
  "id": "notifications-source",
  "name": "Catalog Notifications Source Tutorial",
  "summary": "Catalog Notifications Source",
  "description": "When changed, the notifications will be sent to subscribers",
  "notifications": {
    "enabled": true
  },
  "layers": [
    {
      "id": "notification-source-versioned-layer",
      "name": "Notifications Source Tutorial layer",
      "summary": "Messages for the notifications tutorial.",
      "description": "Messages for the notifications tutorial.",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "heretile",
        "tileLevels": [
          14
        ]
      },
      "contentType": "text/plain"
    }
  ]
}

{{YOUR_USERNAME}} 以下をユーザー名で置き換え、上記 json のファイルを使用してコマンドを実行します。

olp catalog create {{YOUR_USERNAME}}-source-catalog "{{YOUR_USERNAME}} source catalog" --config source-catalog.json

CLI で次のメッセージが表示されます。

Catalog {{SOURCE_CATALOG_HRN}} has been created.

レルムで請求タグが必要な場合 は、layerセクションにbillingTag: ["YOUR_BILLING_TAG"]プロパティを追加して設定 ファイルを更新します。

既存のカタログの通知を有効にするに は、olp catalog update [...] --notifications trueコマンドを使用します。

Maven プロジェクトを設定します

ソースコードはチュートリアルの冒頭でダウンロードし、任意のフォルダーに配置するか、またはフォルダー構造を最初から作成できます。

catalog-notification
└── src
    └── main
        ├── java
        └── resources
        └── scala

この操作は、次の bash 1 つのコマンドで実行できます。

mkdir -p catalog-notification/src/main/{java,resources,scala}

Maven POM ファイルは 'Maven 設定の確認 チュートリアルのファイルに似ています

親 POM および依存関係セクションは次のように更新されます。

親 POM :

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-stream-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

POM は sdk-stream-bom_2.12 親 POM として使用します。 Flink ライブラリのバージョンを 環境 POM から取得できます

依存関係 :

<dependencies>
    <!-- HERE dependencies -->
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>flink-support_${scala.compat.version}</artifactId>
    </dependency>

    <!-- External dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>

次の依存関係が使用されます。

通知コンシューマアプリケーションを実装します

このチュートリアルでは、ソースのバージョン管理されたカタログに、特定の領域の温度と湿度の平均値のサンプルデータセットが含まれています。 データの例 :

Temperature,Humidity
18,72

データは毎月更新され、新しいデータがカタログにアップロードされます。

簡単 な Java/Scala アプリケーションを作成して、データセットを含むカタログが更新されたときに通知を受け取るようにします。 カタログのバージョンとタイムスタンプがコンソールに印刷されます。

{{SOURCE_CATALOG_HRN}}の章で作成したソースカタログ HERE リソースネーム にプレースホルダーを変更します。

Java
Scala

private static final HRN CATALOG_HRN = HRN.fromString("{{SOURCE_CATALOG_HRN}}");


private val CatalogHrn = HRN("{{SOURCE_CATALOG_HRN}}")

Java
Scala

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.settings.NotificationConsumerSettings;
import java.util.Date;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CatalogLevelNotification {

  private static final HRN CATALOG_HRN = HRN.fromString("{{SOURCE_CATALOG_HRN}}");
  public static final String CONSUMER_GROUP_NAME = "catalog-level-notification";

  public static void main(String[] args) throws Exception {
    // Create queryApi for the source catalog
    final FlinkDataClient flinkDataClient = new FlinkDataClient();
    FlinkQueryApi queryApi = flinkDataClient.queryApi(CATALOG_HRN);

    // Specify Kafka consumer group name
    NotificationConsumerSettings consumerSettings =
        new NotificationConsumerSettings.Builder().withGroupName(CONSUMER_GROUP_NAME).build();

    // Create the context in which a streaming program is executed
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
        // source function that emits catalog-level notifications
        .addSource(queryApi.subscribeToNotifications(consumerSettings))
        // map notification to a human-readable string to be displayed in the console
        .map(
            notification ->
                String.format(
                    "===> BatchPublicationNotification: New catalog version %s, date/time: %s",
                    notification.getCatalogVersion(), new Date(notification.timestamp())))
        // Write to standard output
        .print();

    // Trigger the program execution
    env.execute("Catalog-level notification listener");
  }
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.FlinkDataClient
import com.here.platform.data.client.settings.NotificationConsumerSettings
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import java.util.Date

object CatalogLevelNotificationScala {
  private val CatalogHrn = HRN("{{SOURCE_CATALOG_HRN}}")
  private val ConsumerGroupName = "catalog-level-notification"

  def main(args: Array[String]): Unit = {
    // Create queryApi for the source catalog
    val flinkDataClient = new FlinkDataClient()
    val queryApi = flinkDataClient.queryApi(CatalogHrn)

    // Specify Kafka consumer group name
    val consumerSettings = NotificationConsumerSettings(ConsumerGroupName)

    // Create the context in which a streaming program is executed
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env
    // source function that emits catalog-level notifications
      .addSource(queryApi.subscribeToNotifications(consumerSettings))
      // map BatchPublicationNotification to a human-readable string to be displayed in the console
      .map { notification =>
        s"===> BatchPublicationNotification: New catalog version ${notification.version}, date/time: ${new Date(notification.timestamp)}"
      }
      // Write to standard output
      .print()

    // Trigger the program execution
    env.execute("Catalog-level notification listener")
  }
}

アプリケーションは 、 Flink データ クライアント ライブラリsubscribeToNotifications メソッドを使用して CATALOG_HRN 、変数によって定義されたカタログにサブスクライブします。

コンシューマグループ名は Kafka Consumer Group 名です。 catalog-level-notification このチュートリアルでは、に設定されています。

アプリケーションをローカルで実行します

カタログレベルの通知コンシューマアプリケーションを実行するには、次のコマンドを実行します。

Java
Scala

mvn compile exec:java -D"exec.mainClass"="CatalogLevelNotification"


mvn compile exec:java -D"exec.mainClass"="CatalogLevelNotificationScala"

次のようなコンソール出力が表示されたら、

[...] Subscribe to notifications with group name: catalog-level-notifications-consumer-group, offset: LatestOffset and consumer id: c458fd12-c525-4c58-ac71-11a149c8234c

通知にサブスクライブされました。

ソースカタログに新しいデータが追加されると、コンソールに通知が表示されます。 現在のコンソールは通知をリッスンしているため、別のコンソールインスタンスを開いてデータのアップロードを続行することができます。

このチュートリアルでは 、 src/main/resources/data フォルダーにサンプルデータが含まれています。 このデータセットは、 2021 年 8 月のベルリンとミュンヘンの天気予報を表しています。

パーティション 377894444 のベルリン市内中心を表すズーム レベル14データをアップロードしてみ ましょう。 パーティションタイル ID およびズーム レベルの計算について詳しく は、「パーティションタイル ID の計算」チュートリアルを参照してください。

データをカタログにアップロードするには 、 OLP CLI を使用します。

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv

コマンドはカタログにデータをアップロードし、カタログの新しいバージョンが作成されます。 結果の出力には、カタログバージョンと変更日時の通知が含まれます。

===> BatchPublicationNotification: New catalog version 0, date/time: Mon Nov 15 14:46:10 EET 2021

別の変更通知を取得するには、より多くのデータをアップロードしてください。

このアプリケーションは一般的なストリーミングプログラムであるため、無期限に実行されます。 Ctrl + C コンソールでを押して、プログラムを停止します。

カタログに別のversionedレイヤーがある場合 は、そのレイヤーの変更に関する通知が表示されます

レイヤーレベルの通知

レイヤーレベルの通知を使用すると、特定のレイヤーで変更が発生したときに通知を受け取ることができます。

現在、レイヤーレベルの通知は 、HERE Tileパーティション分割されたversionedレイヤーでのみ利用できます。

通知は基本的にデータストリームです。 通知は、ストリーム レイヤーへのメッセージとして書き込まれます。 レイヤーにサブスクライブし、 HERE platform の任意のストリーム レイヤーからデータを消費するのと同じようにメッセージを消費することで、通知を受信します。

特定のパーティションまたは特定の地理的領域が更新されたときに通知を受信するには、次の手順を実行します。

  • stream OLP CLI で通知が公開されるレイヤーを使用して、通知カタログを作成します。
  • データ クライアント ライブラリを使用して通知リスナーアプリケーションを実装します。 このアプリケーションは、 notification-stream-layer前の章で作成した通知カタログのレイヤーにパブリッシュされたデータのコンシューマにすぎません。
  • OLP CLI を使用してソースカタログのレイヤーにサブスクライブします。
  • OLP CLI でサブスクライブしているレイヤーに新しいデータをアップロードします。実際の状況では、他のユーザーがデータを更新しますが、このチュートリアルでは、ユーザー側でデータを更新します。
レイヤーレベルの通知
図 2. レイヤーレベルの通知

通知カタログを作成します

このチュートリアルでは、次の 2 つのカタログが必要です。

今回 streamversioned 、カタログレベルの通知ではなく、ソースカタログのレイヤーの変更に関する通知を受け取るレイヤーが存在する必要があります。

以下の設定を使用して、通知カタログを作成します。 ソースカタログが変更されると、通知メッセージが stream 通知カタログのレイヤーに送信されます。

カタログ設定には 、genericpartitioningSchemeのサンプルstreamレイヤーが含ま れています。

簡素化のために、 "contentType": "text/plain" を使用します。

完全なカタログ設定 :

notification-catalog.json
{
  "id": "notifications-sink",
  "name": "Catalog Notifications Stream Tutorial",
  "summary": "Catalog for Notifications Stream",
  "description": "The catalog containing the notification stream",
  "layers": [
    {
      "id": "notification-stream-layer",
      "name": "Notification Stream",
      "summary": "Stream for notification",
      "description": "Stream for notification",
      "layerType": "stream",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      },
      "contentType": "text/plain"
    }
  ]
}

{{YOUR_USERNAME}} 以下をユーザー名に置き換えて、次のコマンドを実行します。

olp catalog create {{YOUR_USERNAME}}-notification-catalog "{{YOUR_USERNAME}} notification catalog" --config notification-catalog.json

CLI で次のメッセージが表示されます。

Catalog {{NOTIFICATION_CATALOG_HRN}} has been created.

レルムで請求タグが必要な場合 は、layerセクションにbillingTags: ["YOUR_BILLING_TAG"]プロパティを追加して設定 ファイルを更新します。

通知コンシューマアプリケーションを実装します

このチュートリアルでは、ソースバージョンのカタログに、異なるパーティションに保存されている温度と湿度の平均値の 2 つのデータセットが含まれています。 ベルリンとミュンヘン。 データセットの構造は 、カタログレベルの通知コンシューマアプリケーションと同じです。

新しい メッセージが stream レイヤーに公開されたときに通知を受け取る簡単な Java/Scala アプリケーションを作成してみましょう。 変更されたパーティションに関する情報を含む通知メッセージが印刷されます。

{{NOTIFICATION_CATALOG_HRN}}の章で作成したソースカタログ HERE リソースネーム にプレースホルダーを変更します。

Java
Scala

private static final HRN NOTIFICATION_CATALOG_HRN = HRN.fromString("{{NOTIFICATION_CATALOG_HRN}}");


private val NotificationCatalogHrn = HRN("{{NOTIFICATION_CATALOG_HRN}}")

Java
Scala

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.flink.javadsl.FlinkReadEngine;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.settings.ConsumerSettings;
import com.here.platform.data.client.settings.ConsumerSettings.Builder;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LayerLevelNotification {
  private static final HRN NOTIFICATION_CATALOG_HRN =
      HRN.fromString("{{NOTIFICATION_CATALOG_HRN}}");
  private static final String NOTIFICATION_STREAM_LAYER_ID = "notification-stream-layer";

  public static final String CONSUMER_GROUP_NAME = "layer-level-notification";

  public static void main(String[] args) throws Exception {
    // Create queryApi for the source catalog
    final FlinkDataClient flinkDataClient = new FlinkDataClient();
    FlinkQueryApi queryApi = flinkDataClient.queryApi(NOTIFICATION_CATALOG_HRN);

    // Specify Kafka consumer group name
    ConsumerSettings consumerSettings =
        new Builder().withLatestOffset().withGroupName(CONSUMER_GROUP_NAME).build();

    // Create the context in which a streaming program is executed
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
        // source function that emits catalog-level notifications
        .addSource(queryApi.subscribe(NOTIFICATION_STREAM_LAYER_ID, consumerSettings))
        // read partition bytes and map to human-readable string
        .map(new SampleMapper())
        // Write to standard output
        .print();

    // Trigger the program execution
    env.execute("Layer-level notification listener");
  }

  private static class SampleMapper extends RichMapFunction<Partition, String> {
    private transient FlinkDataClient flinkDataClient;
    private transient FlinkReadEngine flinkReadEngine;

    @Override
    public void open(Configuration parameters) {
      // Create read engine to read the partition bytes
      flinkDataClient = new FlinkDataClient();
      flinkReadEngine = flinkDataClient.readEngine(NOTIFICATION_CATALOG_HRN);
    }

    @Override
    public String map(Partition partition) {
      byte[] dataAsBytes = flinkReadEngine.getDataAsBytes(partition);
      return "===> Received " + new String(dataAsBytes);
    }

    @Override
    public void close() {
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and
      // terminated.
      flinkDataClient.terminate();
    }
  }
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.{FlinkDataClient, FlinkReadEngine}
import com.here.platform.data.client.scaladsl.Partition
import com.here.platform.data.client.settings.{ConsumerSettings, LatestOffset}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object LayerLevelNotificationScala {
  private val NotificationCatalogHrn = HRN("{{NOTIFICATION_CATALOG_HRN}}")
  private val NotificationStreamLayerId = "notification-stream-layer"

  val ConsumerGroupName = "layer-level-notification"

  def main(args: Array[String]): Unit = {
    // Create queryApi for the source catalog
    val flinkDataClient = new FlinkDataClient()
    val queryApi = flinkDataClient.queryApi(NotificationCatalogHrn)

    // Specify Kafka consumer group name
    val consumerSettings = ConsumerSettings(ConsumerGroupName, LatestOffset)

    // Create the context in which a streaming program is executed
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env
    // source function that emits catalog-level notifications
      .addSource(queryApi.subscribe(NotificationStreamLayerId, consumerSettings))
      // read partition bytes and map to human-readable string
      .map(new SampleMapper(NotificationCatalogHrn))
      // Write to standard output
      .print()

    // Trigger the program execution
    env.execute("Layer-level notification listener")
  }

  class SampleMapper(hrn: HRN) extends RichMapFunction[Partition, String] with Serializable {
    @transient
    private lazy val flinkDataClient: FlinkDataClient = new FlinkDataClient()

    @transient
    private lazy val flinkReadEngine: FlinkReadEngine =
      // Create read engine to read the partition bytes
      flinkDataClient.readEngine(hrn)

    override def map(partition: Partition): String =
      s"===> Received ${new String(flinkReadEngine.getDataAsBytes(partition))}"

    override def close(): Unit =
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and terminated.
      flinkDataClient.terminate()
  }

}

アプリケーションは 、 Flink データ クライアント ライブラリsubscribe メソッドを使用して 、NOTIFICATION_CATALOG_HRN変数によって定義されたカタログのnotification-stream-layerレイヤーにサブスクライブします。

コンシューマグループ名は Kafka Consumer Group 名です。 layer-level-notification この時間に設定されています。

アプリケーションをローカルで実行します

レイヤーレベルの通知コンシューマアプリケーションを実行するには、次のコマンドを実行します。

Java
Scala

mvn compile exec:java -D"exec.mainClass"="LayerLevelNotification"


mvn compile exec:java -D"exec.mainClass"="LayerLevelNotificationScala"

次のようなコンソール出力が表示されたら、

ConsumerCoordinator:799 - [Consumer clientId=[...], groupId=[...].layer-level-notification] Setting offset for partition [...]

通知にサブスクライブされました。

ソースカタログに新しいデータがあると notification-source-versioned-layer、コンソールに通知が表示されます。 現在のコンソールは通知をリッスンしているため、別のコンソールインスタンスを開いてデータのアップロードを続行することができます。

OLP CLI を使用して通知にサブスクライブします

通知を受け取る変更を選択できます。

  • レイヤー内のすべての変更
  • 地理的領域の変更
  • 特定のパーティションへの変更

1 つのカタログに複数のサブスクリプションを作成できますが、サブスクリプションは常に 1 つのカタログに関連します。 カタログ内のいくつかのバージョン管理されたレイヤー、またはそのすべてのレイヤーにサブスクライブできます。 サブスクリプション領域と通知タイプを、サブスクライブしているすべてのレイヤーで同じにするか、または異なるように設定することもできます。

サブスクリプションを作成するときに、通知を受け取る変更を指定できます。

OLP CLI を使用して、レイヤー内のすべての変更をサブスクライブします

レイヤー内のすべての変更をサブスクライブするには olp catalog layer notification create [...] 、次のコマンドを使用します。

olp catalog layer notification create {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer \
  --source-catalog {{SOURCE_CATALOG_HRN}} --source-layers notification-source-versioned-layer

CLI で次のメッセージが表示されます。

Subscription {{ALL_CHANGES_SUBSCRIPTION_ID}} has been created.

パーティション 377894444 のベルリン市内中心を表すズーム レベルにデータをアップロードしてみ 14ましょう。 パーティションタイル ID およびズーム レベルの計算について詳しく は、「パーティションタイル ID の計算」チュートリアルを参照してください。

8 月の天気予報の概要データを含む最初のエントリを作成して、天気予報の概要データセットを更新しましょう。 データをカタログにアップロードするには 、 OLP CLI を使用します。

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv

パーティションがアップロードされると、コンソールに結果が表示されます。

===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":0,"layerId":"notification-source-versioned-layer","partitions":[{"version":0,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637532678413,"subscriptionResultType":"FULL"}

ミュンヘンの 8 月の天気予報の概要データを使用して、ミュンヘンの天気予報の概要データセットを更新し、 通知がまだ送信されていることを確認しましょう。

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377782524:src/main/resources/data/august-munich.csv

パーティションがアップロードされると、コンソールに結果が表示されます。

===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":1,"layerId":"notification-source-versioned-layer","partitions":[{"version":1,"partition":"377782524","layer":"notification-source-versioned-layer","dataHandle":"cf101de8-20f8-4dc0-a9a9-b754af6ad3ed","deleted":false}],"timestamp":1637532827487,"subscriptionResultType":"FULL"}

ベルリン、ミュンヘン、その他の国々など、アップロードされたパーティションの通知が受信されます。

続行する前にサブスクリプションをキャンセルしてみましょう。 サブスクリプションをキャンセルするには olp catalog layer notification delete ALL_CHANGES_SUBSCRIPTION_ID 、次のコマンドを使用します。

olp catalog layer notification delete {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer --subscription-ids {{ALL_CHANGES_SUBSCRIPTION_ID}}

CLI で次のメッセージが表示されます。

Subscription {{ALL_CHANGES_SUBSCRIPTION_ID}} has been deleted.

OLP CLI を使用して、地理的領域の変更をサブスクライブします

地理的領域の変更をサブスクライブするには olp catalog layer notification create [...] --subscription-area wkt:WKT 、次のコマンドを使用します。

この場合 WKT、つまり、よく知られているジオメトリのテキスト表現は POINT(13.409419 52.520817)、ベルリンのテレビ塔の座標と同じです。

olp catalog layer notification create {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer \
  --source-catalog {{SOURCE_CATALOG_HRN}} --source-layers notification-source-versioned-layer \
  --subscription-area wkt:"POINT(13.409419 52.520817)"

CLI で次のメッセージが表示されます。

Subscription {{BERLIN_TELEVISION_TOWER_CHANGES_SUBSCRIPTION_ID}} has been created.

ベルリンの 8 月の天気予報の概要データを使用して、ベルリンの天気予報の概要データセットを更新し、 通知が送信されたことを確認しましょう。

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv

ベルリンのテレビ塔は 377894444 、同じパーティション on ズーム レベルにあり 14ます。そのため、パーティションをアップロードすると、コンソールに結果が表示されます。

===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":2,"layerId":"notification-source-versioned-layer","partitions":[{"version":2,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637593998401,"subscriptionResultType":"FULL"}

ミュンヘンの 8 月の天気予報の概要データを使用して、ミュンヘンの天気予報の概要データセットを更新して、 今回の通知が送信されていないことを確認してみましょう。

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377782524:src/main/resources/data/august-munich.csv

パーティションがアップロードされると 、コンソールに新しいエントリが表示されなくなります

続行する前にサブスクリプションをキャンセルしてみましょう。 サブスクリプションをキャンセルするには olp catalog layer notification delete 、次のコマンドを使用します。

olp catalog layer notification delete {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer --subscription-ids {{BERLIN_TELEVISION_TOWER_CHANGES_SUBSCRIPTION_ID}}

CLI で次のメッセージが表示されます。

Subscription {{BERLIN_TELEVISION_TOWER_CHANGES_SUBSCRIPTION_ID}} has been deleted.

OLP CLI を使用して、特定のパーティションの変更をサブスクライブします

特定のパーティションの通知にサブスクライブすると、すべての子パーティション(タイル)を含む、指定したパーティションへの変更によって通知がトリガーされます。 地理的領域の変更をサブスクライブするには olp catalog layer notification create [...] --subscription-area partitions:PARTITIONS 、次のコマンドを使用します。

次のことを試してください。

  • 377894444 ベルリンでパーティションのサブスクリプションを作成します
  • ベルリンとミュンヘンにデータをアップロードします

その結果、通知は 1 つだけ受信する必要があります。

結論

このチュートリアルでは、 OLP CLI を使用して、カタログおよびレイヤーレベルの変更に対するサブスクリプションを作成および管理する方法を学習しました。また、 Data Client を使用してサブスクリプション通知を使用する方法についても学習しました。

詳細情報

詳細については、次の追加リソースを参照してください。

」に一致する結果は 件です

    」に一致する結果はありません