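Read and write data using the Spark Connector
Objective: Understand how to use the Spark Connector to read and write data from and to different layers and data formats in a catalog.
Complexity: Beginner
Time to complete: 30 min
Prerequisites: Organize your work in projects
Source code: Download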
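The examples in this tutorial show how to use the Spark Connector provided by the Data Client Library. The connector supports batch-processing workloads with Spark, so you can use all of Spark's standard APIs and features to read, write, and delete data. For stream-processing workloads, use the Flink Connector, which is also provided, instead.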
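The main part of the tutorial covers the following:
- Reading, printing, and collecting Avro data from an index layer (the Spark Connector can infer the format, so you do not need to specify it).
- Reading Protobuf data from a versioned layer, transforming it, and writing it in Parquet format to a volatile layer.
- Reading data in a custom format from a versioned layer, filtering and augmenting it, and writing it to another versioned layer.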
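As a preparatory step, you first create the data sources: catalogs containing the appropriate layers, so that the data used in the main part of this tutorial is in place. The dataset is taken from the HERE Map Content catalog and contains information about the topology and geometry of the road network.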
Set up the Maven project
Create the following folder structure for the project:
spark-connector
└── src
    └── main
        ├── java
        ├── resources
        └── scala
You can do this with the following single bash command:
mkdir -p spark-connector/src/main/{java,resources,scala}
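Create the input and output catalogs
Create a file named pipeline-config.conf with the following content, replacing {{YOUR_CATALOG_HRN}} and {{YOUR_OUTPUT_CATALOG_HRN}} with the HERE Resource Names (HRNs) of the catalogs you created in "Organize your work in projects".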
pipeline.config {
  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }
  input-catalogs {
    // In the China environment, use hrn:here-cn:data::olp-cn-here:here-map-content-china-2
    roadTopologySource { hrn = "hrn:here:data::olp-here:rib-2" }
    roadTopologyInput { hrn = "{{YOUR_CATALOG_HRN}}" }
  }
}
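The catalog references in pipeline-config.conf are HRNs. To illustrate their structure, the following sketch splits an HRN into its colon-separated fields, assuming the layout hrn:<partition>:<service>:<region>:<account>:<resource>, where the region may be empty (as in hrn:here:data::olp-here:rib-2) and the resource may itself contain colons. HrnParts is a hypothetical helper for illustration only; it is not the com.here.hrn.HRN class that the tutorial code uses.

```java
// Hypothetical helper: a minimal HRN splitter, NOT the com.here.hrn.HRN API.
// Assumes the layout hrn:<partition>:<service>:<region>:<account>:<resource>.
final class HrnParts {
    final String partition;
    final String service;
    final String region;   // may be empty, e.g. in hrn:here:data::olp-here:rib-2
    final String account;
    final String resource; // everything after the fifth colon

    private HrnParts(String[] fields) {
        partition = fields[1];
        service = fields[2];
        region = fields[3];
        account = fields[4];
        resource = fields[5];
    }

    static HrnParts parse(String hrn) {
        // A limit of 6 keeps any further colons inside the resource field
        // and preserves empty fields such as the region.
        String[] fields = hrn.split(":", 6);
        if (fields.length != 6 || !fields[0].equals("hrn")) {
            throw new IllegalArgumentException("Not an HRN: " + hrn);
        }
        return new HrnParts(fields);
    }

    public static void main(String[] args) {
        HrnParts p = parse("hrn:here:data::olp-here:rib-2");
        System.out.println(p.service + " / " + p.account + " / " + p.resource);
    }
}
```

The same parser also accepts the schema HRN used later in the input catalog configuration, whose resource part contains additional colons.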
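This tutorial uses a public catalog (the HERE Map Content catalog). Before you can use a catalog in a project, you must link it to the project. To do this, replace {{YOUR_PROJECT_HRN}} with the HRN of the project you created in the "Organize your work in projects" tutorial and run the following command: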
olp project resource link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:rib-2
If the command succeeds, the CLI returns the following message:
Project resource hrn:here:data::olp-here:rib-2 has been linked.
The POM for this example is identical to the POM in the Spark Path Matching tutorial.
Parent POM:
<parent>
  <groupId>com.here.platform</groupId>
  <artifactId>sdk-batch-bom_2.12</artifactId>
  <version>2.54.3</version>
  <relativePath/>
</parent>
Dependencies:
<dependencies>
  <dependency>
    <groupId>com.here.platform.pipeline</groupId>
    <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.platform.data.client</groupId>
    <artifactId>spark-support_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.hrn</groupId>
    <artifactId>hrn_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
  </dependency>
</dependencies>
You must create both the input and the output catalog. To do so, follow the steps described in "Organize your work in projects using the OLP Command Line Interface (CLI)".
You must use a unique identifier for each catalog, for example {{YOUR_USERNAME}}-spark-connector-input.
Note
All timestamps are in UTC milliseconds since the epoch (January 1, 1970, 00:00 UTC). If you run your application in a different time zone, make sure that timestamps are converted to UTC before you query or upload data. In Java or Scala, you can use the following function call to do this: Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
Create a file named spark-connector-input.json with the following content, replacing {{YOUR_INPUT_CATALOG_ID}} with the identifier you chose.
{
  "id": "spark-connector-input",
  "name": "Simulated road topology data archive (From tutorial) spark-connector-input",
  "summary": "Archive of simulated road topology data",
  "description": "Archive of simulated road topology data.",
  "tags": ["Tutorial", "Simulated"],
  "layers": [
    {
      "id": "index-layer-avro-data",
      "name": "index-layer-avro-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data.\n\nThis data is meant to be used for demonstration purposes and \"playing\" with data.",
      "tags": ["Tutorial", "Simulated"],
      "contentType": "application/x-avro-binary",
      "layerType": "index",
      "indexProperties": {
        "indexDefinitions": [
          {
            "name": "ingestion_timestamp",
            "duration": 600000,
            "type": "timewindow"
          },
          {
            "name": "tile_id",
            "type": "heretile",
            "zoomLevel": 12
          }
        ],
        "ttl": "unlimited"
      },
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    },
    {
      "id": "versioned-layer-protobuf-data",
      "name": "versioned-layer-protobuf-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data for versioned-layer-protobuf-data",
      "contentType": "application/x-protobuf",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      },
      "schema": {
        "hrn": "hrn:here:schema::olp-here:com.here.schema.rib:topology-geometry_v2:2.41.0"
      }
    },
    {
      "id": "versioned-layer-custom-data",
      "name": "versioned-layer-custom-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data for versioned-layer-custom-data",
      "contentType": "application/octet-stream",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    }
  ]
}
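The index layer above declares an ingestion_timestamp index of type timewindow with a duration of 600000 ms (10 minutes), and, as the note above says, timestamps are UTC epoch milliseconds. The sketch below shows how a local wall-clock time converts to UTC epoch milliseconds with java.time, and which 10-minute window that timestamp would fall into, assuming a timewindow index assigns each timestamp to the window starting at the largest multiple of the duration that does not exceed it. TimeWindowSketch and windowStart are hypothetical names. Instant.now().toEpochMilli() returns the same kind of value as the Calendar call in the note. Keep in mind that Spark's unix_timestamp() returns seconds, not milliseconds.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class TimeWindowSketch {
    // Duration of the ingestion_timestamp timewindow index in the layer config (10 minutes).
    static final long WINDOW_MS = 600_000L;

    // Converts a wall-clock time in the given zone to UTC epoch milliseconds.
    static long toEpochMillis(LocalDateTime local, ZoneId zone) {
        return local.atZone(zone).toInstant().toEpochMilli();
    }

    // Assumption: a timestamp belongs to the window that starts at the largest
    // multiple of WINDOW_MS not exceeding it (floorDiv also handles negatives).
    static long windowStart(long epochMillis) {
        return Math.floorDiv(epochMillis, WINDOW_MS) * WINDOW_MS;
    }

    public static void main(String[] args) {
        // 2023-05-01 12:34 in Berlin (CEST, UTC+2) is 10:34 UTC.
        long ts = toEpochMillis(LocalDateTime.of(2023, 5, 1, 12, 34), ZoneId.of("Europe/Berlin"));
        long utc = toEpochMillis(LocalDateTime.of(2023, 5, 1, 10, 34), ZoneOffset.UTC);
        System.out.println(ts == utc);       // true
        System.out.println(windowStart(ts)); // start of the 10:30 UTC window, in epoch ms
    }
}
```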
For the output catalog, create a file named, for example, spark-connector-output.json with the following content:
{
  "id": "spark-connector-output",
  "name": "Simulated data archive (From tutorial) test-spark-connector-output",
  "summary": "Archive of simulated road topology data",
  "description": "Archive of simulated road topology data.",
  "tags": ["Tutorial", "Simulated"],
  "layers": [
    {
      "id": "volatile-layer-parquet-data",
      "name": "volatile-layer-parquet-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data.",
      "contentType": "application/x-parquet",
      "layerType": "volatile",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    },
    {
      "id": "versioned-layer-custom-data",
      "name": "versioned-layer-custom-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data for versioned-layer-custom-data",
      "contentType": "application/octet-stream",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    }
  ]
}
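In the following command, replace {{YOUR_CATALOG_ID}} with your own identifier and {{YOUR_PROJECT_HRN}} with the HRN of the project from "Organize your work in projects", then run the command. Run it again with spark-connector-output.json and a different catalog identifier to create the output catalog.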
olp catalog create {{YOUR_CATALOG_ID}} \
"Simulated road topology data input from tutorial ({{YOUR_USERNAME}})" \
--config spark-connector-input.json \
--scope {{YOUR_PROJECT_HRN}}
Note
If your realm requires billing tags, update the configuration file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.
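Set up the data sources
In this step, you implement an application that sets up the data sources used in the main part of this tutorial. The Spark Connector reads data from a HERE source containing road topology and geometry data and writes it to the catalog that is used as input in the next stage.
To do this, the application reads data from the dataset containing road topology and geometry data and stores it in three data sources, each corresponding to one of the layer types and data formats defined above when you created the input catalog.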
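The setup application selects source partitions with a query string of the form mt_partition=in=(23618361, 23611420), and the main application uses the same form for partition-1, partition-2, and partition-3. If you build such strings from a list of partition IDs, a small helper avoids formatting mistakes. PartitionQuery is a hypothetical helper; it only produces the string passed to query(...), performs no escaping, and therefore assumes partition names without commas or parentheses.

```java
import java.util.List;

final class PartitionQuery {
    // Hypothetical helper: builds "mt_partition=in=(a, b, ...)" as used in the tutorial's query(...) calls.
    static String partitionsIn(List<String> partitionIds) {
        if (partitionIds.isEmpty()) {
            throw new IllegalArgumentException("At least one partition is required");
        }
        return "mt_partition=in=(" + String.join(", ", partitionIds) + ")";
    }

    public static void main(String[] args) {
        // Berlin and Munich partitions from the tutorial
        System.out.println(partitionsIn(List.of("23618361", "23611420")));
        // prints mt_partition=in=(23618361, 23611420)
    }
}
```

The output matches String.format("mt_partition=in=(%s, %s)", berlinPartition, munichPartition) in the Java code, but works for any number of partitions.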
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Row, SparkSession}
import com.here.platform.data.client.spark.scaladsl.{
GroupedData,
VersionedDataConverter,
VersionedRowMetadata
}
import org.slf4j.LoggerFactory
object DataSourcesSetupScala extends App {
private val logger = LoggerFactory.getLogger(DataSourcesSetupScala.getClass)
private val pipelineContext = new PipelineContext
// Source for the archived road topology data
private val roadTopologyDataSourceCatalogHrn =
pipelineContext.config.inputCatalogs("roadTopologySource")
private val roadTopologyDataSourceLayerId = "topology-geometry"
// Data sources that will be the input for the SparkConnector application, and will be written to in the setup step
private val inputRoadTopologyCatalogHrn =
pipelineContext.config.inputCatalogs("roadTopologyInput")
private val inputIndexAvroLayerId = "index-layer-avro-data"
private val inputVersionedProtobufLayerId = "versioned-layer-protobuf-data"
private val inputversionedLayerCustomId = "versioned-layer-custom-data"
val sparkSession: SparkSession =
SparkSession.builder().appName("DataSourcesSetupScalaPipeline").getOrCreate()
import sparkSession.implicits._
// Partitions that will be retrieved from the archived road topology dataset, corresponding to the cities of Berlin and Munich
// For China, please use partitions 23543287 and 23551652
private val berlinPartition = "23618361"
private val munichPartition = "23611420"
// Reads the data from the roadTopologySource catalog
val sourceRoadTopologyData = sparkSession
.readLayer(roadTopologyDataSourceCatalogHrn, roadTopologyDataSourceLayerId)
.query(s"mt_partition=in=($berlinPartition, $munichPartition)")
.load()
// Performs some transformations on the dataset
val transformedData = sourceRoadTopologyData
.select(col("partition_name"), explode(col("segment")) as "road")
.select(col("partition_name"),
col("road.identifier") as "identifier",
explode(col("road.geometry.point")) as "points",
col("road.length") as "length")
.select("partition_name", "identifier", "points.*", "length")
// For the index layer, all index attributes (in this case tile_id and ingestion_timestamp) have to be present in the
// result as DF row columns, and prefixed with "idx_"
val indexLayerData = transformedData
.withColumn("idx_tile_id", col("partition_name"))
.withColumn("idx_ingestion_timestamp", unix_timestamp())
/*
The resulting schema to be written is the following:
root
|-- partition_name: string (nullable = true)
|-- identifier: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- z_level: integer (nullable = true)
|-- elevation: double (nullable = true)
|-- length: double (nullable = true)
|-- idx_tile_id: string (nullable = true)
|-- idx_ingestion_timestamp: long (nullable = true)
*/
indexLayerData
.writeLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
.save()
// For the versioned layer, both catalog version and partitionId attributes have to be present in the result as DF
// row columns, and there needs to be a metadata column (mt_partition) containing the partition name
// The data saved in protobuf format needs to have its schema specified in the respective layer definition.
val versionedLayerData = sourceRoadTopologyData
.withColumn("mt_partition", col("partition_name"))
/*
The resulting schema to be written is the following:
root
|-- partition_name: string (nullable = true)
|-- node: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- identifier: string (nullable = true)
| | |-- segment_ref: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- partition_name: string (nullable = true)
| | | | |-- identifier: string (nullable = true)
| | |-- geometry: struct (nullable = true)
| | | |-- latitude: double (nullable = true)
| | | |-- longitude: double (nullable = true)
| | | |-- z_level: integer (nullable = true)
| | | |-- elevation: double (nullable = true)
|-- segment: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- identifier: string (nullable = true)
| | |-- start_node_ref: struct (nullable = true)
| | | |-- partition_name: string (nullable = true)
| | | |-- identifier: string (nullable = true)
| | |-- end_node_ref: struct (nullable = true)
| | | |-- partition_name: string (nullable = true)
| | | |-- identifier: string (nullable = true)
| | |-- geometry: struct (nullable = true)
| | | |-- point: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- latitude: double (nullable = true)
| | | | | |-- longitude: double (nullable = true)
| | | | | |-- z_level: integer (nullable = true)
| | | | | |-- elevation: double (nullable = true)
| | |-- length: double (nullable = true)
|-- mt_partition: string (nullable = true)
*/
versionedLayerData
.writeLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
.save()
// For the custom layer, there needs to be a metadata column (mt_partition) containing the partition name, and another
// column (data) containing the serialized data
val customRawLayerId = transformedData
.select(col("length"))
// Plain string interpolation: lit() builds Spark Column expressions and is not needed inside map
.map(v => s"road-partition-:${v.mkString}".getBytes)
.select(col("value") as "data")
// A new column is added containing one partition id for each line in the input data, in the format "partition-{{number}}"
.withColumn("mt_partition", concat(lit("partition-"), monotonically_increasing_id))
/*
The resulting schema to be written is the following:
root
|-- data: binary (nullable = true)
|-- mt_partition: string (nullable = false)
*/
customRawLayerId
.writeLayer(inputRoadTopologyCatalogHrn, inputversionedLayerCustomId)
.withDataConverter(new VersionedDataConverter {
override def serializeGroup(rowMetadata: VersionedRowMetadata,
rows: Iterator[Row]): GroupedData[VersionedRowMetadata] = {
val bytes = rows.next().getAs[Array[Byte]]("data")
GroupedData(rowMetadata, bytes)
}
})
.save()
logger.info(s"Finished setting up data sources in catalog: $inputRoadTopologyCatalogHrn")
sparkSession.stop()
}
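The custom layer's partition names are built with concat(lit("partition-"), monotonically_increasing_id). According to the Spark documentation, these IDs are unique and increasing but not consecutive: the current implementation puts the Spark partition index in the upper 31 bits and the record number within that partition in the lower 33 bits. The sketch below reproduces that documented formula outside Spark to show which names to expect; it is an illustration, not Spark code, and MonotonicIdSketch is a hypothetical name. Only rows in the first Spark partition get small numbers such as partition-1, so a query for partition-1, partition-2, and partition-3 may match fewer partitions than expected when the data is spread over several Spark partitions.

```java
final class MonotonicIdSketch {
    // Mirrors the layout described in the Spark docs for monotonically_increasing_id():
    // Spark partition index in the upper 31 bits, record number in the lower 33 bits.
    static long monotonicId(int sparkPartitionIndex, long recordIndexInPartition) {
        return ((long) sparkPartitionIndex << 33) + recordIndexInPartition;
    }

    // Same naming scheme as the tutorial: "partition-" followed by the generated id.
    static String layerPartitionName(int sparkPartitionIndex, long recordIndexInPartition) {
        return "partition-" + monotonicId(sparkPartitionIndex, recordIndexInPartition);
    }

    public static void main(String[] args) {
        System.out.println(layerPartitionName(0, 1)); // partition-1
        System.out.println(layerPartitionName(1, 0)); // partition-8589934592
    }
}
```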
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import java.util.Iterator;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataSourcesSetup {
private static PipelineContext pipelineContext = new PipelineContext();
// Source for the archived road topology data
private static HRN roadTopologyDataSourceCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("roadTopologySource");
private static String roadTopologyDataSourceLayerId = "topology-geometry";
// Data sources that will be the input for the SparkConnector application, and will be written to
// in the setup step
private static HRN inputRoadTopologyCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("roadTopologyInput");
private static String inputIndexAvroLayerId = "index-layer-avro-data";
private static String inputVersionedProtobufLayerId = "versioned-layer-protobuf-data";
private static String inputversionedLayerCustomId = "versioned-layer-custom-data";
private static SparkSession sparkSession =
SparkSession.builder().appName("DataSourcesSetupPipeline").getOrCreate();
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(DataSourcesSetup.class);
// Partitions that will be retrieved from the archived road topology dataset, corresponding to the
// cities of Berlin and Munich
// For China, please use partitions 23543287 and 23551652
String berlinPartition = "23618361";
String munichPartition = "23611420";
// Reads the data from the roadTopologySource catalog
Dataset<Row> sourceRoadTopologyData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(roadTopologyDataSourceCatalogHrn, roadTopologyDataSourceLayerId)
.query(String.format("mt_partition=in=(%s, %s)", berlinPartition, munichPartition))
.load();
// Performs some transformations on the dataset
Dataset<Row> transformedData =
sourceRoadTopologyData
.select(col("partition_name"), explode(col("segment")).as("road"))
.select(
col("partition_name"),
col("road.identifier").as("identifier"),
explode(col("road.geometry.point")),
col("road.length").as("length"))
.select("partition_name", "identifier", "col.*", "length");
// For the index layer, all index attributes (in this case tile_id and ingestion_timestamp) have
// to be present in the result as DF row columns, and prefixed with "idx_"
Dataset<Row> indexLayerData =
transformedData
.withColumn("idx_tile_id", col("partition_name"))
.withColumn("idx_ingestion_timestamp", unix_timestamp());
JavaLayerDataFrameWriter.create(indexLayerData)
.writeLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
.save();
/*
The resulting schema to be written is the following:
root
|-- partition_name: string (nullable = true)
|-- identifier: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- z_level: integer (nullable = true)
|-- elevation: double (nullable = true)
|-- length: double (nullable = true)
|-- idx_tile_id: string (nullable = true)
|-- idx_ingestion_timestamp: long (nullable = true)
*/
// For the versioned layer, both catalog version and partitionId attributes have to be present
// in the result as DF row columns, and there needs to be a metadata column (mt_partition)
// containing the partition name.
// The data saved in protobuf format needs to have its schema specified in the respective layer
// definition.
Dataset<Row> versionedLayerData =
sourceRoadTopologyData.withColumn("mt_partition", col("partition_name"));
JavaLayerDataFrameWriter.create(versionedLayerData)
.writeLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
.save();
/*
The resulting schema to be written is the following:
root
|-- partition_name: string (nullable = true)
|-- node: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- identifier: string (nullable = true)
| | |-- segment_ref: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- partition_name: string (nullable = true)
| | | | |-- identifier: string (nullable = true)
| | |-- geometry: struct (nullable = true)
| | | |-- latitude: double (nullable = true)
| | | |-- longitude: double (nullable = true)
| | | |-- z_level: integer (nullable = true)
| | | |-- elevation: double (nullable = true)
|-- segment: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- identifier: string (nullable = true)
| | |-- start_node_ref: struct (nullable = true)
| | | |-- partition_name: string (nullable = true)
| | | |-- identifier: string (nullable = true)
| | |-- end_node_ref: struct (nullable = true)
| | | |-- partition_name: string (nullable = true)
| | | |-- identifier: string (nullable = true)
| | |-- geometry: struct (nullable = true)
| | | |-- point: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- latitude: double (nullable = true)
| | | | | |-- longitude: double (nullable = true)
| | | | | |-- z_level: integer (nullable = true)
| | | | | |-- elevation: double (nullable = true)
| | |-- length: double (nullable = true)
|-- mt_partition: string (nullable = true)
*/
// For the custom layer, there needs to be a metadata column (mt_partition) containing the
// partition name, and another column (data) containing the serialized data
Dataset<Row> customRawLayerId =
transformedData
.select(col("length"))
.map((MapFunction<Row, byte[]>) row -> row.toString().getBytes(), Encoders.BINARY())
.select(col("value").as("data"))
// A new column is added containing one partition id for each line in the input data, in
// the format "partition-{{number}}"
.withColumn("mt_partition", concat(lit("partition-"), monotonically_increasing_id()));
JavaLayerDataFrameWriter.create(customRawLayerId)
.writeLayer(inputRoadTopologyCatalogHrn, inputversionedLayerCustomId)
.withDataConverter(
new VersionedDataConverter() {
@Override
public GroupedData<VersionedRowMetadata> serializeGroup(
VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
byte[] bytes = rows.next().getAs("data");
return new GroupedData<>(rowMetadata, bytes);
}
})
.save();
/*
The resulting schema to be written is the following:
root
|-- data: binary (nullable = true)
|-- mt_partition: string (nullable = false)
*/
logger.info(
String.format(
"Finished setting up data sources in catalog: %s", inputRoadTopologyCatalogHrn));
sparkSession.stop();
}
}
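The Berlin and Munich partition IDs (23618361 and 23611420) are HERE tile IDs at level 12, which matches the zoomLevel of the tile_id index in the input catalog. The sketch below decodes such an ID, assuming the HERE tiling scheme works as follows: a leading 1 bit at position 2 × level marks the level, the remaining bits interleave the column index (even bit positions) and the row index (odd bit positions), and a level-n tile spans 360 / 2^n degrees in each direction, counted from longitude -180 and latitude -90. Check this against the HERE tiling scheme documentation before relying on it; HereTileSketch is a hypothetical name.

```java
final class HereTileSketch {
    // Level = position of the highest (marker) bit divided by 2.
    static int level(long tileId) {
        return (63 - Long.numberOfLeadingZeros(tileId)) / 2;
    }

    // Column index: bits at even positions of the interleaved part.
    static long column(long tileId) {
        return deinterleave(tileId, 0, level(tileId));
    }

    // Row index: bits at odd positions of the interleaved part.
    static long row(long tileId) {
        return deinterleave(tileId, 1, level(tileId));
    }

    // Collects every second bit starting at `offset`, for `level` bits.
    private static long deinterleave(long tileId, int offset, int level) {
        long result = 0;
        for (int i = 0; i < level; i++) {
            result |= ((tileId >> (2 * i + offset)) & 1L) << i;
        }
        return result;
    }

    // South-west corner of the tile as {latitude, longitude} in degrees.
    static double[] southWest(long tileId) {
        double size = 360.0 / (1L << level(tileId));
        return new double[] {row(tileId) * size - 90.0, column(tileId) * size - 180.0};
    }

    public static void main(String[] args) {
        for (long id : new long[] {23618361L, 23611420L}) {
            double[] sw = southWest(id);
            System.out.printf("%d: level=%d, lat=%.3f, lon=%.3f%n", id, level(id), sw[0], sw[1]);
        }
    }
}
```

Under these assumptions, both tiles decode to roughly 0.088° squares near Berlin and Munich.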
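Implement the Spark Connector application
This application reads from the different layers and data formats of the data sources created in the previous stage. It performs some transformations on the resulting data and writes it to the output layers of the output catalog created earlier.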
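In Task 3 of the application, the custom layer stores raw bytes, and the Scala code decodes them with map(_.toChar), which widens each byte to one character. That round-trips ASCII text such as the numbers written by the setup step, but it would garble multi-byte UTF-8 text. The sketch below contrasts that byte-wise decoding with an explicit UTF-8 round trip, assuming you want UTF-8 payloads; CustomPayload is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

final class CustomPayload {
    // Encode text with an explicit charset instead of the platform default.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Similar to Scala's data.map(_.toChar).mkString: each signed byte is
    // widened to a char, so only ASCII survives intact.
    static String decodeBytewise(byte[] data) {
        StringBuilder sb = new StringBuilder(data.length);
        for (byte b : data) {
            sb.append((char) b);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String ascii = "123.4";
        String nonAscii = "Stra\u00dfe"; // "Strasse" with a sharp s, 2 bytes in UTF-8
        System.out.println(decodeBytewise(encode(ascii)).equals(ascii));       // true
        System.out.println(decode(encode(nonAscii)).equals(nonAscii));         // true
        System.out.println(decodeBytewise(encode(nonAscii)).equals(nonAscii)); // false
    }
}
```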
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
GroupedData,
VersionedDataConverter,
VersionedRowMetadata
}
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
import org.slf4j.LoggerFactory
object SparkConnectorScala extends App {
private val logger = LoggerFactory.getLogger(SparkConnectorScala.getClass)
private val pipelineContext = new PipelineContext
// Defines the input / output catalogs to be read / written to
private val inputRoadTopologyCatalogHrn =
pipelineContext.config.inputCatalogs("roadTopologyInput")
private val outputRoadTopologyCatalogHrn = pipelineContext.config.outputCatalog
// Defines the layers used to read / write data
private val inputIndexAvroLayerId = "index-layer-avro-data"
private val inputVersionedProtobufLayerId = "versioned-layer-protobuf-data"
private val versionedLayerCustomId = "versioned-layer-custom-data"
private val outputVolatileParquetLayerId = "volatile-layer-parquet-data"
val sparkSession: SparkSession =
SparkSession.builder().appName("SparkConnectorScalaPipeline").getOrCreate()
// This is needed for implicit conversions
import sparkSession.implicits._
// Values for partitions and tile id to be used in queries
// For China, please use partitions 23543287 and 23551652
private val berlinPartition = "23618361"
private val munichPartition = "23611420"
// Task 1: Reading data from index layer in the avro format, and printing schema and data
// This step uses LayerDataFrameReader to read from the index layer. In this example, two index attributes
// (tile_id and ingestion_timestamp) are present, and their respective columns are prefixed with "idx_"
val avroData: DataFrame = sparkSession
.readLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
// A query needs to be specified, selecting either at least one tile_id or an ingestion_timestamp range.
// In order to retrieve the full dataset, you can use "ingestion_timestamp > 0"
.query(s"tile_id==$munichPartition")
.load()
// Displaying the schema and the content of the DataFrame
avroData.printSchema()
avroData.show()
// Collecting the results into Array[Row]
avroData.collect()
/*
The resulting schema is the following:
root
|-- partition_name: string (nullable = true)
|-- identifier: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- z_level: integer (nullable = true)
|-- elevation: double (nullable = true)
|-- length: double (nullable = true)
|-- mt_dataHandle: string (nullable = true)
|-- mt_metadata: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- mt_timestamp: long (nullable = true)
|-- mt_checksum: string (nullable = true)
|-- mt_crc: string (nullable = true)
|-- mt_dataSize: long (nullable = true)
|-- idx_tile_id: long (nullable = true)
|-- idx_ingestion_timestamp: long (nullable = true)
*/
// Task 2: Read data from versioned/protobuf layer, filter it, transform it (select, filter, groupBy, agg, etc.)
// and write it to volatile/parquet layer.
// In order to retrieve data from the versioned layer, we need to specify the partition(s) in the query (mt_partition).
// Since this is protobuf data, the layer definition needs to include a field with the used schema
val protobufData: DataFrame = sparkSession
.readLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
.query(s"mt_partition=in=($berlinPartition, $munichPartition)")
.load()
// We can create a case class that will be used to encode the data in a type-safe way
case class RoadSegmentTopology(identifier: String,
partition_name: String,
points: Array[SegmentPoint],
length: Double)
case class SegmentPoint(latitude: Double, longitude: Double, z_level: Integer, elevation: Double)
// In this step, we read the data as RoadSegmentTopology records
val transformedData = protobufData
.select(col("partition_name"), explode(col("segment")) as "road")
.select(col("road.identifier") as "identifier",
col("partition_name"),
col("road.geometry.point") as "points",
col("road.length") as "length")
.as[RoadSegmentTopology]
// We will calculate the average segment length per partition, as well as standard deviation
// and total number of segments
val statsByPartition: DataFrame = transformedData
.groupBy("partition_name")
.agg(round(avg("length")) as "average_length",
round(stddev("length")) as "std_length",
count("points") as "number_of_points")
// In order to write to a volatile layer, we need the column mt_partition that specifies the partition
.withColumn("mt_partition", col("partition_name"))
/*
The resulting schema is the following:
root
|-- partition_name: string (nullable = true)
|-- average_length: double (nullable = true)
|-- std_length: double (nullable = true)
|-- number_of_points: long (nullable = false)
|-- mt_partition: string (nullable = true)
*/
statsByPartition
.writeLayer(outputRoadTopologyCatalogHrn, outputVolatileParquetLayerId)
.save()
// Task 3: Read data from versioned/custom layer (with custom format), filter and augment it, and write it to
// another versioned/custom layer.
val customReadData: Dataset[String] = sparkSession
.readLayer(inputRoadTopologyCatalogHrn, versionedLayerCustomId)
.query(s"mt_partition=in=(partition-1, partition-2, partition-3)")
.load()
.map { row =>
row.getAs[Array[Byte]]("data").map(_.toChar).mkString
}(Encoders.STRING)
val customData: DataFrame = customReadData
.map(_.getBytes)
.toDF
.select(col("value") as "data")
// A new column is added containing one partition id for each line in the input data, in the format "id-{{number}}"
.withColumn("mt_partition", concat(lit("id-"), monotonically_increasing_id))
/*
The resulting schema is the following:
root
|-- data: binary (nullable = true)
|-- mt_partition: string (nullable = false)
*/
customData
.writeLayer(outputRoadTopologyCatalogHrn, versionedLayerCustomId)
// A DataConverter needs to be provided for the write operation to succeed
.withDataConverter(new VersionedDataConverter {
override def serializeGroup(rowMetadata: VersionedRowMetadata,
rows: Iterator[Row]): GroupedData[VersionedRowMetadata] = {
val bytes = rows.next().getAs[Array[Byte]]("data")
GroupedData(rowMetadata, bytes)
}
})
.save()
logger.info(s"Finished writing data to catalog: $outputRoadTopologyCatalogHrn")
sparkSession.stop()
}
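Task 2 groups the road segments by partition_name and computes round(avg("length")), round(stddev("length")), and count("points"). In Spark SQL, stddev is the sample standard deviation, round rounds half up (here to zero decimal places), and count on a column counts only non-null values. The plain-Java sketch below applies the same arithmetic to an in-memory list, which can help when checking the values written to the Parquet layer by hand. It is only an illustration, not how Spark or the connector computes the aggregates; the Segment and Stats classes are hypothetical stand-ins for the DataFrame rows.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class SegmentStatsSketch {
    // Hypothetical stand-in for one row of transformedData (one road segment).
    static final class Segment {
        final String partitionName;
        final double length;
        final boolean hasPoints; // false models a null "points" column
        Segment(String partitionName, double length, boolean hasPoints) {
            this.partitionName = partitionName;
            this.length = length;
            this.hasPoints = hasPoints;
        }
    }

    static final class Stats {
        final double averageLength; // round(avg(length))
        final Double stdLength;     // round(stddev(length)); null for fewer than 2 rows
        final long numberOfPoints;  // count(points): rows with a non-null points column
        Stats(double averageLength, Double stdLength, long numberOfPoints) {
            this.averageLength = averageLength;
            this.stdLength = stdLength;
            this.numberOfPoints = numberOfPoints;
        }
    }

    // HALF_UP rounding to 0 decimal places, like Spark's round(x).
    static double roundHalfUp(double value) {
        return BigDecimal.valueOf(value).setScale(0, RoundingMode.HALF_UP).doubleValue();
    }

    static Stats stats(List<Segment> group) {
        int n = group.size();
        double mean = group.stream().mapToDouble(s -> s.length).sum() / n;
        Double std = null; // Spark 3.1+ returns null for the sample stddev of a single row
        if (n > 1) {
            double sumSq = group.stream().mapToDouble(s -> (s.length - mean) * (s.length - mean)).sum();
            std = roundHalfUp(Math.sqrt(sumSq / (n - 1))); // sample standard deviation
        }
        long withPoints = group.stream().filter(s -> s.hasPoints).count();
        return new Stats(roundHalfUp(mean), std, withPoints);
    }

    // Equivalent of groupBy("partition_name").agg(...), keyed by partition name.
    static Map<String, Stats> byPartition(List<Segment> segments) {
        Map<String, List<Segment>> groups = segments.stream()
            .collect(Collectors.groupingBy(s -> s.partitionName, TreeMap::new, Collectors.toList()));
        Map<String, Stats> result = new TreeMap<>();
        groups.forEach((name, group) -> result.put(name, stats(group)));
        return result;
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
            new Segment("A", 10.0, true), new Segment("A", 20.0, true), new Segment("A", 30.0, true),
            new Segment("B", 5.0, true), new Segment("B", 6.0, false));
        byPartition(segments).forEach((name, s) -> System.out.println(
            name + ": avg=" + s.averageLength + ", std=" + s.stdLength + ", n=" + s.numberOfPoints));
    }
}
```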
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SparkConnector {
private static final PipelineContext pipelineContext = new PipelineContext();
// Defines the input / output catalogs to be read / written to
private static final HRN inputRoadTopologyCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("roadTopologyInput");
private static final HRN outputRoadTopologyCatalogHrn =
pipelineContext.getConfig().getOutputCatalog();
// Defines the layers used to read / write data
private static final String INPUT_INDEX_AVRO_LAYER_ID = "index-layer-avro-data";
private static final String INPUT_VERSIONED_PROTOBUF_LAYER_ID = "versioned-layer-protobuf-data";
private static final String VERSIONED_LAYER_CUSTOM_ID = "versioned-layer-custom-data";
private static final String OUTPUT_VOLATILE_PARQUET_LAYER_ID = "volatile-layer-parquet-data";
public static void main(String[] args) {
SparkSession sparkSession =
SparkSession.builder().appName("SparkConnectorPipeline").getOrCreate();
Logger logger = LoggerFactory.getLogger(SparkConnector.class);
// Values for partitions and tile id to be used in queries
// For China, please use partitions 23543287 and 23551652
String berlinPartition = "23618361";
String munichPartition = "23611420";
// Task 1: Reading data from index layer in the avro format, and printing schema and data
// This step uses LayerDataFrameReader to read from the index layer. In this example, two index
// attributes (tile_id and ingestion_timestamp) are present, and their respective columns are
// prefixed with "idx_"
Dataset<Row> avroData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(inputRoadTopologyCatalogHrn, INPUT_INDEX_AVRO_LAYER_ID)
// A query needs to be specified, selecting either at least one tile_id or an
// ingestion_timestamp range.
// In order to retrieve the full dataset, you can use "ingestion_timestamp > 0"
.query(String.format("tile_id==%s", munichPartition))
.load();
// Displaying the schema and the content of the DataFrame
avroData.printSchema();
avroData.show();
// Collecting the results into Array[Row]
avroData.collect();
/*
The resulting schema is the following:
root
|-- partition_name: string (nullable = true)
|-- identifier: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- z_level: integer (nullable = true)
|-- elevation: double (nullable = true)
|-- length: double (nullable = true)
|-- mt_dataHandle: string (nullable = true)
|-- mt_metadata: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- mt_timestamp: long (nullable = true)
|-- mt_checksum: string (nullable = true)
|-- mt_crc: string (nullable = true)
|-- mt_dataSize: long (nullable = true)
|-- idx_tile_id: long (nullable = true)
|-- idx_ingestion_timestamp: long (nullable = true)
*/
// Task 2: Read data from the versioned Protobuf layer, transform it (select, groupBy, agg,
// etc.), and write it to the volatile Parquet layer.
// To retrieve data from a versioned layer, the partition(s) must be specified in the query
// (mt_partition).
// Since this is Protobuf data, the layer definition must reference the schema used to
// encode it.
Dataset<Row> protobufData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(inputRoadTopologyCatalogHrn, INPUT_VERSIONED_PROTOBUF_LAYER_ID)
.query(String.format("mt_partition=in=(%s, %s)", berlinPartition, munichPartition))
.load();
// We can create a class that will be used to encode the data in a type-safe way
Encoder<RoadSegmentTopology> roadSegmentTopologyEncoder =
Encoders.bean(RoadSegmentTopology.class);
// In this step, the segments are flattened and read as RoadSegmentTopology objects
Dataset<RoadSegmentTopology> transformedData =
protobufData
.select(col("partition_name"), explode(col("segment")).as("road"))
.select(
col("road.identifier").as("identifier"),
col("partition_name"),
col("road.geometry.point").as("points"),
col("road.length").as("length"))
.as(roadSegmentTopologyEncoder);
// Calculate the average segment length per partition, as well as the standard deviation
// and the total number of segments
Dataset<Row> statsByPartition =
transformedData
.groupBy("partition_name")
.agg(
round(avg("length")).as("average_length"),
round(stddev("length")).as("std_length"),
// count() skips nulls, so this counts the segments whose points column is not null
count("points").as("number_of_points"))
// To write to a volatile layer, an mt_partition column must specify the partition
.withColumn("mt_partition", col("partition_name"));
/*
The resulting schema is the following:
root
|-- partition_name: string (nullable = true)
|-- average_length: double (nullable = true)
|-- std_length: double (nullable = true)
|-- number_of_points: long (nullable = false)
|-- mt_partition: string (nullable = true)
*/
JavaLayerDataFrameWriter.create(statsByPartition)
.writeLayer(outputRoadTopologyCatalogHrn, OUTPUT_VOLATILE_PARQUET_LAYER_ID)
.save();
// Task 3: Read data from the versioned layer with a custom format, augment it, and write it
// to the versioned custom layer of the output catalog.
Dataset<Row> customData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(inputRoadTopologyCatalogHrn, VERSIONED_LAYER_CUSTOM_ID)
.query("mt_partition=in=(partition-1, partition-2, partition-3)")
.load();
// A new column is added containing one partition ID for each row in the input data, in the
// format "partition-{{number}}". monotonically_increasing_id() guarantees unique, increasing
// IDs, but not consecutive ones.
Dataset<Row> partitionedData =
customData.withColumn(
"mt_partition", concat(lit("partition-"), monotonically_increasing_id()));
/*
The resulting schema is the following:
root
|-- data: binary (nullable = true)
|-- mt_partition: string (nullable = false)
*/
JavaLayerDataFrameWriter.create(partitionedData)
.writeLayer(outputRoadTopologyCatalogHrn, VERSIONED_LAYER_CUSTOM_ID)
// A DataConverter needs to be provided for the write operation to succeed
.withDataConverter(
new VersionedDataConverter() {
@Override
public GroupedData<VersionedRowMetadata> serializeGroup(
VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
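// Only the first row of each group is serialized; this is lossless here because every
// partition ID is unique, so each group holds exactly one row
byte[] bytes = rows.next().getAs("data");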
return new GroupedData<>(rowMetadata, bytes);
}
})
.save();
logger.info("Finished writing results to catalog: {}", outputRoadTopologyCatalogHrn);
sparkSession.stop();
}
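// Encoders.bean() maps JavaBean properties only, so every field needs a public getter and setter
public static class RoadSegmentTopology implements Serializable {
private String identifier;
private String partition_name;
private SegmentPoint[] points;
private Double length;
public String getIdentifier() { return identifier; }
public void setIdentifier(String identifier) { this.identifier = identifier; }
public String getPartition_name() { return partition_name; }
public void setPartition_name(String partition_name) { this.partition_name = partition_name; }
public SegmentPoint[] getPoints() { return points; }
public void setPoints(SegmentPoint[] points) { this.points = points; }
public Double getLength() { return length; }
public void setLength(Double length) { this.length = length; }
}
// Field names and types follow the point columns shown in the Task 1 schema
public static class SegmentPoint implements Serializable {
private Double latitude;
private Double longitude;
private Integer z_level;
private Double elevation;
public Double getLatitude() { return latitude; }
public void setLatitude(Double latitude) { this.latitude = latitude; }
public Double getLongitude() { return longitude; }
public void setLongitude(Double longitude) { this.longitude = longitude; }
public Integer getZ_level() { return z_level; }
public void setZ_level(Integer z_level) { this.z_level = z_level; }
public Double getElevation() { return elevation; }
public void setElevation(Double elevation) { this.elevation = elevation; }
}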
}
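The three tasks pass filter strings to `query(...)`: an equality on an index attribute (`tile_id==...`), a range on `ingestion_timestamp`, and a membership test on `mt_partition` using the `=in=(...)` operator, which resembles RSQL/FIQL syntax. The following is a minimal sketch of how such strings could be assembled. `LayerQueries` and its methods are hypothetical helpers and are not part of the Data Client Library. The sketch only reproduces the string shapes used in this tutorial and does not validate the full query grammar.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of the Data Client Library) that builds the
// filter strings passed to JavaLayerDataFrameReader.query(...) in this tutorial.
public final class LayerQueries {

  private LayerQueries() {}

  // Equality on an index attribute, e.g. "tile_id==23611420" (Task 1).
  public static String indexEquals(String attribute, String value) {
    return attribute + "==" + value;
  }

  // Lower bound on an index attribute, e.g. "ingestion_timestamp > 0" (full dataset).
  public static String indexGreaterThan(String attribute, long value) {
    return attribute + " > " + value;
  }

  // Membership test on mt_partition, e.g. "mt_partition=in=(a, b)" (Tasks 2 and 3).
  public static String partitionsIn(List<String> partitions) {
    if (partitions.isEmpty()) {
      throw new IllegalArgumentException("at least one partition is required");
    }
    return "mt_partition=in=(" + String.join(", ", partitions) + ")";
  }

  public static void main(String[] args) {
    System.out.println(indexEquals("tile_id", "23611420"));
    System.out.println(indexGreaterThan("ingestion_timestamp", 0));
    System.out.println(partitionsIn(Arrays.asList("23618361", "23611420")));
  }
}
```

Rejecting an empty partition list is a design choice in this sketch. It prevents an `=in=()` clause that would select nothing.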
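Task 2 groups the flattened segments by `partition_name` and computes `round(avg("length"))`, `round(stddev("length"))` and `count("points")`. The sketch below reproduces that arithmetic in plain Java, with no Spark involved, so the semantics are easy to check:

- Spark's `stddev` is the sample standard deviation, which divides by n - 1.
- `count` therefore yields the number of segments per partition.

The classes `SegmentStats`, `Segment` and `Stats` are hypothetical stand-ins for the DataFrame rows. Returning `null` for partitions with a single segment is an assumption made here; check how your Spark version handles that case.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the Task 2 aggregation (no Spark involved).
public final class SegmentStats {

  // Stand-in for one row of transformedData: the partition and the segment length.
  public static final class Segment {
    final String partitionName;
    final double length;

    public Segment(String partitionName, double length) {
      this.partitionName = partitionName;
      this.length = length;
    }
  }

  // Stand-in for one row of statsByPartition.
  public static final class Stats {
    public final double averageLength;
    public final Double stdLength; // null when a partition has fewer than two segments
    public final long numberOfSegments;

    Stats(double averageLength, Double stdLength, long numberOfSegments) {
      this.averageLength = averageLength;
      this.stdLength = stdLength;
      this.numberOfSegments = numberOfSegments;
    }
  }

  private SegmentStats() {}

  // Groups segments by partition, then computes round(avg), round(sample stddev) and count.
  public static Map<String, Stats> byPartition(List<Segment> segments) {
    Map<String, List<Double>> lengthsByPartition = new LinkedHashMap<>();
    for (Segment s : segments) {
      lengthsByPartition.computeIfAbsent(s.partitionName, k -> new ArrayList<>()).add(s.length);
    }
    Map<String, Stats> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<Double>> entry : lengthsByPartition.entrySet()) {
      List<Double> lengths = entry.getValue();
      int n = lengths.size();
      double sum = 0;
      for (double v : lengths) {
        sum += v;
      }
      double mean = sum / n;
      Double std = null;
      if (n > 1) {
        double squares = 0;
        for (double v : lengths) {
          squares += (v - mean) * (v - mean);
        }
        // Sample standard deviation (divide by n - 1), like Spark's stddev()
        std = (double) Math.round(Math.sqrt(squares / (n - 1)));
      }
      // Math.round rounds half up, which matches Spark's round() for non-negative lengths
      result.put(entry.getKey(), new Stats(Math.round(mean), std, n));
    }
    return result;
  }
}
```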
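In Task 3, every input row receives its own `partition-{{number}}` ID, and the `VersionedDataConverter` serializes only `rows.next()` for each partition group. The plain-Java sketch below illustrates why that works.

- Because every ID is unique, each group contains exactly one row, so keeping the first row loses nothing.
- If several rows shared a partition, all but the first would be dropped.

`CustomPartitioning` and its methods are hypothetical. The sketch uses the record index instead of `monotonically_increasing_id()`. `serializeAll` is a hypothetical alternative shown only for contrast; it is not what the tutorial does.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of Task 3 (no Spark, no Data Client Library).
public final class CustomPartitioning {

  private CustomPartitioning() {}

  // Mimics concat(lit("partition-"), id): one unique partition name per input record.
  // The record index stands in for monotonically_increasing_id().
  public static Map<String, List<byte[]>> assignPartitions(List<byte[]> records) {
    Map<String, List<byte[]>> groups = new LinkedHashMap<>();
    long id = 0;
    for (byte[] record : records) {
      groups.computeIfAbsent("partition-" + id++, k -> new ArrayList<>()).add(record);
    }
    return groups;
  }

  // Same strategy as the tutorial's converter: keep only the first payload in the group.
  public static byte[] serializeFirst(Iterator<byte[]> rows) {
    return rows.next();
  }

  // Hypothetical alternative: concatenate every payload in the group.
  public static byte[] serializeAll(Iterator<byte[]> rows) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while (rows.hasNext()) {
      byte[] payload = rows.next();
      out.write(payload, 0, payload.length);
    }
    return out.toByteArray();
  }
}
```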
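Compile and run locally
To set up the data sources locally, run one of the following commands: the first runs the Scala implementation (DataSourcesSetupScala), the second the Java implementation (DataSourcesSetup).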
mvn compile exec:java \
-Dexec.mainClass=DataSourcesSetupScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=DataSourcesSetup \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
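For the main part of the tutorial, once the data sources are in place, run one of the following commands: SparkConnectorScala for Scala or SparkConnector for Java.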
mvn compile exec:java \
-Dexec.mainClass=SparkConnectorScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=SparkConnector \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
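Further information
For more details on the topics covered in this tutorial, see the following sources:
- For more information on the different layer types and layer configuration, see the Data API documentation.
- For details on interactively querying and manipulating catalogs, see the OLP CLI documentation and its Data section.
- For details on the Scala and Java APIs for accessing the services, see the Data Client Library Developer Guide.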