Subscribe to catalog and layer-level changes

Objectives: Create and manage subscriptions to catalog and layer-level changes.

Complexity: Intermediate

Prerequisites: Get your credentials, Verify your credentials

Time to complete: 60 min

Source code: Download

Notifications let you know when a catalog, layer, or partition is updated and provide information about which data has changed. Notifications are useful when you run a stream application on the platform, or a service outside the platform, that depends on versioned map data. With notifications, you no longer need to continually poll for changes: your application is automatically notified when the catalog is updated and only needs to request the corresponding data.

You can choose which portion of the catalog you receive notifications for, based on your specific requirements:

  • Catalog-level notifications report every new version of the catalog.
  • Layer-level notifications report changes in a particular versioned layer, optionally limited to a geographic area or specific partitions.

This tutorial demonstrates how to create and manage subscriptions to catalog and layer-level changes using the OLP CLI.

Once the subscription is created, the tutorial demonstrates how to implement a consumer application using the Data Client Library.

Note

However, your use case might be processing map data to transform or derive information from it. Such processing typically means consuming one or multiple versioned partitioned catalogs and outputting a different versioned partitioned catalog instead. Whenever one input catalog changes, you typically need to run your process to update the output.

In this case, you don't need the notification mechanism described in this tutorial. Instead, it is recommended to use the Data Processing Library and batch processing pipelines, which are designed to run automatically when an input catalog changes. To get started, use the Copy a Catalog Using the Data Processing Library tutorial.

To follow this tutorial, you need to have the CLI installed and have both platform and repository credentials set up on your local machine.

Catalog-level notification

This tutorial demonstrates how to subscribe to catalog-level changes and consume the notifications.

When notifications are enabled for a catalog, a stream is automatically created by the HERE platform with the same HERE Resource Name (HRN) as the catalog. Whenever the catalog data is updated, a new version of the catalog is created, and a new record is written to the stream.

The notification record contains the timestamp of the change and the new catalog version number. The version number is particularly useful, as you can use it to request the updated data.

To receive notifications about new catalog version publications, perform the following steps:

  • Create a source catalog with notifications enabled using the OLP CLI.
  • Implement a notification listener application using the Data Client Library.
  • Upload new data into the catalog with the OLP CLI to trigger a notification. In a real-life scenario, someone else would update the data, but for the purposes of this tutorial, you update it yourself.
Figure 1. Catalog-level notification

Create a catalog with notifications enabled

Using the configuration below, create a catalog that you can then monitor for changes through notifications. When the data in the catalog changes, notifications are sent to subscribers.

The catalog configuration contains a sample versioned layer with the heretile partitioning scheme and tileLevels set to 14.

For the sake of simplicity, "contentType": "text/plain" is used.

The catalog should have the notifications enabled. To enable notifications for a new catalog, make sure you have the following property in the catalog configuration file used for the catalog creation:

  "notifications": {
    "enabled": true
  }

The complete catalog configuration:

source-catalog.json
{
  "id": "notifications-source",
  "name": "Catalog Notifications Source Tutorial",
  "summary": "Catalog Notifications Source",
  "description": "When changed, the notifications will be sent to subscribers",
  "notifications": {
    "enabled": true
  },
  "layers": [
    {
      "id": "notification-source-versioned-layer",
      "name": "Notifications Source Tutorial layer",
      "summary": "Messages for the notifications tutorial.",
      "description": "Messages for the notifications tutorial.",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "heretile",
        "tileLevels": [
          14
        ]
      },
      "contentType": "text/plain"
    }
  ]
}

Replace {{YOUR_USERNAME}} below with your username and run the following command using the above JSON file:

olp catalog create {{YOUR_USERNAME}}-source-catalog "{{YOUR_USERNAME}} source catalog" --config source-catalog.json

You will receive the following message in the CLI:

Catalog {{SOURCE_CATALOG_HRN}} has been created.

Note

If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.

To enable notifications for an already existing catalog, use the olp catalog update [...] --notifications true command.
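
For example, assuming the catalog HRN is passed as the first argument, as in the other olp catalog commands in this tutorial, the call would look like this:

olp catalog update {{SOURCE_CATALOG_HRN}} --notifications true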

Set up the Maven project

You may download the source code at the beginning of the tutorial and put it in a folder of your choice, or create a folder structure from scratch:

catalog-notification
└── src
    └── main
        ├── java
        ├── resources
        └── scala

You can do this with a single bash command:

mkdir -p catalog-notification/src/main/{java,resources,scala}

The Maven POM file is similar to the file in the Verify Maven Settings tutorial.

The parent POM and dependencies sections are updated as follows:

Parent POM:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-stream-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

The POM uses sdk-stream-bom_2.12 as the parent POM, which supplies the versions of the Flink and Data Client libraries, so they do not need to be declared explicitly in the dependencies section.

Dependencies:

<dependencies>
    <!-- HERE dependencies -->
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>flink-support_${scala.compat.version}</artifactId>
    </dependency>

    <!-- External dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.compat.version}</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>

The following dependencies are used:

  • flink-support — the Flink support module of the Data Client Library, which provides the FlinkDataClient, FlinkQueryApi, and FlinkReadEngine classes used in this tutorial.
  • flink-clients and flink-streaming-scala — the Apache Flink runtime and streaming API.
  • slf4j-api and slf4j-log4j12 — the SLF4J logging API and its Log4j binding.
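
The slf4j-log4j12 binding expects a Log4j 1.x configuration file on the classpath. If you create the project from scratch instead of downloading the source code, a minimal src/main/resources/log4j.properties along the following lines (an illustrative assumption, not necessarily the file shipped with the tutorial sources) keeps the console output readable:

log4j.properties
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c - %m%n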

Implement a notification consumer application

In this tutorial, the source versioned catalog contains a sample dataset of average values for temperature and humidity in a certain area. An example of the data:

Temperature,Humidity
18,72

The data is updated every month, and new data is uploaded to the catalog.

Let's create a simple Java/Scala application to receive notifications when the catalog containing the dataset is updated. The application prints the catalog version and timestamp to the console.

Replace the {{SOURCE_CATALOG_HRN}} placeholder with the HRN of the source catalog created in the previous chapter.

Java
Scala

private static final HRN CATALOG_HRN = HRN.fromString("{{SOURCE_CATALOG_HRN}}");


private val CatalogHrn = HRN("{{SOURCE_CATALOG_HRN}}")

Java
Scala

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.settings.NotificationConsumerSettings;
import java.util.Date;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CatalogLevelNotification {

  private static final HRN CATALOG_HRN = HRN.fromString("{{SOURCE_CATALOG_HRN}}");
  public static final String CONSUMER_GROUP_NAME = "catalog-level-notification";

  public static void main(String[] args) throws Exception {
    // Create queryApi for the source catalog
    final FlinkDataClient flinkDataClient = new FlinkDataClient();
    FlinkQueryApi queryApi = flinkDataClient.queryApi(CATALOG_HRN);

    // Specify Kafka consumer group name
    NotificationConsumerSettings consumerSettings =
        new NotificationConsumerSettings.Builder().withGroupName(CONSUMER_GROUP_NAME).build();

    // Create the context in which a streaming program is executed
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
        // source function that emits catalog-level notifications
        .addSource(queryApi.subscribeToNotifications(consumerSettings))
        // map notification to a human-readable string to be displayed in the console
        .map(
            notification ->
                String.format(
                    "===> BatchPublicationNotification: New catalog version %s, date/time: %s",
                    notification.getCatalogVersion(), new Date(notification.timestamp())))
        // Write to standard output
        .print();

    // Trigger the program execution
    env.execute("Catalog-level notification listener");
  }
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.FlinkDataClient
import com.here.platform.data.client.settings.NotificationConsumerSettings
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import java.util.Date

object CatalogLevelNotificationScala {
  private val CatalogHrn = HRN("{{SOURCE_CATALOG_HRN}}")
  private val ConsumerGroupName = "catalog-level-notification"

  def main(args: Array[String]): Unit = {
    // Create queryApi for the source catalog
    val flinkDataClient = new FlinkDataClient()
    val queryApi = flinkDataClient.queryApi(CatalogHrn)

    // Specify Kafka consumer group name
    val consumerSettings = NotificationConsumerSettings(ConsumerGroupName)

    // Create the context in which a streaming program is executed
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env
    // source function that emits catalog-level notifications
      .addSource(queryApi.subscribeToNotifications(consumerSettings))
      // map BatchPublicationNotification to a human-readable string to be displayed in the console
      .map { notification =>
        s"===> BatchPublicationNotification: New catalog version ${notification.version}, date/time: ${new Date(notification.timestamp)}"
      }
      // Write to standard output
      .print()

    // Trigger the program execution
    env.execute("Catalog-level notification listener")
  }
}

The application uses the subscribeToNotifications method of the Data Client Library's Flink support to subscribe to the catalog defined by the CATALOG_HRN (Java) or CatalogHrn (Scala) variable.

The consumer group name is a Kafka Consumer Group name. It is set to catalog-level-notification in this tutorial.

Run the application locally

To execute the catalog-level notification consumer application, run the following command:

Java
Scala

mvn compile exec:java -D"exec.mainClass"="CatalogLevelNotification"


mvn compile exec:java -D"exec.mainClass"="CatalogLevelNotificationScala"

Once you get console output similar to the following:

[...] Subscribe to notifications with group name: catalog-level-notification, offset: LatestOffset and consumer id: c458fd12-c525-4c58-ac71-11a149c8234c

you are subscribed to the notifications.

Once there is new data in the source catalog, you will receive a notification in the console. Note that the current console is busy listening to the notifications, so you may want to open another console instance to proceed with uploading data.

The tutorial contains sample data in the src/main/resources/data folder. The dataset represents the weather summary in Berlin and Munich in August 2021.

Let's upload the data into the partition 377894444, which represents Berlin City Centre at zoom level 14. For more information on calculating partition tile IDs and zoom levels, refer to the Calculate partition Tile IDs tutorial.
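
The partition ID 377894444 is the HERE tile key for that area at level 14. As a rough illustration only (the exact algorithm is covered in the referenced tutorial), the following sketch assumes the heretile quadtree scheme, where the column and row indices are interleaved into a Morton code and a leading 1 bit encodes the level; it reproduces 377894444 for the Berlin Television Tower coordinates used later in this tutorial:

public class HereTileIdSketch {

  // Computes a HERE tile ID from WGS84 coordinates at the given zoom level (assumed scheme)
  static long tileId(double latitude, double longitude, int level) {
    long column = (long) ((longitude + 180.0) / 360.0 * (1L << level));
    long row = (long) ((latitude + 90.0) / 360.0 * (1L << level)); // latitude spans half the range
    long morton = 0L;
    for (int bit = 0; bit < level; bit++) {
      morton |= ((column >> bit) & 1L) << (2 * bit);    // column bits on even positions
      morton |= ((row >> bit) & 1L) << (2 * bit + 1);   // row bits on odd positions
    }
    return (1L << (2 * level)) | morton;                // leading 1 bit encodes the level
  }

  public static void main(String[] args) {
    // Berlin Television Tower at level 14 -> prints 377894444
    System.out.println(tileId(52.520817, 13.409419, 14));
  }
}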

To upload the data into a catalog, use the OLP CLI.

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv

The command uploads the data to the catalog and creates a new catalog version. The running listener application then prints a notification with the new catalog version and the time of the change:

===> BatchPublicationNotification: New catalog version 0, date/time: Mon Nov 15 14:46:10 EET 2021
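
Having the version number, the consumer can request the updated data. For example, the partition uploaded above could be downloaded again with the OLP CLI; the --version option is assumed here, so check olp catalog layer partition get --help for the options available in your CLI version:

olp catalog layer partition get {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444 --version 0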

To get another change notification, upload more data.

The application is a typical streaming program and thus runs indefinitely. Hit Ctrl + C in the console to stop the program.

Note that if the catalog contained other versioned layers, you would receive notifications for changes in any of them.

Layer-level notification

Layer-level notifications allow you to get a notification whenever changes occur in a particular layer.

Note

Layer-level notifications are currently available only for versioned layers with HERE Tile partitioning.

Notifications are essentially a data stream: they are written as messages to a stream layer. You receive notifications by subscribing to that layer and consuming the messages as you would consume data from any other stream layer in the HERE platform.

To receive notifications when the specific partition or the specific geographic area is updated, perform the following steps:

  • Create a notification catalog with a stream layer, where the notifications will be published, using the OLP CLI.
  • Implement a notification listener application using the Data Client Library. The application is simply a consumer of data published to the notification-stream-layer layer of the notification catalog created in the previous step.
  • Subscribe to a layer in the source catalog using the OLP CLI.
  • Upload new data into the layer you subscribed to using the OLP CLI. In a real-life scenario, someone else would update the data, but for the purposes of this tutorial, you update it yourself.
Figure 2. Layer-level notification

Create the notification catalog

For this part of the tutorial, you will need two catalogs:

  • the source catalog created earlier in this tutorial
  • a notification catalog with a stream layer to receive change notifications

Note that, unlike catalog-level notifications, layer-level notifications require an existing stream layer to receive the messages about changes in the versioned layer of the source catalog.

Create the notification catalog using the configuration below. When the source catalog is changed, the notification message will be sent to the stream layer of the notification catalog.

The catalog configuration contains a sample stream layer with the generic partitioning scheme.

For the sake of simplicity, "contentType": "text/plain" is used.

The complete catalog configuration:

notification-catalog.json
{
  "id": "notifications-sink",
  "name": "Catalog Notifications Stream Tutorial",
  "summary": "Catalog for Notifications Stream",
  "description": "The catalog containing the notification stream",
  "layers": [
    {
      "id": "notification-stream-layer",
      "name": "Notification Stream",
      "summary": "Stream for notification",
      "description": "Stream for notification",
      "layerType": "stream",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      },
      "contentType": "text/plain"
    }
  ]
}

Replace {{YOUR_USERNAME}} below with your username and run the following:

olp catalog create {{YOUR_USERNAME}}-notification-catalog "{{YOUR_USERNAME}} notification catalog" --config notification-catalog.json

You will receive the following message in the CLI:

Catalog {{NOTIFICATION_CATALOG_HRN}} has been created.

Note

If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.

Implement a notification consumer application

In this tutorial, the source versioned catalog contains two datasets of average values for temperature and humidity stored in different partitions: one for Berlin and one for Munich. The structure of the datasets is the same as in the Catalog-level notification consumer application.

Let's create a simple Java/Scala application to receive notifications when a new message is published to the stream layer. The application prints the notification message containing information about changed partitions.

Replace the {{NOTIFICATION_CATALOG_HRN}} placeholder with the HRN of the notification catalog created in the previous chapter.

Java
Scala

private static final HRN NOTIFICATION_CATALOG_HRN = HRN.fromString("{{NOTIFICATION_CATALOG_HRN}}");


private val NotificationCatalogHrn = HRN("{{NOTIFICATION_CATALOG_HRN}}")

Java
Scala

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.flink.javadsl.FlinkReadEngine;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.settings.ConsumerSettings;
import com.here.platform.data.client.settings.ConsumerSettings.Builder;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LayerLevelNotification {
  private static final HRN NOTIFICATION_CATALOG_HRN =
      HRN.fromString("{{NOTIFICATION_CATALOG_HRN}}");
  private static final String NOTIFICATION_STREAM_LAYER_ID = "notification-stream-layer";

  public static final String CONSUMER_GROUP_NAME = "layer-level-notification";

  public static void main(String[] args) throws Exception {
    // Create queryApi for the notification catalog
    final FlinkDataClient flinkDataClient = new FlinkDataClient();
    FlinkQueryApi queryApi = flinkDataClient.queryApi(NOTIFICATION_CATALOG_HRN);

    // Specify Kafka consumer group name
    ConsumerSettings consumerSettings =
        new Builder().withLatestOffset().withGroupName(CONSUMER_GROUP_NAME).build();

    // Create the context in which a streaming program is executed
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
        // source function that emits layer-level notifications from the stream layer
        .addSource(queryApi.subscribe(NOTIFICATION_STREAM_LAYER_ID, consumerSettings))
        // read partition bytes and map to human-readable string
        .map(new SampleMapper())
        // Write to standard output
        .print();

    // Trigger the program execution
    env.execute("Layer-level notification listener");
  }

  private static class SampleMapper extends RichMapFunction<Partition, String> {
    private transient FlinkDataClient flinkDataClient;
    private transient FlinkReadEngine flinkReadEngine;

    @Override
    public void open(Configuration parameters) {
      // Create read engine to read the partition bytes
      flinkDataClient = new FlinkDataClient();
      flinkReadEngine = flinkDataClient.readEngine(NOTIFICATION_CATALOG_HRN);
    }

    @Override
    public String map(Partition partition) {
      byte[] dataAsBytes = flinkReadEngine.getDataAsBytes(partition);
      return "===> Received " + new String(dataAsBytes);
    }

    @Override
    public void close() {
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and
      // terminated.
      flinkDataClient.terminate();
    }
  }
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.{FlinkDataClient, FlinkReadEngine}
import com.here.platform.data.client.scaladsl.Partition
import com.here.platform.data.client.settings.{ConsumerSettings, LatestOffset}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object LayerLevelNotificationScala {
  private val NotificationCatalogHrn = HRN("{{NOTIFICATION_CATALOG_HRN}}")
  private val NotificationStreamLayerId = "notification-stream-layer"

  val ConsumerGroupName = "layer-level-notification"

  def main(args: Array[String]): Unit = {
    // Create queryApi for the notification catalog
    val flinkDataClient = new FlinkDataClient()
    val queryApi = flinkDataClient.queryApi(NotificationCatalogHrn)

    // Specify Kafka consumer group name
    val consumerSettings = ConsumerSettings(ConsumerGroupName, LatestOffset)

    // Create the context in which a streaming program is executed
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env
    // source function that emits layer-level notifications from the stream layer
      .addSource(queryApi.subscribe(NotificationStreamLayerId, consumerSettings))
      // read partition bytes and map to human-readable string
      .map(new SampleMapper(NotificationCatalogHrn))
      // Write to standard output
      .print()

    // Trigger the program execution
    env.execute("Layer-level notification listener")
  }

  class SampleMapper(hrn: HRN) extends RichMapFunction[Partition, String] with Serializable {
    @transient
    private lazy val flinkDataClient: FlinkDataClient = new FlinkDataClient()

    @transient
    private lazy val flinkReadEngine: FlinkReadEngine =
      // Create read engine to read the partition bytes
      flinkDataClient.readEngine(hrn)

    override def map(partition: Partition): String =
      s"===> Received ${new String(flinkReadEngine.getDataAsBytes(partition))}"

    override def close(): Unit =
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and terminated.
      flinkDataClient.terminate()
  }

}

The application uses the subscribe method of the Data Client Library's Flink support to subscribe to the notification-stream-layer layer of the catalog defined by the NOTIFICATION_CATALOG_HRN (Java) or NotificationCatalogHrn (Scala) variable.

The consumer group name is a Kafka Consumer Group name. It is set to layer-level-notification this time.

Run the application locally

To execute the layer-level notification consumer application, run the following command:

Java
Scala

mvn compile exec:java -D"exec.mainClass"="LayerLevelNotification"


mvn compile exec:java -D"exec.mainClass"="LayerLevelNotificationScala"

Once you get console output similar to the following:

ConsumerCoordinator:799 - [Consumer clientId=[...], groupId=[...].layer-level-notification] Setting offset for partition [...]

you are subscribed to the notifications.

Once there is new data in the source catalog's notification-source-versioned-layer, you will receive a notification in the console. Note that the current console is busy listening to the notifications, so you may want to open another console instance to proceed with uploading data.

Subscribe to notifications using OLP CLI

You can choose which changes you receive notifications for:

  • All changes in the layer
  • Changes to a geographic area
  • Changes to specific partitions

You can create multiple subscriptions for a catalog, but a subscription always pertains to a single catalog. You can subscribe to either a few versioned layers in a catalog or all its layers. You can also configure the subscription area and notification type to be either the same or different for every layer you subscribe to.

When you create a subscription, you can indicate which changes you want to be notified about.

Subscribe to all changes in the layer using OLP CLI

To subscribe to all changes in the layer, use the olp catalog layer notification create [...] command:

olp catalog layer notification create {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer \
  --source-catalog {{SOURCE_CATALOG_HRN}} --source-layers notification-source-versioned-layer

You will receive the following message in the CLI:

Subscription {{ALL_CHANGES_SUBSCRIPTION_ID}} has been created.

Let's upload the data into the partition 377894444, which represents Berlin City Centre at zoom level 14. For more information on calculating partition tile IDs and zoom levels, refer to the Calculate partition Tile IDs tutorial.

Let's create the first entry of the weather summary dataset with the August data for Berlin. To upload the data into the catalog, use the OLP CLI.

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv

Once the partition is uploaded, you should see the following in the console:

===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":0,"layerId":"notification-source-versioned-layer","partitions":[{"version":0,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637532678413,"subscriptionResultType":"FULL"}

Let's update the Munich dataset with the August weather summary for Munich to verify that notifications are still being sent.

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377782524:src/main/resources/data/august-munich.csv

Once the partition is uploaded, you should see the following in the console:

===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":1,"layerId":"notification-source-versioned-layer","partitions":[{"version":1,"partition":"377782524","layer":"notification-source-versioned-layer","dataHandle":"cf101de8-20f8-4dc0-a9a9-b754af6ad3ed","deleted":false}],"timestamp":1637532827487,"subscriptionResultType":"FULL"}

The notification is received for any partition uploaded, be it Berlin, Munich, or anywhere else in the world.

Let's cancel the subscription before moving on. To cancel the subscription, use the olp catalog layer notification delete [...] command:

olp catalog layer notification delete {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer --subscription-ids {{ALL_CHANGES_SUBSCRIPTION_ID}}

You will receive the following message in the CLI:

Subscription {{ALL_CHANGES_SUBSCRIPTION_ID}} has been deleted.

Subscribe to the changes in the geographic area using OLP CLI

To subscribe to changes in a geographic area, use the olp catalog layer notification create [...] --subscription-area wkt:WKT command.

In this case, the WKT (well-known text) geometry is POINT(13.409419 52.520817), the coordinates of the Berlin Television Tower.

olp catalog layer notification create {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer \
  --source-catalog {{SOURCE_CATALOG_HRN}} --source-layers notification-source-versioned-layer \
  --subscription-area wkt:"POINT(13.409419 52.520817)"

You will receive the following message in the CLI:

Subscription {{BERLIN_TELEVISION_TOWER_CHANGES_SUBSCRIPTION_ID}} has been created.

Let's update the Berlin dataset with the August weather summary for Berlin to verify that notifications are sent.

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv

The Berlin Television Tower is located in the same 377894444 partition at zoom level 14, so once the partition is uploaded, you should see the following in the console:

===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":2,"layerId":"notification-source-versioned-layer","partitions":[{"version":2,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637593998401,"subscriptionResultType":"FULL"}

Let's update the Munich dataset with the August weather summary for Munich to verify that notifications are not sent this time.

olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377782524:src/main/resources/data/august-munich.csv

Once the partition is uploaded, you should see that no new entries appear in the console.

Let's cancel the subscription before moving on. To cancel the subscription, use the olp catalog layer notification delete command:

olp catalog layer notification delete {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer --subscription-ids {{BERLIN_TELEVISION_TOWER_CHANGES_SUBSCRIPTION_ID}}

You will receive the following message in the CLI:

Subscription {{BERLIN_TELEVISION_TOWER_CHANGES_SUBSCRIPTION_ID}} has been deleted.

Subscribe to the changes in the specific partitions using OLP CLI

When you subscribe to notifications for specific partitions, changes to the specified partitions, including all their child partitions (tiles), trigger a notification. To subscribe to changes in specific partitions, use the olp catalog layer notification create [...] --subscription-area partitions:PARTITIONS command.
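
For example, a subscription to the Berlin partition used throughout this tutorial (following the partitions:PARTITIONS format above) would be created as follows:

olp catalog layer notification create {{NOTIFICATION_CATALOG_HRN}} notification-stream-layer \
  --source-catalog {{SOURCE_CATALOG_HRN}} --source-layers notification-source-versioned-layer \
  --subscription-area partitions:377894444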

Try out the following:

  • Create a subscription to the 377894444 partition in Berlin
  • Upload data to Berlin and Munich

You should receive only one notification as a result.

Conclusion

In this tutorial, you learned how to create and manage subscriptions to catalog and layer-level changes using the OLP CLI. You also learned how to consume subscription notifications using the Data Client Library.

Further information

For more information, refer to the following additional resources:
