バージョン付レイヤーデータを読み取ります

データ クライアント ライブラリには、 Spark を含むすべてのサポートされているレイヤータイプのデータを含むデータフレームを作成するためのカスタムバージョン付レイヤー DataFrameReader というクラスLayerDataFrameReaderが用意されています。

DataFrameReader でサポートされているすべての形式 も、LayerDataFrameReaderでサポート されています。 また、 Apache Avro 、 Apache Parquet 、 Protobuf 、および生バイト配列( octet-stream )などの形式もサポートしています。

読み取りプロセス

読み取り操作は、次の手順で動作します。

  1. Spark Connector は、サーバーとの最初の通信から開始し、役立つ情報を取得します。 たとえば、レイヤータイプ、レイヤースキーマ、レイヤーエンコード形式などです
  2. レイヤー内のパーティションは、指定されたフィルタークエリを使用してフィルター処理されます。 クエリが指定されていない場合、値 "mt_version==latest" がデフォルトで使用され、最新バージョンのすべてのパーティションが一致することを意味します。
  3. この段階では、レイヤーの形式を把握しています。 Spark 対応のファイル形式を作成できます。パーティション データでは、行 ( レコード ) のイテレータを作成できます。
  4. レイヤータイプおよびパーティション メタデータに応じて、一部の暗黙的な列が各行に追加されます。
  5. 結果の行が Spark フレームワークに渡さ DataFrameれ、ファイナライズされた行が返されます。

データフレーム列

パーティション データから派生するユーザー定義の列のほかに、 Spark Connector には、データのパーティション分割情報およびパーティションペイロード属性を表すために使用される追加の列があります。

データ列

ユーザー定義の列に対応し、パーティション データから派生します。

レイヤーのパーティション分割列

列名 データ型 意味
mt_partition 文字列 パーティション ID
mt_version 長い レイヤーのバージョン

パーティションペイロード属性の列

列名 データ型 意味
mt_metadata 地図 [ 文字列 , 文字列 ] パーティションのメタデータ
mt_timestamp 長い 作成のタイムスタンプ (UTC)
mt_checksum 文字列 ペイロードのチェックサム
mt_crc 文字列 ペイロードの CRC
mt_dataSize 長い ペイロードのサイズ
mt_compressedDataSize 長い ペイロードの圧縮サイズ

プロジェクトの依存関係

HERE platform Spark コネクタを使用してバージョン付レイヤーからデータを読み取るアプリケーションを作成する場合 は、「 Spark コネクタの依存関係」の章の説明に従って、必要な依存関係をプロジェクトに追加してください。

寄木細工のエンコードされたデータを読み取ります

次のスニペットで DataFrame は、カタログのバージョン付レイヤーから寄木細工でエンコードされたにアクセスする方法を示します。 Parquet スキーマはデータにバンドルされていることに注意してください。 したがって、明示的に形式を指定する必要はありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an versioned layer containing parquet-encoded SDII data
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 300000)

val df = reader.load()
df.printSchema()

val messagesWithAtLeastOneSignRecognition = df
  .select("pathEvents.signRecognition")
  .where("size(pathEvents.signRecognition) > 0")

val count = messagesWithAtLeastOneSignRecognition.count()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an versioned layer containing parquet-encoded SDII data
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(String.format("mt_partition==%s", tileId))
        .load();

dataFrame.printSchema();

Dataset<Row> messagesWithAtLeastOneSignRecognition =
    dataFrame
        .select("pathEvents.signRecognition")
        .where("size(pathEvents.signRecognition) > 0");

Long count = messagesWithAtLeastOneSignRecognition.count();

Avro でエンコードされたデータを読み取ります

次のスニペットで DataFrame は、カタログのバージョン付レイヤーからエンコードされたアブロにアクセスする方法を示します。 Avro スキーマはデータにバンドルされる予定です。 したがって、明示的に形式を指定する必要はありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.{DataFrame, SparkSession}
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an versioned layer containing avro-encoded SDII data
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 300000)

val df: DataFrame = reader.load()
df.printSchema()

val messagesWithAtLeastOneSignRecognition = df
  .select("pathEvents.signRecognition")
  .where("size(pathEvents.signRecognition) > 0")

val count = messagesWithAtLeastOneSignRecognition.count()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an versioned layer containing avro-encoded SDII data
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(String.format("mt_partition==%s", tileId))
        .load();

dataFrame.printSchema();

Dataset<Row> messagesWithAtLeastOneSignRecognition =
    dataFrame
        .select("pathEvents.signRecognition")
        .where("size(pathEvents.signRecognition) > 0");

Long count = messagesWithAtLeastOneSignRecognition.count();

Protobuf でエンコードされたデータを読み取ります

次のスニペットは DataFrame 、カタログのバージョン付レイヤーからエンコードされた Protobuf にアクセスする方法を示しています。 Protobuf スキーマはレイヤー設定から参照されることが予想されます。 したがって、明示的に形式を指定する必要はありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing protobuf-encoded SDII data that has copy of real data
// from "indexed-locations" layer of "hrn:here:data:::rib-2" catalog)
val dataFrame = sparkSession
  .readLayer(catalogHrn, layerId)
  .query("mt_partition=in=(DEU, CUB)")
  .load()

val tileIdLists = dataFrame
  .select(col("partition_name"), explode(col("tile_id")).as("tile"))

tileIdLists.show()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing protobuf-encoded SDII data that has copy
// of real data
// from "indexed-locations" layer of "hrn:here:data:::rib-2" catalog)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query("mt_partition=in=(DEU, CUB)")
        .load();

