インデックス レイヤーデータを読み取ります

データ クライアント ライブラリには、 Spark を含むすべてのサポートされているレイヤータイプのデータを含むデータフレームを作成するためのカスタムインデックス レイヤー DataFrameReader というクラスLayerDataFrameReaderが用意されています。

DataFrameReader でサポートされているすべての形式 も、LayerDataFrameReaderでサポート されています。 さらに、 Apache Avro 、 Apache Parquet 、 Protobuf 、および生のバイト配列 ( オクテットストリーム ) などの形式もあります。

インデックス レイヤーから読み取る場合、特定の制限が適用されます。 制限事項および既知の制限事項については、「インデックス レイヤーからデータを取得する」を参照してください。

読み取りプロセス

読み取り操作は、次の手順で動作します。

  1. Spark Connector は、サーバーとの最初の通信から開始し、役立つ情報を取得します。 たとえば、レイヤータイプ、レイヤースキーマ、レイヤーエンコード形式などです
  2. レイヤー内のパーティションは、指定されたフィルタークエリを使用してフィルター処理されます。 クエリーが指定されていない場合、デフォルトでは「 timestamp=ge =0 」の値が使用され、すべてのパーティションが照合されます。
  3. この段階では、レイヤーの形式を把握しています。 Spark 対応のファイル形式を作成できます。パーティション データでは、行 ( レコード ) のイテレータを作成できます。
  4. レイヤータイプおよびパーティション メタデータに応じて、一部の暗黙的な列が各行に追加されます。
  5. 結果の行が Spark フレームワークに渡さ DataFrameれ、ファイナライズされた行が返されます。

ページ番号を使用して読み取ります

Spark では、クエリインデックスデータを並列にサポート しています。必要なパーツの量を定義するだけで、読み取り機にオプション olp.connector.query-parallelism を追加できます。

Scala
Java
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .format("raw")
  .query(
    "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
  .option("olp.connector.metadata-columns", true)
  .option("olp.connector.query-parallelism", 100)

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 1200000)

