レイヤー操作

概要

レイヤーには、同じタイプおよび形式のセマンティック関連データが含まれています。 レイヤーはパーティションと呼ばれる小さな単位に分割され、キャッシュおよび処理のための適度なサイズの単位になります。 パーティションには任意のバイナリデータを保存できます。このデータは変更せずに公開および取得されます。 ただし、コンテンツタイプとスキーマをレイヤーに関連付けることで、データの構造とエンコードを任意で定義できます。 などのコンテンツタイプ application/geo+jsonは、インターネット標準に従って各パーティションに保存されているデータのエンコーディングを記述します。 スキーマで application/x-protobufは、などのコンテンツタイプについて、レイヤー内のパーティションの正確なデータ構造を定義します。これにより、データの作成者と利用者は、エンコードされたデータの生成方法と解釈方法を把握できます。

レイヤーとパーティションの詳細については、Data API を参照してください。

HERE Data SDK for Python では、次の 種類のレイヤーを操作できます

  • バージョン付レイヤー
  • ボラタイル レイヤー
  • インデックス レイヤー
  • ストリーム レイヤー
  • Interactive マップ レイヤー
  • オブジェクト ストア レイヤー

各レイヤータイプには、独自の使用方法とストレージパターンがあり、そのタイプのメソッドを持つクラスが分離されます。 すべてのメソッドの詳細については、レイヤーモジュールの API のドキュメントを参照してください。

  • 次のレイヤーから読み取りセクションでは、サポートされている各レイヤータイプからデータを読み取る方法についての詳細を確認できます。
  • これらの各レイヤータイプにデータを書き込む方法については 、「レイヤーへの書き込み 」セクションを参照してください。
  • カタログの複数のレイヤーへの同時書き込みについては 、「カタログの操作 」セクションを参照してください。
  • 新しいレイヤーの作成および既存のレイヤーの変更については 、「カタログの操作 」セクションを参照してください。

アダプタ

アダプターは、 データのエンコードとデコード、および異なるリプレゼンテーション間でのデータの変換に使用 されます。たとえば、パンダ DataFrame geopandas GeoDataFrame、 Python listdictオブジェクト、 GeoJSON Python ライブラリの GeoJSON オブジェクトなどです。 今後追加のアダプタを追加することも、ユーザーが独自のアダプタを開発して使用し、 HERE Data SDK for Python をさまざまなシステムおよびコンテンツのリプレゼンテーションにインターフェイスすることもできます。

HERE Data SDK for Python には、次のアダプタが含まれています。

  • デフォルトのアダプタ。他のアダプタが指定されていない場合に自動的に使用されます
  • GeoPandas アダプター

デフォルトのアダプタはデフォルトで設定されています。 ユーザーは、さまざまな機能で別のアダプターを指定することもできます。 ただし、すべてのアダプターがすべてのコンテンツタイプおよびリプレゼンテーションをサポートするわけではありません。 パラメーターを指定 encode=False すること decode=False も、読み取り時または書き込み時にエンコード、デコード、および形式変換を無効にすることもできます。 この場合 bytes 、レイヤーを読み取ると、生の形式のデータがユーザーに返されます。同様に、ユーザーは書き込み時にすでにエンコードされているデータを提供する必要があります。

でデータのダウンロードをストリーミングする場合、 stream=Trueデコードは渡されたか decode=Falseのように暗黙的に無効になります。 この状況では、 Iterator[bytes]イテレータを通過するたびにバイトの各チャンクが要求されるようにデータが返されます。

サポートされている形式

次の表に、 HERE Data SDK for Python でネイティブにサポートされているデータ形式、および追加のデータアダプタを使用してサポートされているデータ形式を示します。

レイヤーとコンテンツタイプ encode=False decode=False DefaultAdapter GeoPandasAdapter
バージョン付レイヤー
任意 読み取り / 書き込み bytes
application/(x-) protobuf 読み取り / 書き込み Message 読み取り / 書き込み DataFrame
アプリケーション /x-parquet 読み取り / 書き込み DataFrame
アプリケーション /JSON 読み取り / 書き込み dict/list 読み取り / 書き込み DataFrame
アプリケーション / (vnd) geo+json 読み取り / 書き込み FeatureCollection 読み取り / 書き込み GeoDataFrame
テキスト /CSV 読み取り / 書き込み List[dict] 読み取り / 書き込み DataFrame
ボラタイル レイヤー
任意 読み取り / 書き込み bytes
application/(x-) protobuf 読み取り / 書き込み Message 読み取り / 書き込み DataFrame
アプリケーション /x-parquet 読み取り / 書き込み DataFrame
アプリケーション /JSON 読み取り / 書き込み dict/list 読み取り / 書き込み DataFrame
アプリケーション / (vnd) geo+json 読み取り / 書き込み FeatureCollection 読み取り / 書き込み GeoDataFrame
テキスト /CSV 読み取り / 書き込み List[dict] 読み取り / 書き込み DataFrame
インデックス レイヤー
任意 読み取り / 書き込み bytes
application/(x-) protobuf 読み取り / 書き込み Message 読み取り / 書き込み DataFrame
アプリケーション /x-parquet 読み取り / 書き込み * DataFrame
アプリケーション /JSON 読み取り / 書き込み dict/list 読み取り / 書き込み DataFrame
アプリケーション / (vnd) geo+json 読み取り / 書き込み FeatureCollection 読み取り / 書き込み GeoDataFrame
テキスト /CSV 読み取り / 書き込み List[dict] 読み取り / 書き込み DataFrame
ストリーム レイヤー
任意 読み取り / 書き込み bytes
application/(x-) protobuf 読み取り / 書き込み Message 読み取り / 書き込み DataFrame
アプリケーション /x-parquet 読み取り / 書き込み DataFrame
アプリケーション /JSON 読み取り / 書き込み dict/list 読み取り / 書き込み DataFrame
アプリケーション / (vnd) geo+json 読み取り / 書き込み FeatureCollection 読み取り / 書き込み GeoDataFrame
テキスト /CSV 読み取り / 書き込み List[dict] 読み取り / 書き込み DataFrame
Interactive マップ レイヤー
アプリケーション / (vnd) geo+json 読み取り / 書き込み FeatureCollection 読み取り / 書き込み GeoDataFrame
オブジェクト ストア レイヤー
任意 読み取り / 書き込み bytes

(*) 書き込みは一度に 1 つのパーティションのみでサポートされています

レイヤーから読み込みます

次のレイヤーから、パーティション メタデータおよび各パーティション(パーティション データと呼ばれます)のコンテンツを読み取ることができます。

  • バージョン付き
  • 揮発性
  • インデックス
  • ストリーム

read_partitionsread_stream 度に 1 つの値を消費する関数および戻り値のイテレータ。

パーティション メタデータ ( および一般的な「パーティション」の概念 ) は、インタラクティブマップおよびオブジェクトストアのレイヤーには適用されません。 このページでは、それぞれに適用できる読み取りメソッドについて、これらのレイヤータイプに固有のセクションを参照してください。

データのダウンロードは、 stream=True パラメーターを使用して実行できます。 ストリーム配信されたデータはと同じように返され Iterator[bytes]、イテレーションごとにデータのチャンクがダウンロードされます。 ストリーミング・データではデコードはサポートされていませんstream=Trueデコードが提供された場合、decode=Falseデコードは暗黙的に行われます。 メモリ消費量とダウンロードスループットのバランスを取るデフォルトのチャンクサイズが、 chunk_size パラメータで上書きされない限り使用されます。

以下の読み取り例を実行するには、まず、サンプルのカタログおよびレイヤーを定義する次のコードを実行する必要があります。

from here.platform import Platform

platform = Platform()

oma_catalog = platform.get_catalog('hrn:here:data::olp-here:oma-3')
versioned_layer = oma_catalog.get_layer('topology_geometry_segment')

wx_catalog = platform.get_catalog('hrn:here:data::olp-here:live-weather-eu')
volatile_layer = wx_catalog.get_layer('latest-data')

sdii_catalog = platform.get_catalog('hrn:here:data::olp-here:olp-sdii-sample-berlin-2')
index_layer = sdii_catalog.get_layer('sample-index-layer')

traffic_catalog = platform.get_catalog('hrn:here:data::olp-here:olp-traffic-1')
stream_layer = traffic_catalog.get_layer('traffic-incidents-delta-stream')

samples_catalog = platform.get_catalog('hrn:here:data::olp-here:here-geojson-samples')
objectstore_layer = samples_catalog.get_layer('objectstore')

test_catalog = platform.get_catalog(...)
interactive_map_layer = test_catalog.get_layer(...)

バージョン付レイヤーからパーティション / データを読み取ります

バージョン付レイヤーに保存されているデータを読み取るに は、VersionedLayerクラスの READ_PARTITIONS 関数を参照してください。 パーティション識別子のセットが提供されている場合、この関数は指定されたパーティションに関連付けられているデータを返します。 パーティションが指定されていない場合、レイヤー全体の内容が返されます。

デフォルトでは、照会するカタログのバージョンを最新に指定できます。

read_partitions はレイヤーメタデータを照会し、各パーティションに関連付けられている対応するデータを 1 回の呼び出しでダウンロードしますが、後で必要に応じて、パーティション メタデータのみを照会し、関連するデータを手動で取得およびデコードすることもできます。 パーティション メタデータのみをクエリするに は、 get_partitions_metadata 関数にを使用してください。

パラメーターおよびアダプター固有のパラメーターの完全なリストの詳細については 、 VersionedLayer のドキュメントを参照してください。

例 : 2 つのパーティションの読み取り

read_partitions このメソッドは、要求された各パーティション のオブジェクトに対するイテレータを返します。 各オブジェクトは tuple です。最初の要素は VersionedPartition タイプで、2 番目の要素はそのパーティションの内容です。 パーティション コンテンツは、上記のマトリックスに従って、レイヤー設定で指定されているコンテンツタイプ(および適用されている場合はスキーマ )に従ってデコードされます。

partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333])

for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")
19377307: <class 'SegmentPartition'>
19377333: <class 'SegmentPartition'>

例 : 特定のカタログバージョンを読み取り、デコードをスキップしています