Dataset<Row> tileIdLists =
    dataFrame.select(
        dataFrame.col("partition_name"),
        org.apache.spark.sql.functions.explode(dataFrame.col("tile_id")).as("tile"));

tileIdLists.show();

レイヤーから Protobuf データを読み取るには、レイヤー設定でスキーマを指定し、アーティファクトサービスで利用できるようにする必要があります。 さらに、スキーマ ds にはバリアントが必要です。 スキーマの管理方法の詳細について は、「 Archetypes 開発者ガイド」を参照してください。

JSON でエンコードされたデータを読み取ります

次のスニペット で DataFrame は、カタログのボラタイル レイヤー からエンコードされた JSON にアクセスする方法を示します。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer containing json-encoded data)
val result = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition=in=($partitionId, $anotherPartitionId)")
  .load()
  .map { row =>
    val Seq(tileId, index) = Seq("tileId", "index").map(row.getAs[Long])
    (tileId, index)
  }(Encoders.tuple[Long, Long](Encoders.scalaLong, Encoders.scalaLong))
  .collect()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of a versioned layer containing json-encoded data)
List<Tuple2<Long, Long>> groupedData =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query("mt_partition=in=(" + partitionId + ", " + anotherPartitionId + ")")
        .load()
        .map(
            (MapFunction<Row, Tuple2<Long, Long>>)
                row -> {
                  Long tileId = row.getAs("tileId");
                  Long index = row.getAs("index");
                  return new Tuple2<>(tileId, index);
                },
            Encoders.tuple(Encoders.LONG(), Encoders.LONG()))
        .collectAsList();

テキストエンコードされたデータを読み取ります

次のスニペット で DataFrame は、カタログのボラタイル レイヤー からエンコードされたテキストにアクセスする方法を示します。 この例では、 ROW オブジェクトには文字列としてフィールドデータが含まれています。

注 : 制限事項

テキストデータの読み取り中 value 、各行は既定で文字列列列を持つ各行になります。 したがって、テキストデータソースには value 、行ごとに 1 つの列しかありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer containing plain text data)
val result = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition=in=($partitionId, $anotherPartitionId)")
  .load()
  .map { row =>
    val Seq(tileId, index) = row.getAs[String]("value").split("_").map(_.toInt).toSeq
    (tileId, index)
  }(Encoders.tuple[Int, Int](Encoders.scalaInt, Encoders.scalaInt))
  .collect()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of a versioned layer containing plain text data)
List<Tuple2<Integer, Integer>> groupedData =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query("mt_partition=in=(" + partitionId + ", " + anotherPartitionId + ")")
        .load()
        .map(
            (MapFunction<Row, Tuple2<Integer, Integer>>)
                row -> {
                  String value = row.getAs("value");
                  String[] result = value.split("_");
                  Integer tileId = Integer.valueOf(result[0]);
                  Integer index = Integer.valueOf(result[1]);
                  return new Tuple2<>(tileId, index);
                },
            Encoders.tuple(Encoders.INT(), Encoders.INT()))
        .collectAsList();

CSV でエンコードされたデータを読み取ります

次のスニペット で DataFrame は、カタログのボラタイル レイヤー から CSV でエンコードされたにアクセスする方法を示します。 この例では、 csv 行に列 field1 が整数、 field2 が文字列として含まれています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer containing CSV text data)
val result = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition=in=($partitionId, $anotherPartitionId)")
  .load()
  .map { row =>
    val Seq(tileId, index) = Seq("tileId", "index").map(row.getAs[String])
    (tileId, index)
  }(Encoders.tuple[String, String](Encoders.STRING, Encoders.STRING))
  .collect()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// org.apache.spark.sql.SparkSession sparkSession
// HRN catalogHrn (HRN of a catalog that contains the layer $layerId)
// String layerId (ID of a versioned layer containing CSV text data)
List<Tuple2<String, String>> groupedData =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query("mt_partition=in=(" + partitionId + ", " + anotherPartitionId + ")")
        .load()
        .map(
            (MapFunction<Row, Tuple2<String, String>>)
                row -> {
                  String tileId = row.getAs("tileId");
                  String index = row.getAs("index");
                  return new Tuple2<>(tileId, index);
                },
            Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
        .collectAsList();

その他の形式を参照してください

次のスニペットでは、カタログのバージョン付レイヤーから任意の形式のデータにアクセスする方法を示します。

Scala
Java
import org.apache.spark.sql.{Encoders, Row, SparkSession}
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 300000)

val dataFrame = reader.load()

dataFrame.printSchema()

val dataFrameStringContent = dataFrame
  .map[String]((r: Row) => r.getAs[String]("value"))(Encoders.STRING)
  .collectAsList()
  .asScala
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(String.format("mt_partition==%s", tileId))
        .load();

dataFrame.printSchema();

List<String> dataFrameStringContent =
    dataFrame
        .map((MapFunction<Row, String>) row -> row.getAs("value"), Encoders.STRING())
        .collectAsList();
既知の問題 :
  • DataFrame パーティション ID やレイヤーバージョンなどのパーティション分割情報を表す列が含まれている必要があります。 現在、これらの列はありません。
注 :

raw 形式は application/octet-stream 、レイヤー設定内を参照します。生のレイヤー設定と混同しないでください。

RSQL の詳細 については、 RSQL を参照してください。

」に一致する結果は 件です

    」に一致する結果はありません