val df: DataFrame = reader.load()
Dataset<Row> df =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .format("raw")
        .query(
            "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
        .option("olp.connector.metadata-columns", true)
        .option("olp.connector.query-parallelism", 100)
        .load();

データフレーム列

パーティション データから派生するユーザー定義の列のほかに、 Spark Connector には、データのパーティション分割情報およびパーティションペイロード属性を表すために使用される追加の列があります。

データ列

ユーザー定義の列に対応し、パーティション データから派生します。

レイヤーのパーティション分割列

ユーザー定義のインデックス レイヤーパーティション分割列に対応します。 これらの名前はレイヤー定義と同じですが idx_ 、次のようにタイプ変換の後に接頭辞が付きます。

インデックスタイプ データ型
ブール ブール型
整数 長い
文字列 文字列
HERETile 長い
HERETime 長い

パーティションペイロード属性の列

列名 データ型 意味
mt_metadata 地図 [ 文字列 , 文字列 ] パーティションのメタデータ
mt_timestamp 長い 作成のタイムスタンプ (UTC)
mt_checksum 文字列 ペイロードのチェックサム
mt_crc 文字列 ペイロードの CRC
mt_dataSize 長い ペイロードのサイズ
mt_compressedDataSize 長い ペイロードの圧縮サイズ

プロジェクトの依存関係

HERE platform Spark コネクタを使用してインデックス レイヤーからデータを読み取るアプリケーションを作成する場合 は、「 Spark コネクタの依存関係」の章の説明に従って、必要な依存関係をプロジェクトに追加してください。

寄木細工のエンコードされたデータを読み取ります

次のスニペットで DataFrame は、カタログのインデックス レイヤーから寄木細工でエンコードされたにアクセスする方法を示します。 Parquet スキーマはデータにバンドルされていることに注意してください。 したがって、明示的に形式を指定する必要はありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing parquet-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  //.format("parquet")
  .query(
    "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
  .option("olp.connector.metadata-columns", true)
  .option("olp.connector.query-parallelism", 100)

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 1200000)

val df = reader.load()

df.printSchema()

df.show()

val messagesWithAtLeastOneSignRecognition = df
  .select("pathEvents.signRecognition")
  .where("size(pathEvents.signRecognition) > 0")

val count = messagesWithAtLeastOneSignRecognition.count()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of an index layer containing parquet-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
Dataset<Row> df =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        // .format("parquet")
        .query(
            "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
        .option("olp.connector.query-parallelism", 100)
        .load();

long messagesWithAtLeastOneSignRecognitionCount =
    df.select("pathEvents.signRecognition")
        .where("size(pathEvents.signRecognition) > 0")
        .count();

Avro でエンコードされたデータを読み取ります

次のスニペットで DataFrame は、カタログのインデックス レイヤーからエンコードされたアブロにアクセスする方法を示します。 Avro スキーマはデータにバンドルされる予定です。 したがって、明示的に形式を指定する必要はありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, SparkSession}
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing avro-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  //.format("avro")
  .query(
    "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
  .option("olp.connector.query-parallelism", 100)

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 1200000)

val df: DataFrame = reader.load()

val messagesWithAtLeastOneSignRecognition = df
  .select("pathEvents.signRecognition")
  .where("size(pathEvents.signRecognition) > 0")

val count = messagesWithAtLeastOneSignRecognition.count()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of an index layer containing avro-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
Dataset<Row> df =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        // .format("avro")
        .query(
            "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
        .option("olp.connector.query-parallelism", 100)
        .load();

Dataset<Row> messagesWithAtLeastOneSignRecognition =
    df.select("pathEvents.signRecognition").where("size(pathEvents.signRecognition) > 0");

long count = messagesWithAtLeastOneSignRecognition.count();

Protobuf でエンコードされたデータを読み取ります

次のスニペットは DataFrame 、カタログのインデックス レイヤーからエンコードされた Protobuf にアクセスする方法を示しています。 Protobuf スキーマはレイヤー設定から参照されることが予想されます。 したがって、明示的に形式を指定する必要はありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.size
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing protobuf-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  //.format("protobuf")
  .query(
    "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
  .option("olp.connector.metadata-columns", true)
  .option("olp.connector.query-parallelism", 100)

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 1200000)

val df = reader.load()

val sqlContext = sparkSession.sqlContext
import sqlContext.implicits._

val messagesWithAtLeastOneSignRecognition = df
  .select("mt_dataHandle", "message.pathEvents.signRecognition")
  .where("size(message.pathEvents.signRecognition) > 0")

val dataHandle = messagesWithAtLeastOneSignRecognition
  .select("mt_dataHandle")
  .head()
  .getString(0)

// Protobuf schema is of an SDII MessageList, so `size()` is used to compute the
// length of the `WrappedArray` (one array per SDII Message)
// Resulting `DataFrame` is a single row with a count, so `.head().getInt(0)` is used
// to retrieve the value
val count: Int = messagesWithAtLeastOneSignRecognition
  .select(size($"signRecognition"))
  .head()
  .getInt(0)
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of an index layer containing protobuf-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
Dataset<Row> df =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        // .format("protobuf")
        .query(
            "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
        .option("olp.connector.metadata-columns", true)
        .option("olp.connector.query-parallelism", 100)
        .load();

Dataset<Row> messagesWithAtLeastOneSignRecognition =
    df.select("mt_dataHandle", "message.pathEvents.signRecognition")
        .where("size(message.pathEvents.signRecognition) > 0");

String dataHandle =
    messagesWithAtLeastOneSignRecognition.select("mt_dataHandle").head().getString(0);

// Protobuf schema is of an SDII MessageList, so `size()` is used to compute the
// length of the `WrappedArray` (one array per SDII Message)
// Resulting `DataFrame` is a single row with a count, so `.head().getInt(0)` is used
// to retrieve the value
int count =
    messagesWithAtLeastOneSignRecognition
        .select(size(new Column("signRecognition")))
        .head()
        .getInt(0);

レイヤーから Protobuf データを読み取るには、レイヤー設定でスキーマを指定し、アーティファクトサービスで利用できるようにする必要があります。 さらに、スキーマ ds にはバリアントが必要です。 スキーマの管理方法の詳細について は、「 Archetypes 開発者ガイド」を参照してください。

CSV でエンコードされたデータを読み取ります

次のスニペット で DataFrame は、カタログのインデックス レイヤー から CSV でエンコードされたにアクセスする方法を示します。 この例では、 csv 行に列 field1 が整数、 field2 が文字列として含まれています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of index layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId)
  .query("eventType==SignRecognition")
  .load()

df.select("idx_eventType", "field1").where("field1 > 0").show()

df.printSchema()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query("eventId=in=(1,2,3)")
        .load();

dataFrame.select("idx_eventId", "field1").where("field1 > 0").show();

dataFrame.printSchema();

テキストエンコードされたデータを読み取ります

次のスニペット で DataFrame は、カタログのインデックス レイヤー からエンコードされたテキストにアクセスする方法を示します。 この例では、 ROW オブジェクトには文字列としてフィールドデータが含まれています。

注 : 制限事項

テキストデータの読み取り中 value 、各行は既定で文字列列列を持つ各行になります。 したがって、テキストデータソースには value 、行ごとに 1 つの列しかありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of index layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId)
  .query("eventType==SignRecognition")
  .load()

df.select("idx_eventId", "value").show()

df.printSchema()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query("eventId=in=(1,2,3)")
        .load();

dataFrame.select("idx_eventId", "value").show();

dataFrame.printSchema();

JSON でエンコードされたデータを読み取ります

次のスニペット で DataFrame は、カタログのインデックス レイヤー からエンコードされた JSON にアクセスする方法を示します。 この例では、 JSON オブジェクトに整数としてプロパティ INTVAL が含まれ、文字列として STRVAL が含まれています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of index layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId)
  .query("eventType==SignRecognition")
  .load()

