ボラタイル レイヤーデータを読み取ります

データ クライアント ライブラリには、 Spark を含むすべてのサポートされているレイヤータイプのデータを含むデータフレームを作成するためのカスタムボラタイル レイヤー DataFrameReader というクラスLayerDataFrameReaderが用意されています。

DataFrameReader でサポートされているすべての形式 も、LayerDataFrameReaderでサポート されています。 さらに、 Apache Avro 、 Apache Parquet 、 Protobuf 、および生のバイト配列 ( オクテットストリーム ) などの形式もあります。

読み取りプロセス

読み取り操作は、次の手順で動作します。

  1. Spark Connector は、サーバーとの最初の通信から開始し、役立つ情報を取得します。 たとえば、レイヤータイプ、レイヤースキーマ、レイヤーエンコード形式などです
  2. レイヤー内のパーティションは、指定されたフィルタークエリを使用してフィルター処理されます。 クエリーが指定されていない場合、デフォルトでは値 "mt_timestamp=g=0 " が使用され、すべてのパーティションが一致することを意味します。
  3. この段階では、レイヤーの形式を把握しています。 Spark 対応のファイル形式を作成できます。パーティション データでは、行 ( レコード ) のイテレータを作成できます。
  4. レイヤータイプおよびパーティション メタデータに応じて、一部の暗黙的な列が各行に追加されます。
  5. 結果の行が Spark フレームワークに渡さ DataFrameれ、ファイナライズされた行が返されます。

データフレーム列

パーティション データから派生するユーザー定義の列のほかに、 Spark Connector には、データのパーティション分割情報およびパーティションペイロード属性を表すために使用される追加の列があります。

データ列

ユーザー定義の列に対応し、パーティション データから派生します。

レイヤーのパーティション分割列

列名 データ型 意味
mt_partition 文字列 パーティション ID

パーティションペイロード属性の列

列名 データ型 意味
mt_dataHandle 文字列 データのハンドラ ( 廃止 )
mt_metadata 地図 [ 文字列 , 文字列 ] パーティション of メタデータ ( 廃止 )
mt_timestamp 長い 作成のタイムスタンプ (UTC)
mt_checksum 文字列 ペイロードのチェックサム
mt_crc 文字列 ペイロードの CRC
mt_dataSize 長い ペイロードのサイズ
mt_compressedDataSize 長い ペイロードの圧縮サイズ

プロジェクトの依存関係

HERE platform Spark コネクタを使用してボラタイル レイヤーからデータを読み取るアプリケーションを作成する場合 は、「 Spark コネクタの依存関係」の章の説明に従って、必要な依存関係をプロジェクトに追加してください。

寄木細工のエンコードされたデータを読み取ります

次のスニペットで DataFrame は、カタログのボラタイル レイヤーから寄木細工でエンコードされたにアクセスする方法を示します。 Parquet スキーマはデータにバンドルされていることに注意してください。 したがって、明示的に形式を指定する必要はありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an versioned layer containing parquet-encoded SDII data
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 300000)
val df = reader.load()

df.printSchema()

val messagesWithAtLeastOneSignRecognition = df
  .select("pathEvents.signRecognition")
  .where("size(pathEvents.signRecognition) > 0")

val count = messagesWithAtLeastOneSignRecognition.count()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an volatile layer containing parquet-encoded SDII data
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(String.format("mt_partition==%s", tileId))
        .load();

dataFrame.printSchema();

Dataset<Row> messagesWithAtLeastOneSignRecognition =
    dataFrame
        .select("pathEvents.signRecognition")
        .where("size(pathEvents.signRecognition) > 0");

Long count = messagesWithAtLeastOneSignRecognition.count();

Avro でエンコードされたデータを読み取ります

次のスニペットで DataFrame は、カタログのボラタイル レイヤーからエンコードされたアブロにアクセスする方法を示します。 Avro スキーマはデータにバンドルされる予定です。 したがって、明示的に形式を指定する必要はありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an versioned layer containing avro-encoded SDII data
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 300000)

val df = reader.load()

df.printSchema()

val messagesWithAtLeastOneSignRecognition = df
  .select("pathEvents.signRecognition")
  .where("size(pathEvents.signRecognition) > 0")

val count = messagesWithAtLeastOneSignRecognition.count()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing avro-encoded SDII data
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(String.format("mt_partition==%s", tileId))
        .load();

Dataset<Row> messagesWithAtLeastOneSignRecognition =
    dataFrame
        .select("pathEvents.signRecognition")
        .where("size(pathEvents.signRecognition) > 0");

Long count = messagesWithAtLeastOneSignRecognition.count();

Protobuf でエンコードされたデータを読み取ります

