ボラタイル レイヤーデータを読み取ります
データ クライアント ライブラリには、 Spark を含むすべてのサポートされているレイヤータイプのデータを含むデータフレームを作成するためのカスタムボラタイル レイヤー DataFrameReader というクラスLayerDataFrameReader
が用意されています。
DataFrameReader でサポートされているすべての形式 も、LayerDataFrameReader
でサポート されています。 さらに、 Apache Avro 、 Apache Parquet 、 Protobuf 、および生のバイト配列 ( オクテットストリーム ) などの形式もあります。
読み取りプロセス
読み取り操作は、次の手順で動作します。
- Spark Connector は、サーバーとの最初の通信から開始し、役立つ情報を取得します。 たとえば、レイヤータイプ、レイヤースキーマ、レイヤーエンコード形式などです
- レイヤー内のパーティションは、指定されたフィルタークエリを使用してフィルター処理されます。 クエリーが指定されていない場合、デフォルトでは値 "mt_timestamp=g=0 " が使用され、すべてのパーティションが一致することを意味します。
- この段階では、レイヤーの形式を把握しています。 Spark 対応のファイル形式を作成できます。パーティション データでは、行 ( レコード ) のイテレータを作成できます。
- レイヤータイプおよびパーティション メタデータに応じて、一部の暗黙的な列が各行に追加されます。
- 結果の行が Spark フレームワークに渡さ
DataFrame
れ、ファイナライズされた行が返されます。
データフレーム列
パーティション データから派生するユーザー定義の列のほかに、 Spark Connector には、データのパーティション分割情報およびパーティションペイロード属性を表すために使用される追加の列があります。
データ列
ユーザー定義の列に対応し、パーティション データから派生します。
レイヤーのパーティション分割列
列名 | データ型 | 意味 |
mt_partition | 文字列 | パーティション ID |
パーティションペイロード属性の列
列名 | データ型 | 意味 |
mt_dataHandle | 文字列 | データのハンドラ ( 廃止 ) |
mt_metadata | 地図 [ 文字列 , 文字列 ] | パーティション of メタデータ ( 廃止 ) |
mt_timestamp | 長い | 作成のタイムスタンプ (UTC) |
mt_checksum | 文字列 | ペイロードのチェックサム |
mt_crc | 文字列 | ペイロードの CRC |
mt_dataSize | 長い | ペイロードのサイズ |
mt_compressedDataSize | 長い | ペイロードの圧縮サイズ |
プロジェクトの依存関係
HERE platform Spark コネクタを使用してボラタイル レイヤーからデータを読み取るアプリケーションを作成する場合 は、「 Spark コネクタの依存関係」の章の説明に従って、必要な依存関係をプロジェクトに追加してください。
寄木細工のエンコードされたデータを読み取ります
次のスニペットで DataFrame
は、カタログのボラタイル レイヤーから寄木細工でエンコードされたにアクセスする方法を示します。 Parquet スキーマはデータにバンドルされていることに注意してください。 したがって、明示的に形式を指定する必要はありません。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
val reader = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition==$tileId")
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 300000)
val df = reader.load()
df.printSchema()
val messagesWithAtLeastOneSignRecognition = df
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0")
val count = messagesWithAtLeastOneSignRecognition.count()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query(String.format("mt_partition==%s", tileId))
.load();
dataFrame.printSchema();
Dataset<Row> messagesWithAtLeastOneSignRecognition =
dataFrame
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0");
Long count = messagesWithAtLeastOneSignRecognition.count();
Avro でエンコードされたデータを読み取ります
次のスニペットで DataFrame
は、カタログのボラタイル レイヤーからエンコードされたアブロにアクセスする方法を示します。 Avro スキーマはデータにバンドルされる予定です。 したがって、明示的に形式を指定する必要はありません。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
val reader = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition==$tileId")
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 300000)
val df = reader.load()
df.printSchema()
val messagesWithAtLeastOneSignRecognition = df
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0")
val count = messagesWithAtLeastOneSignRecognition.count()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query(String.format("mt_partition==%s", tileId))
.load();
Dataset<Row> messagesWithAtLeastOneSignRecognition =
dataFrame
.select("pathEvents.signRecognition")
.where("size(pathEvents.signRecognition) > 0");
Long count = messagesWithAtLeastOneSignRecognition.count();
Protobuf でエンコードされたデータを読み取ります
次のスニペットは DataFrame
、カタログのボラタイル レイヤーからエンコードされた Protobuf にアクセスする方法を示しています。 Protobuf スキーマはレイヤー設定から参照されることが予想されます。 したがって、明示的に形式を指定する必要はありません。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val dataFrame = sparkSession
.readLayer(catalogHrn, layerId)
.query("mt_partition=in=(DEU, CUB)")
.load()
val tileIdLists = dataFrame
.select(col("partition_name"), explode(col("tile_id")).as("tile"))
tileIdLists.show()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query("mt_partition=in=(DEU, CUB)")
.load();
Dataset<Row> tileIdLists =
dataFrame.select(
dataFrame.col("partition_name"),
functions.explode(dataFrame.col("tile_id")).as("tile"));
tileIdLists.show();
レイヤーから Protobuf データを読み取るには、レイヤー設定でスキーマを指定し、アーティファクトサービスで利用できるようにする必要があります。 さらに、スキーマ ds
にはバリアントが必要です。 スキーマの管理方法の詳細について は、「 Archetypes 開発者ガイド」を参照してください。
JSON でエンコードされたデータを読み取ります
次のスニペット で DataFrame
は、カタログのボラタイル レイヤー からエンコードされた JSON にアクセスする方法を示します。 この例では、 JSON オブジェクトに整数としてプロパティ INTVAL が含まれ、文字列として STRVAL が含まれています。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
val df = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition=in=(${partitionIds mkString ", "})")
.load()
df.select("mt_partition", "intVal").where("intVal > 0").show()
df.printSchema()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query(
"mt_partition=in=("
+ partitionId1
+ ", "
+ partitionId2
+ ", "
+ partitionId3
+ ")")
.load();
dataFrame.select("mt_partition", "intVal").where("intVal > 0").show();
dataFrame.printSchema();
テキストエンコードされたデータを読み取ります
次のスニペット で DataFrame
は、カタログのボラタイル レイヤー からエンコードされたテキストにアクセスする方法を示します。 この例では、 ROW オブジェクトには文字列としてフィールドデータが含まれています。
注 : 制限事項
テキストデータの読み取り中 value
、各行は既定で文字列列列を持つ各行になります。 したがって、テキストデータソースには value
、行ごとに 1 つの列しかありません。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
val df = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition=in=(${partitionIds mkString ", "})")
.load()
df.select("mt_partition", "value").show()
df.printSchema()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query(
"mt_partition=in=("
+ partitionId1
+ ", "
+ partitionId2
+ ", "
+ partitionId3
+ ")")
.load();
dataFrame.select("mt_partition", "value").show();
dataFrame.printSchema();
CSV でエンコードされたデータを読み取ります
次のスニペット で DataFrame
は、カタログのボラタイル レイヤー から CSV でエンコードされたにアクセスする方法を示します。 この例では、 csv 行に列 field1 が整数、 field2 が文字列として含まれています。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
val df = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition=in=(${partitionIds mkString ", "})")
.load()
df.select("mt_partition", "field1").where("field1 > 0").show()
df.printSchema()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query(
"mt_partition=in=("
+ partitionId1
+ ", "
+ partitionId2
+ ", "
+ partitionId3
+ ")")
.load();
dataFrame.select("mt_partition", "field1").where("field1 > 0").show();
dataFrame.printSchema();
次のスニペットでは、カタログのボラタイル レイヤーから任意の形式のデータにアクセスする方法を示します。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.{Encoders, SparkSession}
val reader = sparkSession
.readLayer(catalogHrn, layerId)
.query(s"mt_partition==$tileId")
if (compressed)
reader.option("olp.connector.data-decompression-timeout", 300000)
val dataFrame = reader.load()
dataFrame.printSchema()
val dataFrameStringContent = dataFrame
.map(_.getAs[Array[Byte]]("data").map(_.toChar).mkString)(Encoders.STRING)
.first()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;
Dataset<Row> dataFrame =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(catalogHrn, layerId)
.query(String.format("mt_partition==%s", tileId))
.load();
dataFrame.printSchema();
List<String> dataFrameStringContent =
dataFrame
.map(
(MapFunction<Row, String>) row -> new String(row.<byte[]>getAs("data")),
Encoders.STRING())
.collectAsList();
注 :
raw
形式は application/octet-stream
、レイヤー設定内を参照します。生のレイヤー設定と混同しないでください。
デフォルトでは、が定義 .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
されています。
インデックスレイヤーおよびバージョンレイヤーとは対照的に、揮発性パーティションは期限切れになる可能性があります。 揮発性データの有効期限が切れるとメタデータが保持されますが、 Spark Connector は、ロードメソッドを呼び出すときに、期限切れのすべてのパーティションをスキップします。
RSQL の詳細 については、 RSQL を参照してください。