この例では、データはデコードされず partition_content 、タイプもになり bytesます。

partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333], version=10, decode=False)

for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")
19377307: <class 'bytes'>
19377333: <class 'bytes'>

例 : ストリーミングダウンロード

この例では、データがストリーミングされ、 partition_content タイプが異なり Iterator[bytes]ます。

partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333], stream=True)

for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")
19377307: <class 'bytes'>
19377333: <generator object generate at 0x7f9594a11cb0>

例 : メタデータを取得し、データを手動で取得しています

partitions = versioned_layer.get_partitions_metadata(partition_ids=[19377307, 19377333])

for partition in partitions:
    print(partition.id)
    data = partition.get_blob()
    print(data)

ボラタイル レイヤーからパーティション / データを読み取ります

ボラタイル レイヤーに保存されているデータを読み取るに は、VolatileLayerクラスの READ_PARTITIONS 関数を参照してください。 パーティション識別子のセットが提供されている場合、この関数は指定されたパーティションに関連付けられているデータを返します。 パーティションが指定されていない場合、レイヤー全体の内容が返されます。

read_partitions はレイヤーメタデータを照会し、各パーティションに関連付けられている対応するデータを 1 回の呼び出しでダウンロードしますが、後で必要に応じて、パーティション メタデータのみを照会し、関連するデータを手動で取得およびデコードすることもできます。 パーティション メタデータのみをクエリするに は、 get_partitions_metadata 関数にを使用してください。

パラメータおよびアダプタ固有のパラメータの完全な一覧表の詳細について は、 VolatileLayer のドキュメントを参照してください。

例 : 2 つのパーティションの読み取り

read_partitions このメソッドは、要求された各パーティション のオブジェクトに対するイテレータを返します。 各オブジェクトは A で tuple、最初の要素は VolatilePartition タイプ、 2 番目の要素はそのパーティション の内容です。 パーティション コンテンツは、上記のマトリックスに従って、レイヤー設定で指定されているコンテンツタイプ(および適用されている場合はスキーマ )に従ってデコードされます。

partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262])

for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")
92259: <class 'WeatherConditionPartition'>
92262: <class 'WeatherConditionPartition'>

例 : デコードをスキップしています

この例では、データはデコードされず partition_contents 、タイプもになり bytesます。

partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262], decode=False)

for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")
92259: <class 'bytes'>
92262: <class 'bytes'>

例 : ストリーミングダウンロード

この例では、データがストリーミングされ、 partition_contents タイプが異なり Iterator[bytes]ます。

partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262], stream=True)

for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")
92259: <class 'bytes'>
92262: <generator object generate at 0x7f9594a11cb0>

例 : メタデータを取得し、データを手動で取得しています

partitions = volatile_layer.get_partitions_metadata(partition_ids=[92259, 92262])

for partition in partitions:
    print(partition.id)
    data = partition.get_blob()
    print(data)

インデックス レイヤーからパーティション / データを読み取ります

インデックス レイヤーに保存されているデータを読み取るに は、IndexLayerクラスの READ_PARTITIONS 関数を参照してください。 この関数は、 RSQL クエリに一致するインデックスパーティションに関連付けられているすべてのデータを返します。

データは、レイヤー設定で指定されたコンテンツタイプに従ってデコードされます。 自動デコードのオプションは、上記のアダプタサポートマトリックスに示されているコンテンツタイプでのみサポートされています。 コンテンツタイプがサポートされていない場合 decode=False は、を使用する必要があり、読み取り後にデータを個別にデコードする必要があります。 データは、stream=Trueを使用してチャンク単位でストリーミングされる こともあります。ストリーミングではデコードがスキップされ、decode=False

read_partitions RSQL インデックスメタデータを介してクエリを実行し、各パーティションに関連付けられている対応するデータを 1 回の呼び出しでダウンロードできますが、後で必要に応じて、 RSQL 経由でパーティション メタデータのみを介してクエリを実行し、関連するデータを手動で取得およびデコードすることもできます。 パーティション メタデータのみをクエリするに は、 get_partitions_metadata 関数にを使用してください。

パラメーターおよびアダプター固有のパラメーターの完全なリストの詳細について は、 IndexLayer のドキュメントを参照してください。

例 : パーティションを問い合わせて、返されたすべてのデータを読み取ります

返される各フィールド partition のタイプは IndexPartitionで、インデックス レイヤー を作成したユーザーが定義したカスタムフィールドが含まれています。 この例では、データはデコードされず partition_data 、デコードされませ bytesん。

partitions = index_layer.read_partitions(query="hour_from=ge=10", decode=False)

for partition, partition_data in partitions:
    print(partition.fields, partition_data)

例 : クエリパーティションとストリームがデータを返しました

partitions = index_layer.read_partitions(query="hour_from=ge=10", stream=True)

for partition, partition_data in partitions:
    for partition_chunk in partition_data:
        print(partition_chunk)

例 : メタデータを取得し、データを手動で取得しています

partitions = index_layer.get_partitions_metadata(query="hour_from=ge=10")

for partition in partitions:
    print(partition.fields)
    data = partition.get_blob()
    print(data)

