Catalog notifications

Catalog notifications inform you when a catalog has been updated.

Enabling notifications

To use notifications, they must be enabled in the catalog's configuration.

Subscribing to notifications

To subscribe to the notification stream, call subscribeToNotifications on the QueryApi.

Scala

// create queryApi for target catalog
val queryApi = DataClient().queryApi(catalogHrn)

// create subscription configuration
val consumerSettings =
  NotificationConsumerSettings(groupName = "consumer-group-name")

// subscription to notifications
val control: Future[NotificationSubscriptionControl] =
  queryApi.subscribeToNotifications(
    consumerSettings, { notification =>
      // this callback is called each time a new batch publication happens in catalog
      println(s"catalog ${catalogHrn} has a new version ${notification.version}")
    }
  )

Java

// create queryApi for target catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// create subscription configuration
NotificationConsumerSettings consumerSettings =
    new NotificationConsumerSettings.Builder().withGroupName("consumer-group-name").build();

// subscription to notifications
CompletionStage<NotificationSubscriptionControl> controlStage =
    queryApi.subscribeToNotifications(
        consumerSettings,
        notification ->
            // this callback is called each time a new batch publication happens in catalog
            System.out.printf(
                "catalog %s has a new version %d\n",
                catalogHrn, notification.getCatalogVersion()));
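
The callback above only prints each version. As a minimal sketch of what a callback body might do instead, the following class remembers the newest version reported so far. It uses only the Java standard library: LatestVersionTracker and its methods are hypothetical names, not part of the Data Client Library, and the sketch assumes that the catalog version is a numeric value that fits in a long and that the callback may be invoked on a thread other than the caller's (neither is confirmed by this page).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical helper, not part of the Data Client Library.
final class LatestVersionTracker {
  // -1 means "no notification seen yet" (an arbitrary choice for this sketch).
  private final AtomicLong latest = new AtomicLong(-1L);

  // Intended to be called from a notification callback. Keeps the maximum of
  // the stored value and the new one, so out-of-order calls cannot move the
  // stored version backwards. Safe to call from multiple threads.
  void onNewVersion(long version) {
    latest.accumulateAndGet(version, Math::max);
  }

  long latestVersion() {
    return latest.get();
  }

  public static void main(String[] args) {
    LatestVersionTracker tracker = new LatestVersionTracker();
    // Stand-in for the notification callback; real code would pass the
    // version taken from the notification object.
    LongConsumer callback = tracker::onNewVersion;
    callback.accept(3);
    callback.accept(5);
    callback.accept(4);
    System.out.println("latest version seen: " + tracker.latestVersion());
  }
}
```

In the subscription example, the lambda could call tracker.onNewVersion(notification.getCatalogVersion()) in place of the printf, provided the assumptions above hold.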

For finer-grained control, you can subscribe to an Akka Streams source instead.

Scala

// create queryApi for target catalog
val queryApi = DataClient().queryApi(catalogHrn)

// create subscription configuration
val consumerSettings =
  NotificationConsumerSettings(groupName = "consumer-group-name")

// subscription to notifications
val control: Future[NotificationSubscriptionControl] =
  queryApi
    .subscribeToNotifications(consumerSettings)
    .map { subscription =>
      // consume the notification streams
      subscription.notifications
        .runWith(Sink.foreach { notification: BatchPublicationNotification =>
          // this callback is called each time a new batch publication happens in catalog
          println(s"catalog ${catalogHrn} has a new version ${notification.version}")
        })

      subscription.subscriptionControl
    }

Java

// create queryApi for target catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// create subscription configuration
NotificationConsumerSettings consumerSettings =
    new NotificationConsumerSettings.Builder().withGroupName("consumer-group-name").build();

// subscription to notifications
CompletionStage<NotificationSubscriptionControl> controlStage =
    queryApi
        .subscribeToNotifications(consumerSettings)
        .thenApply(
            subscription -> {
              subscription
                  .notifications()
                  .runWith(
                      Sink.foreach(
                          notification ->
                              // this callback is called each time a new batch publication
                              // happens in catalog
                              System.out.printf(
                                  "catalog %s has a new version %d\n",
                                  catalogHrn, notification.getCatalogVersion())),
                      myMaterializer);

              return subscription.subscriptionControl();
            });

To cancel the subscription, call NotificationSubscriptionControl.shutdown.

Scala

control.flatMap(_.shutdown())

Java

controlStage.thenCompose(control -> control.shutdown());
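
The Java cancellation call uses thenCompose rather than thenApply because the control object arrives asynchronously and shutdown is itself asynchronous. The sketch below illustrates that chaining with the Java standard library only. FakeSubscriptionControl is a hypothetical stand-in for NotificationSubscriptionControl, and the String result type is an assumption chosen for illustration; this page does not state what shutdown actually returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for NotificationSubscriptionControl. Only the shape of
// shutdown() (an asynchronous operation) matters here; the String result type
// is an assumption made for this sketch.
interface FakeSubscriptionControl {
  CompletionStage<String> shutdown();
}

final class ShutdownChainingSketch {

  // Chains shutdown onto a control that is delivered asynchronously.
  // thenCompose flattens the nested stage, so the returned stage completes
  // only after shutdown itself has completed. With thenApply the result would
  // be a CompletionStage<CompletionStage<String>> instead.
  static CompletionStage<String> cancel(
      CompletionStage<FakeSubscriptionControl> controlStage) {
    return controlStage.thenCompose(FakeSubscriptionControl::shutdown);
  }

  public static void main(String[] args) {
    AtomicBoolean stopped = new AtomicBoolean(false);

    // A fake control whose shutdown completes immediately.
    FakeSubscriptionControl control = () -> {
      stopped.set(true);
      return CompletableFuture.completedFuture("stopped");
    };

    CompletionStage<FakeSubscriptionControl> controlStage =
        CompletableFuture.completedFuture(control);

    // Block only for demonstration purposes.
    String result = cancel(controlStage).toCompletableFuture().join();
    System.out.println(result + ", shutdown called: " + stopped.get());
  }
}
```

The Scala variant expresses the same idea with flatMap on a Future.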