df.select("idx_eventType", "intVal").where("intVal > 0").show()

df.printSchema()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query("eventId=in=(1,2,3)")
        .load();

dataFrame.select("idx_eventId", "intVal").where("intVal > 0").show();

dataFrame.printSchema();

その他の形式を参照してください

次のスニペットでは、カタログのインデックス レイヤーから任意の形式のデータにアクセスする方法を示します。

Scala
Java
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
val schema: StructType = new StructType(
  Array[StructField](
    StructField("mt_dataHandle", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("signRecognitionCount", DataTypes.IntegerType, nullable = false, Metadata.empty)
  ))
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing protobuf-encoded SDII data that at a minimum
// contains the indexing attributes 'tileId' and 'eventType')
/// [spark-index-query-withparts]
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .format("raw")
  .query(
    "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
  .option("olp.connector.metadata-columns", true)
  .option("olp.connector.query-parallelism", 100)

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 1200000)

val df: DataFrame = reader.load()
/// [spark-index-query-withparts]
val dfSignRecognitionCount: DataFrame = df.flatMap { row: Row =>
  val messageList: mutable.Buffer[SdiiMessage.Message] =
    SdiiMessageList.MessageList.parseFrom(row.getAs[Array[Byte]]("data")).getMessageList.asScala

  messageList.map { message =>
    RowFactory.create(row.getAs[Object]("mt_dataHandle"),
                      message.getPathEvents.getSignRecognitionCount.asInstanceOf[Object])
  }
}(RowEncoder(schema))

val messagesWithAtLeastOneSignRecognition = dfSignRecognitionCount
  .select("mt_dataHandle", "signRecognitionCount")
  .where("signRecognitionCount > 0")

val dataHandles = messagesWithAtLeastOneSignRecognition
  .map[String]((r: Row) => r.getAs[String]("mt_dataHandle"))(Encoders.STRING)
  .dropDuplicates()
  .collectAsList()

val count = messagesWithAtLeastOneSignRecognition.count()
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructType schema =
    new StructType(
        new StructField[] {
          new StructField("mt_dataHandle", DataTypes.StringType, false, Metadata.empty()),
          new StructField(
              "signRecognitionCount", DataTypes.IntegerType, false, Metadata.empty())
        });
// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
/// [spark-index-query-withparts]
Dataset<Row> df =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .format("raw")
        .query(
            "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
        .option("olp.connector.metadata-columns", true)
        .option("olp.connector.query-parallelism", 100)
        .load();
/// [spark-index-query-withparts]
Dataset<Row> dfSignRecognitionCount =
    df.flatMap(
        (FlatMapFunction<Row, Row>)
            row ->
                SdiiMessageList.MessageList.parseFrom(row.<byte[]>getAs("data"))
                    .getMessageList().stream()
                    .map(
                        m ->
                            RowFactory.create(
                                row.getAs("mt_dataHandle"),
                                m.getPathEvents().getSignRecognitionCount()))
                    .iterator(),
        RowEncoder.apply(schema));

Dataset<Row> messagesWithAtLeastOneSignRecognition =
    dfSignRecognitionCount
        .select("mt_dataHandle", "signRecognitionCount")
        .where("signRecognitionCount > 0");

List<String> dataHandles =
    messagesWithAtLeastOneSignRecognition
        .map((MapFunction<Row, String>) row -> row.getAs("mt_dataHandle"), Encoders.STRING())
        .dropDuplicates()
        .collectAsList();

long count = messagesWithAtLeastOneSignRecognition.count();
既知の問題 :
  • DataFrame インデックス レイヤー構造体の定義を表す列が含まれていますが、これらの列の相対的な位置は末尾にあり、ここでメタデータ列の前に配置する必要があります。
  • これらの列の値が存在しない場合 NULL は、値の代わりに既定値を使用します。
  • これらの列の場所は行の最後にある必要がありますが、ペイロード列の直後にあります。
メモ :
  • raw 形式は application/octet-stream 、レイヤー設定内を参照します。生のレイヤー設定と混同しないでください。
  • RSQL の詳細 については、 RSQL を参照してください。

」に一致する結果は 件です

    」に一致する結果はありません