カタログで通知が有効化されている場合、HERE platform はカタログと同じ HERE リソースネーム (HRN) を使用してストリームを自動的に作成します。 カタログデータが更新されるたびに、カタログの新しいバージョンが作成され、新しいレコードがストリームに書き込まれます。
{"id":"notifications-source","name":"Catalog Notifications Source Tutorial","summary":"Catalog Notifications Source","description":"When changed, the notifications will be sent to subscribers","notifications":{"enabled":true},"layers":[{"id":"notification-source-versioned-layer","name":"Notifications Source Tutorial layer","summary":"Messages for the notifications tutorial.","description":"Messages for the notifications tutorial.","layerType":"versioned","volume":{"volumeType":"durable"},"partitioning":{"scheme":"heretile","tileLevels":[14]},"contentType":"text/plain"}]}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.settings.NotificationConsumerSettings;
import java.util.Date;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink job that subscribes to the notifications of the catalog identified by
 * {@code CATALOG_HRN} and prints one human-readable line per received notification.
 */
public class CatalogLevelNotification {

  private static final HRN CATALOG_HRN = HRN.fromString("{{SOURCE_CATALOG_HRN}}");
  public static final String CONSUMER_GROUP_NAME = "catalog-level-notification";

  public static void main(String[] args) throws Exception {
    // Create queryApi for the source catalog
    final FlinkDataClient flinkDataClient = new FlinkDataClient();
    FlinkQueryApi queryApi = flinkDataClient.queryApi(CATALOG_HRN);

    // Specify Kafka consumer group name
    NotificationConsumerSettings consumerSettings =
        new NotificationConsumerSettings.Builder().withGroupName(CONSUMER_GROUP_NAME).build();

    // Create the context in which a streaming program is executed
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
        // source function that emits catalog-level notifications
        .addSource(queryApi.subscribeToNotifications(consumerSettings))
        // map notification to a human-readable string to be displayed in the console
        .map(
            notification ->
                String.format(
                    "===> BatchPublicationNotification: New catalog version %s, date/time: %s",
                    notification.getCatalogVersion(), new Date(notification.timestamp())))
        // Write to standard output
        .print();

    // Trigger the program execution
    env.execute("Catalog-level notification listener");
  }
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.FlinkDataClient
import com.here.platform.data.client.settings.NotificationConsumerSettings
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import java.util.Date

/** Flink job that subscribes to the notifications of the catalog identified by
  * `CatalogHrn` and prints one human-readable line per received notification.
  */
object CatalogLevelNotificationScala {

  private val CatalogHrn = HRN("{{SOURCE_CATALOG_HRN}}")
  private val ConsumerGroupName = "catalog-level-notification"

  def main(args: Array[String]): Unit = {
    // Create queryApi for the source catalog
    val flinkDataClient = new FlinkDataClient()
    val queryApi = flinkDataClient.queryApi(CatalogHrn)

    // Specify Kafka consumer group name
    val consumerSettings = NotificationConsumerSettings(ConsumerGroupName)

    // Create the context in which a streaming program is executed
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env
      // source function that emits catalog-level notifications
      .addSource(queryApi.subscribeToNotifications(consumerSettings))
      // map BatchPublicationNotification to a human-readable string to be displayed in the console
      .map { notification =>
        s"===> BatchPublicationNotification: New catalog version ${notification.version}, date/time: ${new Date(notification.timestamp)}"
      }
      // Write to standard output
      .print()

    // Trigger the program execution
    env.execute("Catalog-level notification listener")
  }
}
[...] Subscribe to notifications with group name: catalog-level-notifications-consumer-group, offset: LatestOffset and consumer id: c458fd12-c525-4c58-ac71-11a149c8234c
{"id":"notifications-sink","name":"Catalog Notifications Stream Tutorial","summary":"Catalog for Notifications Stream","description":"The catalog containing the notification stream","layers":[{"id":"notification-stream-layer","name":"Notification Stream","summary":"Stream for notification","description":"Stream for notification","layerType":"stream","volume":{"volumeType":"durable"},"partitioning":{"scheme":"generic"},"contentType":"text/plain"}]}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.flink.javadsl.FlinkReadEngine;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.settings.ConsumerSettings;
import com.here.platform.data.client.settings.ConsumerSettings.Builder;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink job that subscribes to the stream layer {@code NOTIFICATION_STREAM_LAYER_ID} of the
 * catalog {@code NOTIFICATION_CATALOG_HRN}, reads the bytes of every received partition and
 * prints them as text.
 */
public class LayerLevelNotification {

  private static final HRN NOTIFICATION_CATALOG_HRN =
      HRN.fromString("{{NOTIFICATION_CATALOG_HRN}}");
  private static final String NOTIFICATION_STREAM_LAYER_ID = "notification-stream-layer";
  public static final String CONSUMER_GROUP_NAME = "layer-level-notification";