例 : 部品ごとにデータを取得するための部品 ID を取得します


parts = index_layer.get_parts(num_requested_parts=4, billing_tag="billing-tag-value")

例 : パーツ ID を使用してメタデータ を取得し、データを手動で取得します


resp = index_layer.get_parts(num_requested_parts=4, billing_tag="billing-tag-value")

for val in resp['parts']:

    partitions = index_layer.get_partitions_metadata(query="hour_from=ge=10", part=val['partId'])

    for partition in partitions:
        print(partition.fields)
        data = partition.get_blob()
        print(data)

ストリーム レイヤーからパーティション / データを読み取ります

ストリーム レイヤーから読み取るに は、まずStreamLayerクラスの SUBSCRIBE 関数を使用してサブスクリプションを作成する必要があります。 この関数は、 HERE platform で Kafka コンシューマをインスタンス化します。このインスタンスは、後で REST API を介して照会され、レイヤーからメッセージが読み取られます。 subscribe 関数はを返し StreamSubscriptionます。 読み取りが完了したら、プラットフォームのリソースを解放してください。

ストリーム レイヤー に保存されているデータを使用するには、READ_STREAM 関数を参照してください。 この関数はストリーム を使用して、メッセージおよび対応する内容を返します。 この方法では、が必要 StreamSubscriptionです。

read_stream はメッセージを使用し、各メッセージに関連付けられている対応するデータを 1 回の呼び出しでダウンロードしますが、後で必要に応じて、メッセージ(パーティション メタデータ)のみを使用および取得し、関連するデータを手動で取得およびデコードすることもできます。 メタデータのみを使用するに は、 get_stream_metadata 関数にを使用してください。 この関数には、も必要 StreamSubscriptionです。

パラメーターおよびアダプター固有のパラメーターの完全な一覧表の詳細については 、 StreamLayer のドキュメントを参照してください。

例 : コンテンツの使用

データは、レイヤー設定で指定されたコンテンツタイプに従ってデコードされます。 実際に返される内容の種類 partition_data は、上のマトリックスに基づいて、使用されているコンテンツの種類とアダプタによって異なります。

返される各 partition タイプは StreamPartitionです。 with ブロックが正常に終了したか、または正常に終了しなかった場合、 subscription.unsubscribe() が内部的に呼び出されます。


with stream_layer.subscribe() as subscription:
    partitions = stream_layer.read_stream(subscription=subscription)
    for partition, content in partitions:
        print(f"{partition.id}: {content}")

例 : デコードをスキップしています

この例では、データはデコードされず partition_data 、デコードされませ bytesん。

subscription = stream_layer.subscribe()

try:
    partitions = stream_layer.read_stream(subscription=subscription, decode=False)
    for partition, content in partitions:
        print(f"{partition.id}: {content}")
finally:
    subscription.unsubscribe()

例 : ストリームデータ

この例では、データはデコードされず partition_data 、デコードされませ Iterator[bytes]ん。

subscription = stream_layer.subscribe()

try:
    partitions = stream_layer.read_stream(subscription=subscription, stream=True)
    for partition, content in partitions:
        print(partition.id)
        for content_chunk in content:
            print(content)
finally:
    subscription.unsubscribe()

例 : メタデータを取得し、データを手動で取得しています

ボラタイル レイヤーの特徴は、バージョン管理やパーティション メタデータなどと比較して、ストリーム (Kafka ストリーム レイヤーのメッセージ ) には十分なサイズのデータがインラインで含まれていることです。 データは、 Blob API を介して保存されるのではなく、各メッセージに直接含めることができます。

get_blob Blob API からデータを取得して返します。get_dataは、ストリーム レイヤーに固有 で、インラインの場合はデータを返し、必要な場合にのみ Blob API からデータを取得します。そのため、 get_data を使用することをお勧めします。

subscription = stream_layer.subscribe()

try:
    partitions = stream_layer.get_stream_metadata(subscription=subscription)

    for partition in partitions:
        print(partition.id, partition.timestamp)
        data = partition.get_data()
        print(data)

finally:
    subscription.unsubscribe()

例 : 直接の Kafka を使用してコンテンツを消費および作成します

直接の Kafka サポートにより、ユーザーは Kafka Consumer および Kafka Producer のインスタンスを取得できます。 これらのインスタンスを使用すると、ユーザーはストリーム レイヤーで書き込みおよび読み取りを行うことができます。 これらのインスタンスは設定可能です。

トピックレベルのコンシューマ設定 の詳細については、 https://kafka.apache.org/11/documentation.html#newconsumerconfigsを参照してください。

トピックレベルのプロデューサー設定 の詳細については、 https://kafka.apache.org/11/documentation.html#producerconfigsを参照してください。

producer = stream_layer.kafka_producer(value_serializer=lambda x:json.dumps(x).encode('utf-8'))
topic = stream_layer.get_kafka_topic()
for x in range(10):
    data = {'x': x, '2x': x*2}
    producer.send(topic, value=data)
producer.close()

consumer = stream_layer.kafka_consumer(value_deserializer=lambda x:json.loads(x.decode('utf-8')))
for message in consumer:
    print(f"Message is {message.value}")
