Spark Connector を使用してデータの読み取りと書き込みを行います
目的: Spark Connector を使用して、カタログ内のさまざまなレイヤーおよびデータ形式からデータを読み書きする方法について理解します。
複雑さ: 初心者向け
所要時間: 30 分
前提条件: プロジェクトでの作業を整理します
ソースコード: ダウンロード
このチュートリアルの例では 、データ クライアント ライブラリが提供する Spark コネクターの使用方法を示します。 これにより、 Spark とのバッチ処理ワークロードのやり取りがサポートされ、 Spark のすべての標準 API および機能を使用してデータの読み取り、書き込み、および削除を行うことができます。 ストリーム処理のワークロードには、代わりに付属の Flink Connector を使用する必要があります。
チュートリアルの主な部分では、次の使用方法について説明します。
- インデックス レイヤーからのデータの読み取り、印刷、および収集を Avro 形式で行います( Spark Connector は形式を推測でき、これを指定する必要はありません)。
- バージョン付レイヤーから Protobuf 形式でデータを読み取り、ボラタイル レイヤーに変換して寄木細工の形式で書き込みます
- バージョン付レイヤーからのデータの読み取り、フィルタリング、拡張、および書き込みをカスタム形式で行います。
準備のステップとして、データソースを作成する必要があります。このデータソースには、適切なレイヤーが含まれたそれぞれのカタログが含まれており、このチュートリアルのメインパートで使用するデータソースが適切に配置されているようにする必要があります。 使用されているデータセットは HERE Map Content カタログから取得され、道路ネットワークのトポロジおよびジオメトリに関する情報が含まれています。
Maven プロジェクトを設定します
プロジェクトの次のフォルダー構造を作成します。
spark-connector
└── src
└── main
├── java
└── resources
└── scala
この操作は、次の bash
1 つのコマンドで実行できます。
mkdir -p spark-connector/src/main/{java,resources,scala}
pipeline-config.conf
という名前のファイルを作成し、次の内容を入力します。「プロジェクトでの作業の整理」で作成したカタログの {{YOUR_CATALOG_HRN}}
および {{YOUR_OUTPUT_CATALOG_HRN}}
を HERE リソースネームに置き換えます。
pipeline.config {
output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }
input-catalogs {
//Please, use hrn:here-cn:data::olp-cn-here:here-map-content-china-2 on China Environment
roadTopologySource { hrn = "hrn:here:data::olp-here:rib-2" }
roadTopologyInput { hrn = "{{YOUR_CATALOG_HRN}}" }
}
}
このチュートリアルでは、パブリックカタログ( HERE Map Content カタログ)を使用します。 カタログは、まずプロジェクトにリンクして、プロジェクト内で使用する必要があります。 これを行うには、{{YOUR_PROJECT_HRN}}
を「プロジェクトでの作業の整理」チュートリアルで作成したプロジェクトの HERE リソースネームで置き換え、次のコマンドを実行します。
olp project resource link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:rib-2
コマンドが成功すると、 CLI は次のメッセージを返します。
Project resource hrn:here:data::olp-here:rib-2 has been linked.
この例の POM は、 Spark チュートリアルのパスマッチングの POM と同じです。
親 POM :
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.54.3</version>
<relativePath/>
</parent>
依存関係 :
<dependencies>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>spark-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.hrn</groupId>
<artifactId>hrn_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
入力カタログと出力カタログの両方を作成する必要があります。 これらの手順を実行するに は、「 OLP コマンド ライン インターフェース ( CLI ) を使用してプロジェクトで作業を整理する」で説明されている手順を実行します。
たとえば、カタログには一意の識別子名を使用する必要 {{YOUR_USERNAME}}-spark-connector-input
があります。
spark-connector-input.json
以下の内容のファイルを作成 {{YOUR_INPUT_CATALOG_ID}}
し、選択した識別子に置き換えます。
注
すべてのタイムスタンプは、エポック( 1970 年 1 月 1 日 00:00 UTC )からの UTC ミリ秒数です。 別のタイムゾーンでアプリケーションを実行する場合は、データのクエリまたはアップロードを行う前に、タイムスタンプが UTC に変換されていることを確認してください。 Java または Scala では、次の関数呼び出しを使用して変換を実行できます。 Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
{
"id": "spark-connector-input",
"name": "Simulated road topology data archive (From tutorial) spark-connector-input",
"summary": "Archive of simulated road topology data",
"description": "Archive of simulated road topology data.",
"tags": ["Tutorial", "Simulated"],
"layers": [
{
"id": "index-layer-avro-data",
"name": "index-layer-avro-data",
"summary": "Simulated data.",
"description": "Simulated road topology data.\n\nThis data is meant to be used for demonstration purposes and \"playing\" with data.",
"tags": ["Tutorial", "Simulated"],
"contentType": "application/x-avro-binary",
"layerType": "index",
"indexProperties": {
"indexDefinitions": [
{
"name": "ingestion_timestamp",
"duration": 600000,
"type": "timewindow"
},
{
"name": "tile_id",
"type": "heretile",
"zoomLevel": 12
}
],
"ttl": "unlimited"
},
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
},
{
"id": "versioned-layer-protobuf-data",
"name": "versioned-layer-protobuf-data",
"summary": "Simulated data.",
"description": "Simulated road topology data for versioned-layer-protobuf-data",
"contentType": "application/x-protobuf",
"layerType": "versioned",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
},
"schema": {
"hrn": "hrn:here:schema::olp-here:com.here.schema.rib:topology-geometry_v2:2.41.0"
}
},
{
"id": "versioned-layer-custom-data",
"name": "versioned-layer-custom-data",
"summary": "Simulated data.",
"description": "Simulated road topology data for versioned-layer-custom-data",
"contentType": "application/octet-stream",
"layerType": "versioned",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
}
]
}
出力カタログでは spark-connector-ouput.json
、次のようにファイルに名前を付けることができます。
{
"id": "spark-connector-output",
"name": "Simulated data archive (From tutorial) test-spark-connector-output",
"summary": "Archive of simulated road topology data",
"description": "Archive of simulated road topology data.",
"tags": ["Tutorial", "Simulated"],
"layers": [
{
"id": "volatile-layer-parquet-data",
"name": "volatile-layer-parquet-data",
"summary": "Simulated data.",
"description": "Simulated road topology data.",
"contentType": "application/x-parquet",
"layerType": "volatile",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
},
{
"id": "versioned-layer-custom-data",
"name": "versioned-layer-custom-data",
"summary": "Simulated data.",
"description": "Simulated road topology data for versioned-layer-custom-data",
"contentType": "application/octet-stream",
"layerType": "versioned",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
}
]
}
{{YOUR_CATALOG_ID}}
次のコード スニペットに示されているように、を独自の識別子に置き換えます。 また {{YOUR_PROJECT_HRN}}
、「 プロジェクトの作業を整理」の「 HERE リソースネーム 」に置き換え てから、次のコマンドを実行します。
olp catalog create {{YOUR_CATALOG_ID}} \
"Simulated road topology data input from tutorial ({{YOUR_USERNAME}})" \
--config spark-connector-input.json \
--scope {{YOUR_PROJECT_HRN}}
注
レルムで請求タグが必要な場合 は、layer
セクションにbillingTags: ["YOUR_BILLING_TAG"]
プロパティを追加して設定 ファイルを更新します。
データソースを設定します
この手順では、このチュートリアルのメイン部分で使用するデータソースを設定するためのアプリケーションを実装します。 Spark コネクターは HERE 、道路のトポロジーおよび形状のデータを含むソースからデータを読み取り、次のステージで入力として使用されるカタログに書き込むために使用されます。
このためには、道路トポロジおよびジオメトリデータが含まれているデータセットからデータを読み取り、 3 つのデータソースに保存する必要があります。各データソースは、入力カタログを作成するときに上記で定義したいずれかのレイヤータイプおよびデータ形式に対応しています。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Row, SparkSession}
import com.here.platform.data.client.spark.scaladsl.{
GroupedData,
VersionedDataConverter,
VersionedRowMetadata
}
import org.slf4j.LoggerFactory
object DataSourcesSetupScala extends App {
private val logger = LoggerFactory.getLogger(DataSourcesSetupScala.getClass)
private val pipelineContext = new PipelineContext
private val roadTopologyDataSourceCatalogHrn =
pipelineContext.config.inputCatalogs("roadTopologySource")
private val roadTopologyDataSourceLayerId = "topology-geometry"
private val inputRoadTopologyCatalogHrn =
pipelineContext.config.inputCatalogs("roadTopologyInput")
private val inputIndexAvroLayerId = "index-layer-avro-data"
private val inputVersionedProtobufLayerId = "versioned-layer-protobuf-data"
private val inputversionedLayerCustomId = "versioned-layer-custom-data"
val sparkSession: SparkSession =
SparkSession.builder().appName("DataSourcesSetupScalaPipeline").getOrCreate()
import sparkSession.implicits._
private val berlinPartition = "23618361"
private val munichPartition = "23611420"
val sourceRoadTopologyData = sparkSession
.readLayer(roadTopologyDataSourceCatalogHrn, roadTopologyDataSourceLayerId)
.query(s"mt_partition=in=($berlinPartition, $munichPartition)")
.load()
val transformedData = sourceRoadTopologyData
.select(col("partition_name"), explode(col("segment")) as "road")
.select(col("partition_name"),
col("road.identifier") as "identifier",
explode(col("road.geometry.point")) as "points",
col("road.length") as "length")
.select("partition_name", "identifier", "points.*", "length")
val indexLayerData = transformedData
.withColumn("idx_tile_id", col("partition_name"))
.withColumn("idx_ingestion_timestamp", unix_timestamp())
indexLayerData
.writeLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
.save()
val versionedLayerData = sourceRoadTopologyData
.withColumn("mt_partition", col("partition_name"))
versionedLayerData
.writeLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
.save()
val customRawLayerId = transformedData
.select(col("length"))
.map(v => s"${lit("road-partition-")}:${lit(v.mkString).toString}".getBytes)
.select(col("value") as "data")
.withColumn("mt_partition", concat(lit("partition-"), monotonically_increasing_id))
customRawLayerId
.writeLayer(inputRoadTopologyCatalogHrn, inputversionedLayerCustomId)
.withDataConverter(new VersionedDataConverter {
override def serializeGroup(rowMetadata: VersionedRowMetadata,
rows: Iterator[Row]): GroupedData[VersionedRowMetadata] = {
val bytes = rows.next().getAs[Array[Byte]]("data")
GroupedData(rowMetadata, bytes)
}
})
.save()
logger.info(s"Finished setting up data sources in catalog: $inputRoadTopologyCatalogHrn")
sparkSession.stop()
}
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import java.util.Iterator;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataSourcesSetup {
private static PipelineContext pipelineContext = new PipelineContext();
private static HRN roadTopologyDataSourceCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("roadTopologySource");
private static String roadTopologyDataSourceLayerId = "topology-geometry";
private static HRN inputRoadTopologyCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("roadTopologyInput");
private static String inputIndexAvroLayerId = "index-layer-avro-data";
private static String inputVersionedProtobufLayerId = "versioned-layer-protobuf-data";
private static String inputversionedLayerCustomId = "versioned-layer-custom-data";
private static SparkSession sparkSession =
SparkSession.builder().appName("DataSourcesSetupPipeline").getOrCreate();
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(DataSourcesSetup.class);
String berlinPartition = "23618361";
String munichPartition = "23611420";
Dataset<Row> sourceRoadTopologyData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(roadTopologyDataSourceCatalogHrn, roadTopologyDataSourceLayerId)
.query(String.format("mt_partition=in=(%s, %s)", berlinPartition, munichPartition))
.load();
Dataset<Row> transformedData =
sourceRoadTopologyData
.select(col("partition_name"), explode(col("segment")).as("road"))
.select(
col("partition_name"),
col("road.identifier").as("identifier"),
explode(col("road.geometry.point")),
col("road.length").as("length"))
.select("partition_name", "identifier", "col.*", "length");
Dataset<Row> indexLayerData =
transformedData
.withColumn("idx_tile_id", col("partition_name"))
.withColumn("idx_ingestion_timestamp", unix_timestamp());
JavaLayerDataFrameWriter.create(indexLayerData)
.writeLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
.save();
Dataset<Row> versionedLayerData =
sourceRoadTopologyData.withColumn("mt_partition", col("partition_name"));
JavaLayerDataFrameWriter.create(versionedLayerData)
.writeLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
.save();
Dataset<Row> customRawLayerId =
transformedData
.select(col("length"))
.map((MapFunction<Row, byte[]>) row -> row.toString().getBytes(), Encoders.BINARY())
.select(col("value").as("data"))
.withColumn("mt_partition", concat(lit("partition-"), monotonically_increasing_id()));
JavaLayerDataFrameWriter.create(customRawLayerId)
.writeLayer(inputRoadTopologyCatalogHrn, inputversionedLayerCustomId)
.withDataConverter(
new VersionedDataConverter() {
@Override
public GroupedData<VersionedRowMetadata> serializeGroup(
VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
byte[] bytes = rows.next().getAs("data");
return new GroupedData<>(rowMetadata, bytes);
}
})
.save();
logger.info(
String.format(
"Finished setting up data sources in catalog: %s", inputRoadTopologyCatalogHrn));
sparkSession.stop();
}
}
Spark Connector アプリケーションを実装します
このアプリケーションは、前のステージで作成したデータソースを使用して、異なるレイヤーおよびデータ形式から読み取ります。 これにより、結果のデータに何らかの変換が実行され、前に作成されたカタログから出力レイヤーに書き込みます。
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
GroupedData,
VersionedDataConverter,
VersionedRowMetadata
}
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
import org.slf4j.LoggerFactory
object SparkConnectorScala extends App {
private val logger = LoggerFactory.getLogger(SparkConnectorScala.getClass)
private val pipelineContext = new PipelineContext
private val inputRoadTopologyCatalogHrn =
pipelineContext.config.inputCatalogs("roadTopologyInput")
private val outputRoadTopologyCatalogHrn = pipelineContext.config.outputCatalog
private val inputIndexAvroLayerId = "index-layer-avro-data"
private val inputVersionedProtobufLayerId = "versioned-layer-protobuf-data"
private val versionedLayerCustomId = "versioned-layer-custom-data"
private val outputVolatileParquetLayerId = "volatile-layer-parquet-data"
val sparkSession: SparkSession =
SparkSession.builder().appName("SparkConnectorScalaPipeline").getOrCreate()
import sparkSession.implicits._
private val berlinPartition = "23618361"
private val munichPartition = "23611420"
val avroData: DataFrame = sparkSession
.readLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
.query(s"tile_id==$munichPartition")
.load()
avroData.printSchema()
avroData.show()
avroData.collect()
val protobufData: DataFrame = sparkSession
.readLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
.query(s"mt_partition=in=($berlinPartition, $munichPartition)")
.load()
case class RoadSegmentTopology(identifier: String,
partition_name: String,
points: Array[SegmentPoint],
length: Double)
case class SegmentPoint(latitude: String, longitude: String, z_level: Integer, elevation: Double)
val transformedData = protobufData
.select(col("partition_name"), explode(col("segment")) as "road")
.select(col("road.identifier") as "identifier",
col("partition_name"),
col("road.geometry.point") as "points",
col("road.length") as "length")
.as[RoadSegmentTopology]
val statsByPartition: DataFrame = transformedData
.groupBy("partition_name")
.agg(round(avg("length")) as "average_length",
round(stddev("length")) as "std_length",
count("points") as "number_of_points")
.withColumn("mt_partition", col("partition_name"))
statsByPartition
.writeLayer(outputRoadTopologyCatalogHrn, outputVolatileParquetLayerId)
.save()
val customReadData: Dataset[String] = sparkSession
.readLayer(inputRoadTopologyCatalogHrn, versionedLayerCustomId)
.query(s"mt_partition=in=(partition-1, partition-2, partition-3)")
.load()
.map { row =>
row.getAs[Array[Byte]]("data").map(_.toChar).mkString
}(Encoders.STRING)
val customData: DataFrame = customReadData
.map(_.getBytes)
.toDF
.withColumn("data", col("value"))
.select(col("value") as "data")
.withColumn("mt_partition", concat(lit("id-"), monotonically_increasing_id))
customData
.writeLayer(outputRoadTopologyCatalogHrn, versionedLayerCustomId)
.withDataConverter(new VersionedDataConverter {
override def serializeGroup(rowMetadata: VersionedRowMetadata,
rows: Iterator[Row]): GroupedData[VersionedRowMetadata] = {
val bytes = rows.next().getAs[Array[Byte]]("data")
GroupedData(rowMetadata, bytes)
}
})
.save()
logger.info(s"Finished setting up data sources in catalog: $outputRoadTopologyCatalogHrn")
sparkSession.stop()
}
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SparkConnector {
private static final PipelineContext pipelineContext = new PipelineContext();
private static final HRN inputRoadTopologyCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("roadTopologyInput");
private static final HRN outputRoadTopologyCatalogHrn =
pipelineContext.getConfig().getOutputCatalog();
private static final String INPUT_INDEX_AVRO_LAYER_ID = "index-layer-avro-data";
private static final String INPUT_VERSIONED_PROTOBUF_LAYER_ID = "versioned-layer-protobuf-data";
private static final String VERSIONED_LAYER_CUSTOM_ID = "versioned-layer-custom-data";
private static final String OUTPUT_VOLATILE_PARQUET_LAYER_ID = "volatile-layer-parquet-data";
public static void main(String[] args) {
SparkSession sparkSession =
SparkSession.builder().appName("SparkConnectorPipeline").getOrCreate();
Logger logger = LoggerFactory.getLogger(SparkConnector.class);
String berlinPartition = "23618361";
String munichPartition = "23611420";
Dataset<Row> avroData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(inputRoadTopologyCatalogHrn, INPUT_INDEX_AVRO_LAYER_ID)
.query(String.format("tile_id==%s", munichPartition))
.load();
avroData.printSchema();
avroData.show();
avroData.collect();
Dataset<Row> protobufData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(inputRoadTopologyCatalogHrn, INPUT_VERSIONED_PROTOBUF_LAYER_ID)
.query(String.format("mt_partition=in=(%s, %s)", berlinPartition, munichPartition))
.load();
Encoder<RoadSegmentTopology> roadSegmentTopologyEncoder =
Encoders.bean(RoadSegmentTopology.class);
Dataset<RoadSegmentTopology> transformedData =
protobufData
.select(col("partition_name"), explode(col("segment")).as("road"))
.select(
col("road.identifier").as("identifier"),
col("partition_name"),
col("road.geometry.point").as("points"),
col("road.length").as("length"))
.as(roadSegmentTopologyEncoder);
Dataset<Row> statsByPartition =
transformedData
.groupBy("partition_name")
.agg(
round(avg("length")).as("average_length"),
round(stddev("length")).as("std_length"),
count("points").as("number_of_points"))
.withColumn("mt_partition", col("partition_name"));
JavaLayerDataFrameWriter.create(statsByPartition)
.writeLayer(outputRoadTopologyCatalogHrn, OUTPUT_VOLATILE_PARQUET_LAYER_ID)
.save();
Dataset<Row> customData =
JavaLayerDataFrameReader.create(sparkSession)
.readLayer(inputRoadTopologyCatalogHrn, VERSIONED_LAYER_CUSTOM_ID)
.query("mt_partition=in=(partition-1, partition-2, partition-3)")
.load();
Dataset<Row> partitionedData =
customData.withColumn(
"mt_partition", concat(lit("partition-"), monotonically_increasing_id()));
JavaLayerDataFrameWriter.create(partitionedData)
.writeLayer(outputRoadTopologyCatalogHrn, VERSIONED_LAYER_CUSTOM_ID)
.withDataConverter(
new VersionedDataConverter() {
@Override
public GroupedData<VersionedRowMetadata> serializeGroup(
VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
byte[] bytes = rows.next().getAs("data");
return new GroupedData<>(rowMetadata, bytes);
}
})
.save();
logger.info(
String.format(
"Finished setting up data sources in catalog: %s", outputRoadTopologyCatalogHrn));
sparkSession.stop();
}
public static class RoadSegmentTopology implements Serializable {
private String identifier;
private String partition_name;
private SegmentPoint[] points;
private Double length;
}
public static class SegmentPoint implements Serializable {
private String partition_name;
private String longitude;
private Integer z_level;
private Double elevation;
}
}
ローカルでコンパイルおよび実行します
アプリケーションをローカルで実行するには、次のコマンドを実行します。
mvn compile exec:java \
-Dexec.mainClass=DataSourcesSetupScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=DataSourcesSetup \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
チュートリアルの主な部分で、データソースを配置した後で、次のコマンドを実行する必要があります。
mvn compile exec:java \
-Dexec.mainClass=SparkConnectorScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=SparkConnector \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。