ストリーム レイヤーからデータを取得します
ストリーム レイヤーからデータを取得するには、stream
v1
API を使用してレイヤーへのサブスクリプションを作成し、レイヤーからメッセージを読み取ります。
ストリーム レイヤーからデータを取得するプロセスは次のとおりです。
- 認可トークンを取得します。
- API ベースの URL を取得します。
- サブスクリプションを作成します。
- メッセージを読む。
- オフセットをコミットします。
これらの各ステップについて、以下で詳しく説明します。
注
一部 stream
の API エンドポイントでは、 API ルックアップサービスによって返されたベース URL ではなく、サブスクリプションリクエストによって返されたベース URL が使用されます。 stream
さまざまなエンドポイントについては、 API のドキュメントに記載されている URL に注意してください。
認可トークン を取得します
HTTP 要求の認可トークンを取得します。 手順について は、『 Identity & Access Management Guide 』を参照してください。
API ベースの URL を取得します
API ルックアップサービスを使用して、データを取得するカタログの API エンドポイントを取得します。 手順については 、『 API Lookup 開発者ガイド』を参照してください。
サブスクリプションを作成します
承認トークンおよび API ベースの URL を取得したら、ストリーム レイヤーへのサブスクリプションを作成できます。 サブスクリプションモードには、 serial と parallel の 2 つがあります。 次の表で、各モードについて説明します。
モード | 説明 |
シリアル番号 | アプリケーションが 1 つのサブスクリプションを使用して少量のデータを読み取る場合は、シリアルサブスクリプションを使用します。 これがデフォルトのサブスクリプションモードです。 シリアルサブスクリプションでは、アプリケーションはサブスクリプションリクエストを作成してストリーム レイヤーにサブスクライブし、サーバーはnodeBaseURL とsubscriptionId を発行します。 次に、アプリケーションがメッセージを読み取り nodeBaseURL 、提供されたを使用して必要な回数だけオフセットをコミットします。
メッセージサイズが 8 KB 、インジェストレートが 175 メッセージ / 秒の場合、このモードでは 1400 kbps のスループットが得られます。 |
平行( Parallel ) | アプリケーションが大量のデータを並行して読み取る場合は、並列サブスクリプションを使用します。 サブスクリプションとメッセージの読み取りワークフローは、複数 aid, catalog, layer, group.id のプロセス / スレッドを使用して同じ組み合わせで複数のサブスクリプションを作成できる点を除き、シリアルサブスクリプションと似ています。 これにより、各サブスクリプションのメッセージを並行して読み取り、コミットできます。
consumerId オプションで、グループ内のコンシューマーを特定するためにサブスクライブするときにを提供できます。 指定しない consumerId 場合、システムは一意のを作成します。 特定 aid, catalog, layer, group.id, consumerId の組み合わせでは、は subscriptionId 常に同じになります。 そのため、を永続化する代わり subscriptionId subscriptionId aid, catalog, layer, group.id, consumerId に、サブスクライブ要求中にと同じ値を提供し、その値が残っている場所から再開することで、いつでも同じ値に戻すことができます(サブスクリプションの存続可能時間が切れていないことを前提としています)。
メッセージサイズが 10 KB で、 65500 kbps (Kilo Bytes/Second) の出力で作成されたレイヤーの場合、システムは 65500 kbps (Kilo Bytes/Second) のスループットを実現し、 32 のサブスクリプションで並行して読み取りました。注意 : レイヤーで利用できるデータ量には制限があります。 |
注意
同じ aid, catalog, layer, group.id
組み合わせで複数のサブスクリプションを作成する場合は、シリアルサブスクリプションを使用しないでください。 この組み合わせでサーバーに作成できるコンシューマは 1 つだけです。そのため、異なるトークンを使用してサーバーにアクセスしようとする複数のアプリケーション / プロセス / スレッドによって、サブスクリプションが進行しない可能性があります。 これは、システムによって トークン更新イベントとして分類され 、同じメッセージのセットが返される可能性があるためです。
使用するサブスクリプションモードを決定したら、次のリクエストを使用してサブスクリプションを作成します。
シリアルサブスクリプション
POST /<Base path for the stream API from the API Lookup Service>/layers/<Layer ID>/subscribe?mode=serial HTTP/1.1
Host: <Hostname for the stream API from the API Lookup Service>
Authorization: Bearer <Authorization Token>
Content-Type: application/json
並行サブスクリプション
1 つの並列処理の単位は、現在、インバウンド 1000 kbps ( 1 秒あたりのキロバイト数)またはアウトバウンド 2000 kbps ( 1 秒あたりのキロバイト数)のいずれか大きい方になり、最も近い整数に切り上げられます。 同じグループ内のサブスクリプション数が、許容されている並列処理数を超えることはできません。
ストリームレイヤーのインバウンドおよびアウトバウンドのスループットを確認することで、グループ内のサブスクリプションの最大数を計算できます。このスループットは、レイヤーの設定ページのプラットフォームポータルにあります。 まず、設定されているアウトバウンドスループットを 2 で割り、結果を最も近い整数に丸めます。 設定されている最大インバウンドスループットと結果を比較します。 いずれか大きい方がサブスクリプションの最大数です。
たとえば、ストリーム レイヤーが次のように設定されているとします。
インバウンドスループット : 1000 kbps ( 約 1 Mbps) のアウトバウンドスループット : 3000 kbps ( 約 3 Mbps)
グループごとのサブスクリプションの最大数を計算するには、次の手順を実行します。
- 3/2 = 1.5
- 1.5 最も近い整数に切り上げ : 2.
- 2 がインバウンドスループット( 1 )より大きいため、グループ内のサブスクリプションの最大数は 2 です。
別の例 :
インバウンドスループット : 5000 kbps ( 約 5 Mbps) の送信スループット : 2000 kbps (約 2 Mbps )
グループごとのサブスクリプションの最大数を計算するには、次の手順を実行します。
- 2 / 2 = 1
- 1 はすでに整数であるため、切り上げの必要はありません。
- 1 がインバウンドスループット( 5 )よりも少ないため、グループ内のサブスクリプションの最大数は 5 です。
POST /<Base path for the stream API from the API Lookup Service>/layers/<Layer ID>/subscribe?mode=parallel&consumerId=<consumerId> HTTP/1.1
Host: <Hostname for the stream API from the API Lookup Service>
Authorization: Bearer <Authorization Token>
Content-Type: application/json
応答には、メッセージの読み取りに使用する基本 URL およびサブスクリプション ID が含まれています。
{
"nodeBaseURL": "<The base URL to use to access the subscription.>",
"subscriptionId": "<The unique ID for this subscription>"
}
サブスクリプションのプロパティ
次の Kafka Consumer プロパティは、 SUBSCRIBE 要求の HTTP 本体に渡すことで上書きできます。
- auto.commit.interval.ms
- 自動オフセットリセット
- enable.auto.commit
- 最大バイトを取得します
- フェッチ最大待機時間ミリ秒
- 最小バイトを取得します
- グループ ID
- パーティションの最大フェッチバイト数
- 最大ポーリングレコード数
これらはすべて任意です。 指定しない場合は、デフォルトが使用されます。 デフォルトの詳細については、stream
API の API リファレンス を参照してください。
メッセージを読む
サブスクリプションを作成すると、そのサブスクリプションnodeBaseURL
subscriptionId
を使用してストリーム からメッセージを読み取る ことができ、サブスクリプションの作成ステップで取得できます。
GET /layers/<Layer ID>/partitions?subscriptionId=<subscriptionId from prior step> HTTP/1.1
Host: <nodeBaseURL from prior step>
Authorization: Bearer <Authorization Token>
Accept: application/json
応答には、metadata
およびoffset
オブジェクトを含むメッセージの配列が含まれています。 metaData
オブジェクトの内容は、メッセージサイズによって異なります。
- メッセージサイズが 1 MB を超える場合、
dataSize
およびdataHandle
の各フィールドがmetaData
含まれます。 - メッセージサイズが 1 MB 以下の場合、
metaData
にはdata
フィールドのみが含まれます。
たとえば、次の応答では、最初のメッセージは 1 MB 未満で、 2 番目のメッセージは 1 MB を超えています。
{
"messages": [
{
"metaData": {
"partition": "<UTF-8 byte encoded string of HERE Partition ID>",
"data": "<UTF-8 byte encoded string of the actual data>",
"timestamp": <The time the message was accepted by the HERE platform>
},
"offset": {
"partition": <Kafka Topic partition ID>,
"offset": <Kafka Message offset ID>
}
},
{
"metaData": {
"partition": "<UTF-8 byte encoded string of HERE Partition ID>",
"checksum": "<Checksum of payload (default SHA1) encoded as hex>",
"compressedDataSize": <compressed (gzipped) size in bytes of the blob>,
"dataSize": <un-compressed data size in bytes of the blob>,
"dataHandle": "<reference to blob in the Blob service>",
"timestamp": <The time the message was accepted by the HERE platform>
},
"offset": {
"partition": <Kafka Topic partition ID>,
"offset": <Kafka Message offset ID>
}
}
]
}
応答の詳細については、『 API リファレンス for the stream
API 』を参照してください。
ストリームからのデータを使用しなくなった場合は、サブスクリプションを解除する必要があります。 詳細 については、「ストリーム レイヤーからのサブスクライブ解除」を参照してください。
オフセットをコミットします
データの読み取り後、各パーティションから最後に読み取られたメッセージのオフセットをコミットして、アプリケーションのクラッシュなどのサブスクリプションの中断が発生した場合に、アプリケーションが正しいパーティションから新しいメッセージの読み取りを再開できるようにする必要があります。 新しいサブスクリプションでオフセットからデータの読み取りを開始できるため、サブスクリプションを削除してから同じレイヤーのサブスクリプションを再作成する場合にも、オフセットを使用すると便利です。
オフセットをコミットするに は、 stream
API のエンドポイント/offset
を使用してサブスクリプションnodeBaseURL
subscriptionId
を指定し、サブスクリプションを作成ステップで取得します。 リクエスト本文には、各パーティション で最後に読み取られたメッセージのオフセット が含まれている必要があります。 Kafka の消費者ドキュメントに記載されているように、オフセット +1 を渡さないでください。 HERE platform は、必要に応じてリクエスト本文のオフセット に 1 を追加します。
PUT /layers/fake-layer-name/offsets?subscriptionId=<subscriptionId>
Host: <nodeBaseURL>
Authorization: Bearer <Authorization Token>
Accept: application/json
{ "offsets":
[
{
"partition": <Partition ID>,
"offset": <Offset Number>
},
{
"partition": <Partition ID>,
"offset": <Offset Number>
}
]
}
たとえば、最初のオフセットにパーティション 0 およびオフセット 1 を、 2 番目のサブスクリプションにパーティション 1 オフセット 1 を指定した場合、このサブスクリプションを使用して次回データを読み取ると、パーティション 0 のオフセット 7 から開始したメッセージ、およびパーティション 1 のオフセット 1 から開始したメッセージが返されます。
ストリーム データを検索しています
ストリーム レイヤーからデータを読み取る場合、通常は、最後に読んだメッセージの後の最初のメッセージから開始してメッセージを読む必要があります。 代わりに、別のオフセットからデータを読み取ることができ、ストリームで前後にスキップできます。
特定のオフセットからデータを読み取るには :
-
/seek
データの読み取りを開始するオフセットを指定するように、エンドポイントに要求を作成します。 リクエストで nodeBaseURL
、との値 subscriptionId
は、 「サブスクリプションの作成」で説明したサブスクリプションリクエストの応答で返される値です。 例 :
PUT /layers/fake-layer-name/seek?subscriptionId=<subscriptionId> HTTP/1.1
Host: <nodeBaseURL>
Authorization: Bearer <Authorization Token>
Content-Type: application/json
Accept: application/json
{
"offsets":
[
{
"partition": <Partition ID>,
"offset": <Offset Number>
},
{
"partition": <Partition ID>,
"offset": <Offset Number>
}
]
}
-
「 メッセージの読み取り」の説明に従って、データを読み取るように/partitions
エンドポイントに要求を作成します。
認証トークンを更新中です
ストリーム レイヤーからのメッセージを継続的に読み取るに GET /layers/<LayerID>/partitions
は、その要求が使用する認証トークンを有効期限が切れる前に更新する必要があります。 トークンの更新中に、サブスクライブを解除して再度サブスクライブする必要はありません。
トークンの変更中は、サーバーの作業量を増やす必要があるため、パフォーマンスがわずかに低下することがあります。 トークンの更新操作の数を最小限に抑えることをお勧めします。
注
トークンの更新は /partitions
、および /seek
エンドポイントでのみ許可されます。
メッセージの発注保証
強力なメッセージ注文保証 ( ストリーミングパーティションレベル ) を必要とするシステムは、以下の推奨事項に従う必要があります。
- 「並列」サブスクリプションモードではなく、「シリアル」サブスクリプションモードを使用します。パーティションは、同じグループに属するサブスクリプション間で分散できます。「並列」 サブスクリプションモードの ID で、システムの障害やスケールアウト / インが発生した場合、パーティションをサブスクリプション間で再バランスでき、再バランス中に発注保証の失敗につながる可能性があります。
- システムは、特定のサブスクリプションに対するインフライト HTTP リクエストの数を最大 1 つに維持する必要があります。つまり、最初のリクエストを作成し、最初のリクエストへの応答が得られるまで 2 番目のリクエストの作成を待機します。