consumer.close()

Interactive マップ レイヤーから機能を読み取ります

このレイヤータイプには、パーティションおよびエンコードされたデータの概念はありません。 生データを読み取る関数や、 decode パラメータをサポートする関数はありません。 インタラクティブなマップレイヤーは 、パーティションではなく、 GeoJSON FeatureCollectionの観点から機能の概念を中心に設計されています。

既定のアダプタ FeatureCollection を使用する場合、または ( Feature 両方の GeoJSON の概念 ) のイテレータが直接返されます。

レイヤーからフィーチャーを取得するための機能の一部を次に示します。

  • get_features
  • search_features
  • iter_features
  • get_features_in_bounding_box
  • spatial_search
  • spatial_search_geometry

例 : を使用して、 Interactive マップ レイヤーから複数の機能を読み取ります get_features

features = interactive_map_layer.get_features(feature_ids=["feature_id1", "feature_id2", "feature_id3"])

例 : を使用して、プロパティに基づいてフィーチャーを検索および取得します search_features

features = interactive_map_layer.search_features(params={"p.name": "foo", "p.type": "bar"})

例 : を使用して、レイヤー内のすべてのフィーチャーを取得します iter_features

for feature in interactive_map_layer.iter_features():
    print(feature)

例 : を使用して、バウンディング ボックスのすべての機能を検索して取得します get_features_by_bbox

bbox_features = interactive_map_layer.get_features_in_bounding_box(bounds=(-171.791110603, 18.91619, -66.96466, 71.3577635769))

例 : を使用して、入力点の指定した半径内のすべてのフィーチャーを検索して取得します spatial_search

features = interactive_map_layer.spatial_search(lng=-95.95417, lat=41.6065, radius=1000)

例 : を使用して、任意の形状内のすべてのフィーチャーを検索および取得します spatial_search_geometry

from shapely.geometry import Polygon

polygon = Polygon([(0, 0), (1, 1), (1, 0)])
features = interactive_map_layer.spatial_search_geometry(geometry=polygon)

宛先ストリーム レイヤー へのインタラクティブマップ レイヤー サブスクリプション。

宛先ストリーム レイヤー への IML サブスクリプションの一部の機能 :

  • get_all_subscriptions
  • subscribe
  • subscription_status
  • subscription_exists
  • get_subscription
  • unsubscribe

例 : すべてのサブスクリプションを一覧表示します get_all_subscriptions


subscriptions_gen = interactive_map_layer.get_all_subscriptions(limit=10)

while True:
    try:
        subscriptions_list = next(subscriptions_gen)
        print([subscription.subscription_hrn for subscription in subscriptions_list])
    except StopIteration:
        break

例 : を使用して宛先ストリーム レイヤー にサブスクライブします subscribe

import uuid
from here.platform.layer import InteractiveMapSubscriptionType

x_idempotency_key=str(uuid.uuid4())
interactive_map_subscription = interactive_map_layer.subscribe(subscription_name="test-iml-subscription-name-123", 
                        description="this is a test iml subscription",
                        destination_catalog_hrn="hrn:here:data::olp-cs:pysdk-test-catalog-for-iml-subs",
                        destination_layer_id="stream-raw-demo",
                        interactive_map_subscription_type=InteractiveMapSubscriptionType.PER_FEATURE)

interactive_map_subscription

例 : サブスクリプションステータスを取得します subscription_status


subscription_status = interactive_map_layer.subscription_status(status_token=interactive_map_subscription.status_token)
subscription_status, subscription_status.status

例 : サブスクリプションが存在します subscription_exists


subscription_exists = interactive_map_layer.subscription_exists(subscription_hrn=interactive_map_subscription.subscription_hrn)
subscription_exists

例 : サブスクリプションを取得します get_subscription

subscription = interactive_map_layer.get_subscription(subscription_hrn=interactive_map_subscription.subscription_hrn)
subscription, subscription.subscription_name, subscription.subscription_hrn

例 : を使用して宛先ストリーム レイヤー のサブスクライブを解除します unsubscribe

import uuid

x_idempotency_key=str(uuid.uuid4())
unsubscribed = unsubscribe(subscription_hrn=interactive_map_subscription.subscription_hrn)
unsubscribed, unsubscribed.status_token

オブジェクト ストア レイヤーからデータを読み取ります

Object Store のレイヤーは、ファイルシステムの動作を模倣したキー / 値ストアを提供します。 キー は、ファイルパス + 名前 ( レイヤーのルートを基準とした相対パス ) と同じです。 対応 する値bytes は、そのファイルの内容と同じです。 この種類のレイヤーには、パーティションの概念はありません。

オブジェクトストアのレイヤーからキー / 値の情報を取得するための関数には、次のものがあります。

  • key_exists
  • list_keys
  • iter_keys
  • get_object_metadata
  • read_object

例 : レイヤーに特定のキーが含まれているかどうかを確認します key_exists

key_found = objectstore_layer.key_exists(key = "berlin/districts_of_berlin_tiled/23618356")

