バージョン付レイヤーデータを読み取ります
データ クライアント ライブラリには、 Spark を含むすべてのサポートされているレイヤータイプのデータを含むデータフレームを作成するためのカスタムバージョン付レイヤー DataFrameReader というクラスLayerDataFrameReader
が用意されています。
DataFrameReader でサポートされているすべての形式 も、LayerDataFrameReader
でサポート されています。 また、 Apache Avro 、 Apache Parquet 、 Protobuf 、および生バイト配列( octet-stream )などの形式もサポートしています。
読み取りプロセス
読み取り操作は、次の手順で動作します。
- Spark Connector は、サーバーとの最初の通信から開始し、役立つ情報を取得します。 たとえば、レイヤータイプ、レイヤースキーマ、レイヤーエンコード形式などです
- レイヤー内のパーティションは、指定されたフィルタークエリを使用してフィルター処理されます。 クエリが指定されていない場合、値 "mt_version==latest" がデフォルトで使用され、最新バージョンのすべてのパーティションが一致することを意味します。
- この段階では、レイヤーの形式を把握しています。 Spark 対応のファイル形式を作成できます。パーティション データでは、行 ( レコード ) のイテレータを作成できます。
- レイヤータイプおよびパーティション メタデータに応じて、一部の暗黙的な列が各行に追加されます。
- 結果の行が Spark フレームワークに渡さ
DataFrame
れ、ファイナライズされた行が返されます。
データフレーム列
パーティション データから派生するユーザー定義の列のほかに、 Spark Connector には、データのパーティション分割情報およびパーティションペイロード属性を表すために使用される追加の列があります。
データ列
ユーザー定義の列に対応し、パーティション データから派生します。
レイヤーのパーティション分割列
列名 | データ型 | 意味 |
mt_partition | 文字列 | パーティション ID |
mt_version | 長い | レイヤーのバージョン |
パーティションペイロード属性の列
列名 | データ型 | 意味 |
mt_metadata | 地図 [ 文字列 , 文字列 ] | パーティションのメタデータ |
mt_timestamp | 長い | 作成のタイムスタンプ (UTC) |
mt_checksum | 文字列 | ペイロードのチェックサム |
mt_crc | 文字列 | ペイロードの CRC |
mt_dataSize | 長い | ペイロードのサイズ |
mt_compressedDataSize | 長い | ペイロードの圧縮サイズ |
プロジェクトの依存関係
HERE platform Spark コネクタを使用してバージョン付レイヤーからデータを読み取るアプリケーションを作成する場合 は、「 Spark コネクタの依存関係」の章の説明に従って、必要な依存関係をプロジェクトに追加してください。
寄木細工のエンコードされたデータを読み取ります
次のスニペットで DataFrame
は、カタログのバージョン付レイヤーから寄木細工でエンコードされたにアクセスする方法を示します。 Parquet スキーマはデータにバンドルされていることに注意してください。 したがって、明示的に形式を指定する必要はありません。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
val reader = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition==$tileId")
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 300000)
val df = reader.load()
df.printSchema()
val messagesWithAtLeastOneSignRecognition = df
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0")
val count = messagesWithAtLeastOneSignRecognition.count()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query(String.format("mt_partition==%s", tileId))
.load();
dataFrame.printSchema();
Dataset<Row> messagesWithAtLeastOneSignRecognition =
dataFrame
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0");
Long count = messagesWithAtLeastOneSignRecognition.count();
Avro でエンコードされたデータを読み取ります
次のスニペットで DataFrame
は、カタログのバージョン付レイヤーからエンコードされたアブロにアクセスする方法を示します。 Avro スキーマはデータにバンドルされる予定です。 したがって、明示的に形式を指定する必要はありません。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.{DataFrame, SparkSession}
val reader = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition==$tileId")
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 300000)
val df: DataFrame = reader.load()
df.printSchema()
val messagesWithAtLeastOneSignRecognition = df
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0")
val count = messagesWithAtLeastOneSignRecognition.count()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query(String.format("mt_partition==%s", tileId))
.load();
dataFrame.printSchema();
Dataset<Row> messagesWithAtLeastOneSignRecognition =
dataFrame
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0");
Long count = messagesWithAtLeastOneSignRecognition.count();
Protobuf でエンコードされたデータを読み取ります
次のスニペットは DataFrame
、カタログのバージョン付レイヤーからエンコードされた Protobuf にアクセスする方法を示しています。 Protobuf スキーマはレイヤー設定から参照されることが予想されます。 したがって、明示的に形式を指定する必要はありません。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val dataFrame = sparkSession
.readLayer(catalogHrn, layerId)
.query("mt_partition=in=(DEU, CUB)")
.load()
val tileIdLists = dataFrame
.select(col("partition_name"), explode(col("tile_id")).as("tile"))
tileIdLists.show()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query("mt_partition=in=(DEU, CUB)")
.load();
Dataset<Row> tileIdLists =
dataFrame.select(
dataFrame.col("partition_name"),
org.apache.spark.sql.functions.explode(dataFrame.col("tile_id")).as("tile"));
tileIdLists.show();
レイヤーから Protobuf データを読み取るには、レイヤー設定でスキーマを指定し、アーティファクトサービスで利用できるようにする必要があります。 さらに、スキーマ ds
にはバリアントが必要です。 スキーマの管理方法の詳細について は、「 Archetypes 開発者ガイド」を参照してください。
JSON でエンコードされたデータを読み取ります
次のスニペット で DataFrame
は、カタログのボラタイル レイヤー からエンコードされた JSON にアクセスする方法を示します。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
val result = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition=in=($partitionId, $anotherPartitionId)")
.load()
.map { row =>
val Seq(tileId, index) = Seq("tileId", "index").map(row.getAs[Long])
(tileId, index)
}(Encoders.tuple[Long, Long](Encoders.scalaLong, Encoders.scalaLong))
.collect()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
List<Tuple2<Long, Long>> groupedData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query("mt_partition=in=(" + partitionId + ", " + anotherPartitionId + ")")
.load()
.map(
(MapFunction<Row, Tuple2<Long, Long>>)
row -> {
Long tileId = row.getAs("tileId");
Long index = row.getAs("index");
return new Tuple2<>(tileId, index);
},
Encoders.tuple(Encoders.LONG(), Encoders.LONG()))
.collectAsList();
テキストエンコードされたデータを読み取ります
次のスニペット で DataFrame
は、カタログのボラタイル レイヤー からエンコードされたテキストにアクセスする方法を示します。 この例では、 ROW オブジェクトには文字列としてフィールドデータが含まれています。
注 : 制限事項
テキストデータの読み取り中 value
、各行は既定で文字列列列を持つ各行になります。 したがって、テキストデータソースには value
、行ごとに 1 つの列しかありません。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
val result = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition=in=($partitionId, $anotherPartitionId)")
.load()
.map { row =>
val Seq(tileId, index) = row.getAs[String]("value").split("_").map(_.toInt).toSeq
(tileId, index)
}(Encoders.tuple[Int, Int](Encoders.scalaInt, Encoders.scalaInt))
.collect()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
List<Tuple2<Integer, Integer>> groupedData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query("mt_partition=in=(" + partitionId + ", " + anotherPartitionId + ")")
.load()
.map(
(MapFunction<Row, Tuple2<Integer, Integer>>)
row -> {
String value = row.getAs("value");
String[] result = value.split("_");
Integer tileId = Integer.valueOf(result[0]);
Integer index = Integer.valueOf(result[1]);
return new Tuple2<>(tileId, index);
},
Encoders.tuple(Encoders.INT(), Encoders.INT()))
.collectAsList();
CSV でエンコードされたデータを読み取ります
次のスニペット で DataFrame
は、カタログのボラタイル レイヤー から CSV でエンコードされたにアクセスする方法を示します。 この例では、 csv 行に列 field1 が整数、 field2 が文字列として含まれています。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
val result = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition=in=($partitionId, $anotherPartitionId)")
.load()
.map { row =>
val Seq(tileId, index) = Seq("tileId", "index").map(row.getAs[String])
(tileId, index)
}(Encoders.tuple[String, String](Encoders.STRING, Encoders.STRING))
.collect()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
List<Tuple2<String, String>> groupedData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query("mt_partition=in=(" + partitionId + ", " + anotherPartitionId + ")")
.load()
.map(
(MapFunction<Row, Tuple2<String, String>>)
row -> {
String tileId = row.getAs("tileId");
String index = row.getAs("index");
return new Tuple2<>(tileId, index);
},
Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.collectAsList();
次のスニペットでは、カタログのバージョン付レイヤーから任意の形式のデータにアクセスする方法を示します。
import org.apache.spark.sql.{Encoders, Row, SparkSession}
val reader = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition==$tileId")
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 300000)
val dataFrame = reader.load()
dataFrame.printSchema()
val dataFrameStringContent = dataFrame
.map[String]((r: Row) => r.getAs[String]("value"))(Encoders.STRING)
.collectAsList()
.asScala
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query(String.format("mt_partition==%s", tileId))
.load();
dataFrame.printSchema();
List<String> dataFrameStringContent =
dataFrame
.map((MapFunction<Row, String>) row -> row.getAs("value"), Encoders.STRING())
.collectAsList();
既知の問題 :
-
DataFrame
パーティション ID やレイヤーバージョンなどのパーティション分割情報を表す列が含まれている必要があります。 現在、これらの列はありません。
注 :
raw
形式は application/octet-stream
、レイヤー設定内を参照します。生のレイヤー設定と混同しないでください。
RSQL の詳細 については、 RSQL を参照してください。