次のスニペットは DataFrame 、カタログのボラタイル レイヤーからエンコードされた Protobuf にアクセスする方法を示しています。 Protobuf スキーマはレイヤー設定から参照されることが予想されます。 したがって、明示的に形式を指定する必要はありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an volatile layer containing protobuf-encoded SDII data that has copy of real data
// from "indexed-locations" layer of "hrn:here:data:::rib-2" catalog)
val dataFrame = sparkSession
  .readLayer(catalogHrn, layerId)
  .query("mt_partition=in=(DEU, CUB)")
  .load()

val tileIdLists = dataFrame
  .select(col("partition_name"), explode(col("tile_id")).as("tile"))

tileIdLists.show()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer containing protobuf-encoded SDII data that has copy
// of real data
// from "indexed-locations" layer of "hrn:here:data:::rib-2" catalog)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query("mt_partition=in=(DEU, CUB)")
        .load();

Dataset<Row> tileIdLists =
    dataFrame.select(
        dataFrame.col("partition_name"),
        functions.explode(dataFrame.col("tile_id")).as("tile"));

tileIdLists.show();

レイヤーから Protobuf データを読み取るには、レイヤー設定でスキーマを指定し、アーティファクトサービスで利用できるようにする必要があります。 さらに、スキーマ ds にはバリアントが必要です。 スキーマの管理方法の詳細について は、「 Archetypes 開発者ガイド」を参照してください。

JSON でエンコードされたデータを読み取ります

次のスニペット で DataFrame は、カタログのボラタイル レイヤー からエンコードされた JSON にアクセスする方法を示します。 この例では、 JSON オブジェクトに整数としてプロパティ INTVAL が含まれ、文字列として STRVAL が含まれています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of volatile layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition=in=(${partitionIds mkString ", "})")
  .load()

df.select("mt_partition", "intVal").where("intVal > 0").show()

df.printSchema()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an volatile layer containing parquet-encoded SDII data
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(
            "mt_partition=in=("
                + partitionId1
                + ", "
                + partitionId2
                + ", "
                + partitionId3
                + ")")
        .load();

dataFrame.select("mt_partition", "intVal").where("intVal > 0").show();

dataFrame.printSchema();

テキストエンコードされたデータを読み取ります

次のスニペット で DataFrame は、カタログのボラタイル レイヤー からエンコードされたテキストにアクセスする方法を示します。 この例では、 ROW オブジェクトには文字列としてフィールドデータが含まれています。

注 : 制限事項

テキストデータの読み取り中 value 、各行は既定で文字列列列を持つ各行になります。 したがって、テキストデータソースには value 、行ごとに 1 つの列しかありません。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of volatile layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition=in=(${partitionIds mkString ", "})")
  .load()

df.select("mt_partition", "value").show()

df.printSchema()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an volatile layer containing parquet-encoded SDII data
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(
            "mt_partition=in=("
                + partitionId1
                + ", "
                + partitionId2
                + ", "
                + partitionId3
                + ")")
        .load();

dataFrame.select("mt_partition", "value").show();

dataFrame.printSchema();

CSV でエンコードされたデータを読み取ります

次のスニペット で DataFrame は、カタログのボラタイル レイヤー から CSV でエンコードされたにアクセスする方法を示します。 この例では、 csv 行に列 field1 が整数、 field2 が文字列として含まれています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of volatile layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition=in=(${partitionIds mkString ", "})")
  .load()

df.select("mt_partition", "field1").where("field1 > 0").show()

df.printSchema()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an volatile layer containing parquet-encoded SDII data
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(
            "mt_partition=in=("
                + partitionId1
                + ", "
                + partitionId2
                + ", "
                + partitionId3
                + ")")
        .load();

dataFrame.select("mt_partition", "field1").where("field1 > 0").show();

dataFrame.printSchema();

その他の形式を参照してください

次のスニペットでは、カタログのボラタイル レイヤーから任意の形式のデータにアクセスする方法を示します。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.{Encoders, SparkSession}
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 300000)
val dataFrame = reader.load()

dataFrame.printSchema()

val dataFrameStringContent = dataFrame
  .map(_.getAs[Array[Byte]]("data").map(_.toChar).mkString)(Encoders.STRING)
  .first()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(String.format("mt_partition==%s", tileId))
        .load();

dataFrame.printSchema();

List<String> dataFrameStringContent =
    dataFrame
        .map(
            (MapFunction<Row, String>) row -> new String(row.<byte[]>getAs("data")),
            Encoders.STRING())
        .collectAsList();
注 :

raw 形式は application/octet-stream 、レイヤー設定内を参照します。生のレイヤー設定と混同しないでください。

デフォルトでは、が定義 .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")されています。

インデックスレイヤーおよびバージョンレイヤーとは対照的に、揮発性パーティションは期限切れになる可能性があります。 揮発性データの有効期限が切れるとメタデータが保持されますが、 Spark Connector は、ロードメソッドを呼び出すときに、期限切れのすべてのパーティションをスキップします。

RSQL の詳細 については、 RSQL を参照してください。

」に一致する結果は 件です

    」に一致する結果はありません