例 : 特定の親キー ( ディレクトリ ) の下にあるすべてのキー ( サブディレクトリおよびファイル ) をとともに一覧表示します list_keys

オプション parent のパラメータで、一覧表のルートとして使用するレイヤーのキー階層のレベルを指定します。 省略すると、レイヤー自体のルートが使用されます。 deep このパラメータは、子孫の階層全体を取得するかTrue( ) 、または直接の子孫のみを取得するか (False) を示します。 省略すると deep 、と見なさ Falseれます。 このメソッドは、キーのリストを文字列で返します。

everything_under_berlin_folder = objectstore_layer.list_keys(parent = "berlin", deep = True)

例 : 指定した親キー ( ディレクトリ ) の下にあるすべてのキーについて、イテレータを取得します iter_keys

このメソッドはに似 list_keysていますが、 List の代わりに Iterator を返します。

berlin_files_iter = objectstore_layer.iter_keys(parent = "berlin", deep = True)

例 : でオブジェクトのメタデータを取得します get_object_metadata

このメソッドは、特定のキーに関連付けられているメタデータを返します。 内容は次のとおりです。

  • 最終変更日
  • データサイズ
  • コンテンツタイプ
  • コンテンツのエンコード
obj_metadata = objectstore_layer.get_object_metadata("berlin/districts_of_berlin_tiled/23618355")

例 : で指定されたキーの読み取りオブジェクト read_object

read_object メソッドには include_metadata 、オブジェクトに関連付けられているメタデータも返すかどうかを指定するオプションのパラメーターがあります。 デフォルトでは、この値はFalseであり 、read_object要求されたコンテンツのみを返します。 <Check TrueAlignment of PHs> の場合、 content plus を含むタプルが object_metadata 返されます。 stream がの場合、 Trueデータは Iterator[bytes] ではなくとして返され bytesます。

geojson_content = objectstore_layer.read_object("berlin/districts_of_berlin_tiled/23618355")

レイヤーに書き込みます

以下の例では、サポートされている各レイヤータイプにデータを公開する方法を示します。

write_partitionswrite_stream 度に 1 つの値を生成するための関数で、イテレータを受け入れます。 が Adapter 使用されている場合、書き込み関数に渡すタイプはアダプタ固有です。

以下の例では、実際のデータはプレースホルダーで表さ ... れ、パーティション識別子は妥当です。

メモ( Jupyter 環境で実行している場合)

write_partitions 50 MB 未満のペイロードで呼び出した場合、すべてのデータが 1 つのトランザクションで公開されます。 データサイズが 50 MB を超えると 、マルチパートアップロード を使用してパフォーマンスが向上します。 マルチパートアップロードに関連する並列化は、明示的に処理されない場合、ジュピターの内部非同期動作に干渉します。 50 MB を超える容量を 1 つのパーティションにアップロードする場合は、公開する前にノートブックに以下の手順を含める必要があります。

import nest_asyncio
import asyncio
nest_asyncio.apply(loop=asyncio.get_event_loop())

バージョン付レイヤーに書き込みます

1 つ以上のバージョンレイヤーに書き込むに Publication は、を最初に作成する必要があります。 バージョン管理されたレイヤーのパブリケーションは、トランザクションと同様に機能 complete パブリケーションまたは cancel その時点までアップロードされたすべてのメタデータをドロップできます。

Publication 使用可能になると、 1 write_partitions つ以上の関数呼び出しを使用してレイヤーにデータを書き込むことができます。 各書き込み機能はレイヤーごとに異なります。 同じレイヤーまたは複数のレイヤーについて、各書き込み関数を複数回呼び出すことができます。 詳細については、 write_partitions を参照してください。

set_partitions_metadata コンテンツを同時にアップロードせずに、レイヤーのパーティションのメタデータを更新および削除するために使用します。 コンテンツは、以前に個別にアップロードする必要があります。 この関数は、パブリケーションの一部としてパーティションを削除する方法も提供します。

1 つ以上のバージョンレイヤーを含むパブリケーションを完了すると、カタログの新しいバージョンが作成されます。 詳細については、 init_publication を参照してください。

例 : パブリケーションコンテキスト管理を使用して複数のレイヤーおよびパーティションを作成しています

次のスニペット は、複数のレイヤーのパーティションが追加または変更される新しいカタログバージョンを作成します。 各パーティション のコンテンツがエンコードされ、アップロードされます。 with 内部的にコールされたブロックが正常に終了 Publication.complete() すると、トランザクションがコミットされます。 with ブロックが正常に終了しなかった場合 Publication.cancel() 、は内部的に呼び出されます。

データは、レイヤー設定で設定されたコンテンツタイプに基づいてエンコードされます。

layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")

with catalog.init_publication(layers=[layerA, layerB]) as publication:
    layerA.write_partitions(publication, { "a1": ..., "a2": ..., "a3": ... })
    layerB.write_partitions(publication, { 377893751: ..., 377893752: ... })
    layerB.write_partitions(publication, { 377893753: ..., 377893754: ... })

注 : レイヤーのコンテンツタイプがサポート encode=False されていない場合、または生のコンテンツの読み取りまたは書き込みが望ましい場合は、 write_* 関数に渡して、エンコードまたはデコードをスキップし、生のバイトを処理してください。 これは、レイヤーのコンテンツタイプなどに適用 text/plainされます。