  public static void main(String[] args) throws Exception {
    // Create queryApi for the catalog that contains the notification stream layer
    final FlinkDataClient flinkDataClient = new FlinkDataClient();
    FlinkQueryApi queryApi = flinkDataClient.queryApi(NOTIFICATION_CATALOG_HRN);

    // Specify Kafka consumer group name and start reading from the latest offset
    ConsumerSettings consumerSettings =
        new Builder().withLatestOffset().withGroupName(CONSUMER_GROUP_NAME).build();

    // Create the context in which a streaming program is executed
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
        // source function that emits the partitions published to the notification stream layer
        .addSource(queryApi.subscribe(NOTIFICATION_STREAM_LAYER_ID, consumerSettings))
        // read partition bytes and map to human-readable string
        .map(new SampleMapper())
        // Write to standard output
        .print();

    // Trigger the program execution
    env.execute("Layer-level notification listener");
  }

  /** Maps a received partition to a printable string built from the partition's payload. */
  private static class SampleMapper extends RichMapFunction<Partition, String> {

    // Both fields are transient and are (re)created in open() rather than serialized
    private transient FlinkDataClient flinkDataClient;
    private transient FlinkReadEngine flinkReadEngine;

    @Override
    public void open(Configuration parameters) {
      // Create read engine to read the partition bytes
      flinkDataClient = new FlinkDataClient();
      flinkReadEngine = flinkDataClient.readEngine(NOTIFICATION_CATALOG_HRN);
    }

    @Override
    public String map(Partition partition) {
      byte[] dataAsBytes = flinkReadEngine.getDataAsBytes(partition);
      return "===> Received " + new String(dataAsBytes);
    }

    @Override
    public void close() {
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and
      // terminated.
      flinkDataClient.terminate();
    }
  }
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.{FlinkDataClient, FlinkReadEngine}
import com.here.platform.data.client.scaladsl.Partition
import com.here.platform.data.client.settings.{ConsumerSettings, LatestOffset}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/** Flink job that subscribes to the stream layer `NotificationStreamLayerId` of the catalog
  * `NotificationCatalogHrn`, reads the bytes of every received partition and prints them as text.
  */
object LayerLevelNotificationScala {

  private val NotificationCatalogHrn = HRN("{{NOTIFICATION_CATALOG_HRN}}")
  private val NotificationStreamLayerId = "notification-stream-layer"
  val ConsumerGroupName = "layer-level-notification"

  def main(args: Array[String]): Unit = {
    // Create queryApi for the catalog that contains the notification stream layer
    val flinkDataClient = new FlinkDataClient()
    val queryApi = flinkDataClient.queryApi(NotificationCatalogHrn)

    // Specify Kafka consumer group name and start reading from the latest offset
    val consumerSettings = ConsumerSettings(ConsumerGroupName, LatestOffset)

    // Create the context in which a streaming program is executed
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env
      // source function that emits the partitions published to the notification stream layer
      .addSource(queryApi.subscribe(NotificationStreamLayerId, consumerSettings))
      // read partition bytes and map to human-readable string
      .map(new SampleMapper(NotificationCatalogHrn))
      // Write to standard output
      .print()

    // Trigger the program execution
    env.execute("Layer-level notification listener")
  }

  /** Maps a received partition to a printable string built from the partition's payload.
    *
    * @param hrn catalog whose read engine is used to fetch the partition bytes
    */
  class SampleMapper(hrn: HRN) extends RichMapFunction[Partition, String] with Serializable {

    // Transient lazy vals: not serialized with the function, created on first access
    @transient
    private lazy val flinkDataClient: FlinkDataClient = new FlinkDataClient()

    @transient
    private lazy val flinkReadEngine: FlinkReadEngine =
      // Create read engine to read the partition bytes
      flinkDataClient.readEngine(hrn)

    override def map(partition: Partition): String =
      s"===> Received ${new String(flinkReadEngine.getDataAsBytes(partition))}"

    override def close(): Unit =
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and terminated.
      flinkDataClient.terminate()
  }
}
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv
パーティションがアップロードされると、コンソールに結果が表示されます。
===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":0,"layerId":"notification-source-versioned-layer","partitions":[{"version":0,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637532678413,"subscriptionResultType":"FULL"}
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377782524:src/main/resources/data/august-munich.csv
パーティションがアップロードされると、コンソールに結果が表示されます。
===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":1,"layerId":"notification-source-versioned-layer","partitions":[{"version":1,"partition":"377782524","layer":"notification-source-versioned-layer","dataHandle":"cf101de8-20f8-4dc0-a9a9-b754af6ad3ed","deleted":false}],"timestamp":1637532827487,"subscriptionResultType":"FULL"}
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv
ベルリンのテレビ塔は、ズーム レベル 14 で同じパーティション 377894444 に位置しています。そのため、パーティションをアップロードすると、コンソールに結果が表示されます。
===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":2,"layerId":"notification-source-versioned-layer","partitions":[{"version":2,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637593998401,"subscriptionResultType":"FULL"}