ストリーム レイヤーから読み取ります

この例では、 HERE Data SDK for TypeScript を使用して Node.js のストリーム レイヤーからパーティション メタデータおよびパーティション データを読み取る方法を示します。

Node.js でアプリをビルドして実行します

アプリをビルドする前に、すべての依存関係がインストールされていることを確認してください。

Node.js でアプリをビルドして実行するには、次の手順に従います。

  1. NPM プロジェクトを作成します。

    mkdir example-app && cd example-app && npm init
    
  2. TypeScript プロジェクトを初期化します。

    tsc --init
    
  3. インストールノードタイプ。

    npm install --save-dev @types/node
    
  4. SDK モジュールを取り付けます。

    npm install --save @here/olp-sdk-authentication @here/olp-sdk-dataservice-read
    

    ここで、すべてがアプリを作成するように設定されます。

  5. index.ts ファイルとアプリスケルトンを作成します。

    /**
     * Example of the Node.js app used for reading a stream layer from the datastore.
     */
    
    class App {
      run() {
        console.log("App works!");
      }
    }
    
    const app = new App();
    app.run();
    
  6. アプリをコンパイルして実行します。

    tsc && node .
    

正常に実行されると、コンソールに次のメッセージが表示されます。

App works!

作成 StreamLayerClient

このStreamLayerClientクラスを使用して、ストリーム レイヤーからデータをストリームするキューからデータを要求できます。 消費者がデータを読み取ると、その消費者はデータを利用できなくなりますが、データは他の消費者が利用できます。

ストリームレイヤーは、保持時間または存続可能時間 (TTL) を使用して設定できます。この設定を行うと、指定した期間が経過した後に未使用のデータが削除されます。

StreamLayerClient インスタンスを作成するには :

  1. OlpClientSettings オブジェクトを作成します。

    手順については、「プラットフォームクライアント設定を作成する」を参照してください。

  2. StreamLayerClientStreamLayerClientParams ステップ 1 でカタログ HERE リソースネーム 、レイヤー ID 、プラットフォームクライアント設定を含むインスタンスを作成します。

    const streamLayerClient = new StreamLayerClient({
     catalogHrn: HRN.fromString("your-catalog-hrn"),
     layerId: "your-layer-id",
     settings: olpClientSettings,
    });
    

ストリーム レイヤーにサブスクライブします

ストリーム レイヤーにサブスクライブするには、次の手順を実行し

  1. StreamLayerClient オブジェクトを作成します。

    手順については 、「 StreamLayerClient を作成する」を参照してください。

  2. SubscribeRequestserial または parallel サブスクリプションタイプでオブジェクトを作成します。

    • アプリが単一のサブスクリプションを使用して小さい量のデータを読み取る必要がある場合 serial は、サブスクリプションタイプを使用します。

      const subscribeRequest = new SubscribeRequest().withMode("serial");
      
    • アプリが大量のデータを並行して読み取る場合 parallel は、サブスクリプションタイプとサブスクリプション ID を使用します。

      const subscribeRequest = new SubscribeRequest()
        .withMode("parallel")
        .withSubscriptionId("your-subscription-id");
      
  3. subscribesubscribeRequest パラメーターを使用してメソッドを呼び出します。

    const subscribtionId = await streamLayerClient.subscribe(subscribeRequest);
    

    リクエストされたサブスクリプションから選択したレイヤーへのサブスクリプション ID を受け取ります。

データを取得するには Poll 、メソッドを呼び出します。

ストリーム レイヤーからデータとパーティション メタデータを取得します

レイヤーにサブスクライブしている場合は、ストリーム レイヤーからメッセージを読み取ることができます。 メッセージには、データおよび次のパーティション メタデータが含まれています。

  • データ ハンドル
  • ID
  • データサイズ
  • 圧縮されたデータサイズ
  • チェックサム
  • タイムスタンプ

ストリーム レイヤーからデータを取得するには、次の手順に従います。

  1. streamLayerClient オブジェクトを作成します。

    手順については 、「 StreamLayerClient を作成する」を参照してください。

  2. ストリーム レイヤーにサブスクライブします。 『ストリーム レイヤーのサブスクライブ』を参照してください

  3. poll サブスクリプション ID を使用してメソッドを呼び出します。

    const messages = await streamLayerClient.poll(
      new PollRequest().withMode("serial").withSubscriptionId(subscribtionId)
    );
    

    レイヤーデータとパーティション メタデータでメッセージを受信します。 poll このメソッドはオフセットもコミットするので、新しいメッセージのポーリングを続行できます。

    例 :

    {
    "messages": [
        {
        "metaData": {
            "partition": "314010583",
            "checksum": "ff7494d6f17da702862e550c907c0a91",
            "compressedDataSize": 152417,
            "dataSize": 250110,
            "data": "iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAABGdBTUEAALGPC/xhBQAAABhQTFRFvb29AACEAP8AhIKEPb5x2m9E5413aFQirhRuvAMqCw+6kE2BVsa8miQaYSKyshxFvhqdzKx8UsPYk9gDEcY1ghZXcPbENtax8g5T+3zHYufF1Lf9HdIZBfNEiKAAAAAElFTkSuQmCC",
            "dataHandle": "",
            "timestamp": 1517916706
        },
        "offset": {
            "partition": 7,
            "offset": 38562
        }
        }
    ]
    }
    

    データサイズが 1 MB 未満の場合、データフィールドに値が入力されます。 データサイズが 1 MB を超える場合、 blob ストアに保存されているオブジェクトをポイントするデータ ハンドルが取得されます。

  4. データサイズが 1 MB を超える場合 getDataMessages は、オブジェクトを使用してメソッドを呼び出します。

    const data = await streamLayerClient.getData({
      metaData: {
        partition: "314010583",
        checksum: "ff7494d6f17da702862e550c907c0a91",
        compressedDataSize: 152417,
        dataSize: 250110,
        data: "",
        dataHandle:
           "iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAABGdBTUEAALGPC/xhBQAAABhQTFRFvb29AACEAP8AhIKEPb5x2m9E5413aFQirhRuvAMqCw+6kE2BVsa8miQaYSKyshxFvhqdzKx8UsPYk9gDEcY1ghZXcPbENtax8g5T+3zHYufF1Lf9HdIZBfNEiKAAAAAElFTkSuQmCC",
        timestamp: 1517916706,
      },
      offset: {
        partition: 7,
        offset: 38562,
      },
    });
    

要求されたパーティションからデータを取得します。

事前定義されたオフセットを探します

指定したオフセットからデータの読み取りを開始できます。 レイヤー(トピック)オフセットからメッセージの消費を開始するには、メッセージポインタをレイヤー(トピック) に移動します。 一度オフセットを検索すると、最初のオフセットが保存されない限り、最初のオフセットに戻ることはできません。

await streamLayerClient.seek(
  new SeekRequest()
    .withMode("serial")
    .withSubscriptionId(subscribtionId)
    .withSeekOffsets({ offsets: [{ partition: 7, offset: 38562 }] })
);

レイヤーへのサブスクリプションを削除します

レイヤー ( トピック ) へのサブスクリプションを削除できます。 この操作を行うと、サービスからサブスクリプションが削除されます。

await streamLayerClient.unsubscribe(
  new UnsubscribeRequest()
    .withMode("serial")
    .withSubscriptionId(subscribtionId)
  )
);

」に一致する結果は 件です

    」に一致する結果はありません