例 : エンコードをスキップして複数のレイヤーとパーティションを記述しています

ユーザーは、各 bytes レイヤーに設定されているコンテンツタイプと互換性のある形式で、すでにエンコードされているデータを提供できます。 この場合、 encode=False を指定する必要があります。

layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")

publication = catalog.init_publication(layers=[layerA, layerB])  # ["A", "B"] also accepted

try:
    layerA.write_partitions(publication, { "a1": bytes(...), "a2": bytes(...) }, encode=False)
    layerB.write_partitions(publication, { 377893751: bytes(...), 377893752: bytes(...) }, encode=False)
    publication.complete()

except:
    publication.cancel()

ボラタイル レイヤーに書き込みます

1 つ以上の揮発性レイヤーに書き込むに Publication は、をまず作成する必要があります。 揮発性レイヤーのパブリケーションは complete 、リソースを解放するための関数を介して不要になった場合に閉じる必要があります。 cancel この関数は、ボラタイル レイヤーの性質上、レイヤーのバージョンが更新されていないため影響を受けません。また、トランザクションをサポートせず、成功した書き込みはロールバックできません。

Publication 使用可能になると、 1 write_partitions つ以上の関数呼び出しを使用してレイヤーにデータを書き込むことができます。 各書き込み機能はレイヤーごとに異なります。 同じレイヤーまたは複数のレイヤーについて、各書き込み関数を複数回呼び出すことができます。 詳細については、 write_partitions を参照してください。

set_partitions_metadata コンテンツを同時にアップロードせずに、レイヤーのパーティションのメタデータを更新および削除するために使用します。 コンテンツは、以前に個別にアップロードする必要があります。 この関数は、パーティションを削除する方法も提供します。 パーティションを削除するもう 1 つの方法は delete_partitions です。

例 : パブリケーションコンテキスト管理を使用して複数のレイヤーおよびパーティションを作成しています

次のスニペット は、 2 つの揮発性レイヤーに書き込みます。 各パーティション のコンテンツがエンコードされ、アップロードされます。 with ブロックが正常に終了すると Publication.complete() 、内部的に呼び出されます。 with ブロックが正常に終了しなかった場合 Publication.cancel() 、は内部的に呼び出されます。

データは、レイヤー設定で設定されたコンテンツタイプに基づいてエンコードされます。

layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")

with catalog.init_publication(layers=[layerA, layerB]) as publication:
    layerA.write_partitions(publication, { "a1": ..., "a2": ..., "a3": ... })
    layerB.write_partitions(publication, { 377893751: ..., 377893752: ... })

例 : エンコードをスキップして複数のレイヤーとパーティションを記述しています

ユーザーは、各 bytes レイヤーに設定されているコンテンツタイプと互換性のある形式で、すでにエンコードされているデータを提供できます。 この場合、 encode=False を指定する必要があります。

layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")

publication = catalog.init_publication(layers=[layerA, layerB])  # ["A", "B"] also accepted

try:
    layerA.write_partitions(publication, { "a1": bytes(...), "a2": bytes(...) }, encode=False)
    layerB.write_partitions(publication, { 377893751: bytes(...), 377893752: bytes(...) }, encode=False)

finally:
    publication.complete()

インデックス レイヤーに書き込みます

インデックス レイヤーへの書き込みにはパブリケーションは必要ありません。 現在、インデックス レイヤーへの書き込みは、一度に 1 つのパーティションでサポートされています。

write_single_partition 関数を使用 して、データおよび対応するメタデータをインデックス レイヤーに追加します。 functino delete_partitions を使用 して、 RSQL クエリーと一致するインデックス レイヤーのパーティションを削除します。

インデックス レイヤーメタデータでは 、 set_partitions_metadata 経由でのみ操作できます。 これにより、インデックスパーティションが一度に追加および削除されます。 追加したパーティションのコンテンツは、以前に個別にアップロードする必要があります。

例 : レイヤーにパーティションを 1 つ追加します

データは、レイヤー設定で設定されたコンテンツタイプに基づいてエンコードされます。

fields = {
    "f1": 100,
    "f2": 500
}

index_layer.write_single_partition(data=..., fields=fields)

例 : エンコーディングをスキップしているレイヤーにパーティションを 1 つ追加します

ユーザーは、各 bytes レイヤーに設定されているコンテンツタイプと互換性のある形式で、すでにエンコードされているデータを提供できます。 この場合、 encode=False を指定する必要があります。

fields = {
    "f1": 100,
    "f2": 500
}

index_layer.write_single_partition(data=bytes(...), fields=fields, encode=False)

ストリーム レイヤーに書き込みます

1 つ以上のストリームレイヤーに書き込むに Publication は、を最初に作成する必要があります。 ストリームレイヤーのパブリケーションは complete 、リソースを解放するための機能を使用して、不要になったときに閉じる必要があります。 cancel この関数はストリーム レイヤーの性質上、作成後に Kafka ストリームからメッセージを削除することはできないため、効果がありません。

