ストリーム レイヤーから読み取ります
この例では、 HERE Data SDK for TypeScript を使用して Node.js のストリーム レイヤーからパーティション メタデータおよびパーティション データを読み取る方法を示します。
Node.js でアプリをビルドして実行します
アプリをビルドする前に、すべての依存関係がインストールされていることを確認してください。
Node.js でアプリをビルドして実行するには、次の手順に従います。
-
NPM プロジェクトを作成します。
mkdir example-app && cd example-app && npm init
-
TypeScript プロジェクトを初期化します。
tsc --init
-
インストールノードタイプ。
npm install --save-dev @types/node
-
SDK モジュールを取り付けます。
npm install --save @here/olp-sdk-authentication @here/olp-sdk-dataservice-read
ここで、すべてがアプリを作成するように設定されます。
-
index.ts
ファイルとアプリスケルトンを作成します。
class App {
run() {
console.log("App works!");
}
}
const app = new App();
app.run();
-
アプリをコンパイルして実行します。
tsc && node .
正常に実行されると、コンソールに次のメッセージが表示されます。
App works!
作成 StreamLayerClient
このStreamLayerClient
クラスを使用して、ストリーム レイヤーからデータをストリームするキューからデータを要求できます。 消費者がデータを読み取ると、その消費者はデータを利用できなくなりますが、データは他の消費者が利用できます。
ストリームレイヤーは、保持時間または存続可能時間 (TTL) を使用して設定できます。この設定を行うと、指定した期間が経過した後に未使用のデータが削除されます。
StreamLayerClient
インスタンスを作成するには :
-
OlpClientSettings
オブジェクトを作成します。
手順については、「プラットフォームクライアント設定を作成する」を参照してください。
-
StreamLayerClient
StreamLayerClientParams
ステップ 1 でカタログ HERE リソースネーム 、レイヤー ID 、プラットフォームクライアント設定を含むインスタンスを作成します。
const streamLayerClient = new StreamLayerClient({
catalogHrn: HRN.fromString("your-catalog-hrn"),
layerId: "your-layer-id",
settings: olpClientSettings,
});
ストリーム レイヤーにサブスクライブします
ストリーム レイヤーにサブスクライブするには、次の手順を実行し
-
StreamLayerClient
オブジェクトを作成します。
手順については 、「 StreamLayerClient を作成する」を参照してください。
-
SubscribeRequest
serial
または parallel
サブスクリプションタイプでオブジェクトを作成します。
-
アプリが単一のサブスクリプションを使用して小さい量のデータを読み取る必要がある場合 serial
は、サブスクリプションタイプを使用します。
const subscribeRequest = new SubscribeRequest().withMode("serial");
-
アプリが大量のデータを並行して読み取る場合 parallel
は、サブスクリプションタイプとサブスクリプション ID を使用します。
const subscribeRequest = new SubscribeRequest()
.withMode("parallel")
.withSubscriptionId("your-subscription-id");
-
subscribe
subscribeRequest
パラメーターを使用してメソッドを呼び出します。
const subscribtionId = await streamLayerClient.subscribe(subscribeRequest);
リクエストされたサブスクリプションから選択したレイヤーへのサブスクリプション ID を受け取ります。
データを取得するには Poll
、メソッドを呼び出します。
レイヤーにサブスクライブしている場合は、ストリーム レイヤーからメッセージを読み取ることができます。 メッセージには、データおよび次のパーティション メタデータが含まれています。
- データ ハンドル
- ID
- データサイズ
- 圧縮されたデータサイズ
- チェックサム
- タイムスタンプ
ストリーム レイヤーからデータを取得するには、次の手順に従います。
-
streamLayerClient
オブジェクトを作成します。
手順については 、「 StreamLayerClient を作成する」を参照してください。
-
ストリーム レイヤーにサブスクライブします。 『ストリーム レイヤーのサブスクライブ』を参照してください
-
poll
サブスクリプション ID を使用してメソッドを呼び出します。
const messages = await streamLayerClient.poll(
new PollRequest().withMode("serial").withSubscriptionId(subscribtionId)
);
レイヤーデータとパーティション メタデータでメッセージを受信します。 poll
このメソッドはオフセットもコミットするので、新しいメッセージのポーリングを続行できます。
例 :
{
"messages": [
{
"metaData": {
"partition": "314010583",
"checksum": "ff7494d6f17da702862e550c907c0a91",
"compressedDataSize": 152417,
"dataSize": 250110,
"data": "iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAABGdBTUEAALGPC/xhBQAAABhQTFRFvb29AACEAP8AhIKEPb5x2m9E5413aFQirhRuvAMqCw+6kE2BVsa8miQaYSKyshxFvhqdzKx8UsPYk9gDEcY1ghZXcPbENtax8g5T+3zHYufF1Lf9HdIZBfNEiKAAAAAElFTkSuQmCC",
"dataHandle": "",
"timestamp": 1517916706
},
"offset": {
"partition": 7,
"offset": 38562
}
}
]
}
データサイズが 1 MB 未満の場合、データフィールドに値が入力されます。 データサイズが 1 MB を超える場合、 blob ストアに保存されているオブジェクトをポイントするデータ ハンドルが取得されます。
-
データサイズが 1 MB を超える場合 getData
Messages
は、オブジェクトを使用してメソッドを呼び出します。
const data = await streamLayerClient.getData({
metaData: {
partition: "314010583",
checksum: "ff7494d6f17da702862e550c907c0a91",
compressedDataSize: 152417,
dataSize: 250110,
data: "",
dataHandle:
"iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAABGdBTUEAALGPC/xhBQAAABhQTFRFvb29AACEAP8AhIKEPb5x2m9E5413aFQirhRuvAMqCw+6kE2BVsa8miQaYSKyshxFvhqdzKx8UsPYk9gDEcY1ghZXcPbENtax8g5T+3zHYufF1Lf9HdIZBfNEiKAAAAAElFTkSuQmCC",
timestamp: 1517916706,
},
offset: {
partition: 7,
offset: 38562,
},
});
要求されたパーティションからデータを取得します。
事前定義されたオフセットを探します
指定したオフセットからデータの読み取りを開始できます。 レイヤー(トピック)オフセットからメッセージの消費を開始するには、メッセージポインタをレイヤー(トピック) に移動します。 一度オフセットを検索すると、最初のオフセットが保存されない限り、最初のオフセットに戻ることはできません。
await streamLayerClient.seek(
new SeekRequest()
.withMode("serial")
.withSubscriptionId(subscribtionId)
.withSeekOffsets({ offsets: [{ partition: 7, offset: 38562 }] })
);
レイヤーへのサブスクリプションを削除します
レイヤー ( トピック ) へのサブスクリプションを削除できます。 この操作を行うと、サービスからサブスクリプションが削除されます。
await streamLayerClient.unsubscribe(
new UnsubscribeRequest()
.withMode("serial")
.withSubscriptionId(subscribtionId)
)
);