Publication 使用可能になると、 1 write_stream つ以上の関数呼び出しを使用してレイヤーにデータを書き込むことができます。 各書き込み機能はレイヤーごとに異なります。 同じレイヤーまたは複数のレイヤーについて、各書き込み関数を複数回呼び出すことができます。 詳細については 'write_stream を参照してください

APPEND_STREAM_METADATA を使用 すると、コンテンツを同時にアップロードせずに、レイヤーのメタデータ(メッセージ)のみをストリームに書き込みます。 コンテンツは data 、メッセージの送信前に個別にアップロードするか、メッセージのフィールドに含める必要があります ( サイズが十分である場合 ) 。

例 : コンテキストマネージャを使用してストリームに書き込みます

各パーティション のコンテンツがエンコードされ、アップロードされます。 with ブロックが正常に終了し、内部でコールが発生した場合 Publication.complete()with ブロックが正常に終了しなかった場合 Publication.cancel() 、は内部的に呼び出されます。

データは、レイヤー設定で設定されたコンテンツタイプに基づいてエンコードされます。

layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")

with catalog.init_publication(layers=[layerA, layerB]) as publication:
    layerA.write_stream(publication, { "a1": ..., "a2": ..., "a3": ... })
    layerB.write_stream(publication, { 377893751: ..., 377893752: ... })

例 : エンコーディングをスキップしてストリームに書き込みを行っています

ユーザーは、各 bytes レイヤーに設定されているコンテンツタイプと互換性のある形式で、すでにエンコードされているデータを提供できます。 この場合、 encode=False を指定する必要があります。

layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")

publication = catalog.init_publication(layers=[layerA, layerB])  # ["A", "B"] also accepted

try:
    layerA.write_stream(publication, { "a1": bytes(...), "a2": bytes(...) }, encode=False)
    layerB.write_stream(publication, { 377893751: bytes(...), 377893752: bytes(...) }, encode=False)

finally:
    publication.complete()

Interactive マップ レイヤーに書き込みます

このレイヤータイプには、パーティションおよびデータの概念がありません。 生データを書き込む関数や、 encode パラメータをサポートする関数はありません。 Interactive マップ レイヤー API は、 GeoJSON の概念をモデルにしてい FeatureCollectionます。 詳細については、 API のドキュメントを参照してください。

既定のアダプタ FeatureCollection を使用する場合 Feature 、または ( 両方の GeoJSON の概念の ) イテレータがパラメータとして直接渡されます。

例 : GeoJSON 機能の記述

from geojson import FeatureCollection, Feature, Point, Polygon

f1 = Polygon(coordinates=[(0, 0), (0, 1), (1, 0), (0, 0)], properties={"a": 100, "b": 200})
f2 = Point(coordinates=(-1.5, 2.32), properties={"a": 50, "b": 95})
features = FeatureCollection(features=[f1, f2])

interactive_map_layer.write_features(features)

例 : GeoJSON 機能をファイルから記述しています

geojson_file_path = "~/example.geojson"

interactive_map_layer.write_features(from_file=geojson_file_path)

オブジェクト ストア レイヤーに書き込みます

オブジェクト ストア レイヤーにデータを書き込むには、キーを定義し、そのキーに関連付けられるデータを提供する必要があります。

  • キーは、次のいずれかの文字で構成される文字列です。 a-zA-Z0-9.[]=(){}/_-`.
  • キー内で / は、 ( スラッシュ ) 文字が区切り文字として解釈され、フォルダーに似た構造体が定義されます
  • 指定したキーがすでにレイヤーに存在する場合、関連付けられているデータは新しいキーで上書きされます
  • キーに関連付けられるデータは、ローカルファイルまたはバイトオブジェクトのいずれかとして指定できます

データを書き込むときに、アップロードするデータのコンテンツタイプを任意で指定できます。 指定しない application/octet-stream場合、タイプはと見なされます。

例 : ファイルの内容を書き込みます

layerA = catalog.get_layer("A")

layerA.write_object(key = "dir1/name_1", path_or_data = "localdatafile.txt", content_type = "text/plain")

例 : write bytes オブジェクト

layerA = catalog.get_layer("A")

layerA.write_object(key = "dir1/name_2", path_or_data = a_bytes_object )

例 : 既存のオブジェクトを削除します

レイヤーから既存のオブジェクトを削除するに delete_object は、メソッドを使用します。 このメソッドは、 strictTrueキーが存在しない場合に例外を発生させるオプションのパラメーターを提供します。

layerA = catalog.get_layer("A")

layerA.delete_object(key = "dir1/name_1")

例 : 既存のオブジェクトをコピーしています

レイヤーから既存のオブジェクトをコピーするに copy_object は、メソッドを使用します。 このメソッドは、 Trueがターゲットキーのオブジェクトを置き換える場合に使用するオプションのreplaceパラメーターを提供します。

layerA = catalog.get_layer("A")

layerA.copy_object(key = "dir1/name_2", copy_from = "dir1/name_1")

layerA.copy_object(key = "dir1/name_2", copy_from = "dir1/name_1", replace = True)

」に一致する結果は 件です

    」に一致する結果はありません