バッチアプリケーションでカタログから読み取ります
目的: HERE Map Content の道路形状の合計長さを推定します。
複雑さ: 初心者向け
所要時間: 30 分
前提条件: プロジェクトでの作業を整理します
ソースコード: ダウンロード
パイプラインでジョブを展開する目的で HERE Workspace で開発する場合、 2 つのランタイム環境から選択できます。
- バッチを使用して、 Spark ベースのアプリケーションを実行できます。
- ストリームを使用して、 Flinkベースのアプリケーションを実行できます。
この例 では、マップ内に存在する道路ジオメトリの合計長を推定するために、 HERE Map Content カタログtopology-geometry
レイヤーからデータをダウンロードする単純な Spark バッチアプリケーションを示します。
topology-geometry
このレイヤーには、 HERE Map Content トポロジーと道路セグメントの形状が含まれています。 このレイヤーの空間パーティション分割方式はです HereTile
。 HereTile
パーティション分割の詳細については 、このドキュメントを参照してください。
各セグメントには length
、その合計長をメートル単位で表す属性も含まれています。
まず、を使用 queryMetadata
して、レイヤーのパーティションのリストが含まれているレイヤーのメタデータをダウンロードし、使用可能なパーティションの約 1/1000 のランダムなサンプルを選択します。
選択したパーティションごとに、関連するデータをダウンロードし、各パーティションで使用できるすべての読み取り桁数を合計します。 RDD
これにより、倍精度浮動小数点数型の結果が 1 つの数値に削減され、選択したタイルに存在するすべての値が合計されます。
Maven でプロジェクトを設定します
Spark を使用してパイプラインで実行されるアプリケーションを開発するには sdk-batch-bom_2.12
、をアプリケーションの親ポムとして使用します。
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.54.3</version>
<relativePath/>
</parent>
Scala と Java の依存関係を調整します。
<dependencies>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.schema.rib</groupId>
<artifactId>topology-geometry_v2_scala_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.schema.rib</groupId>
<artifactId>topology-geometry_v2_java</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>spark-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.hrn</groupId>
<artifactId>hrn_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
アプリケーションを実装します
このアプリケーションは 、topology-geometry
レイヤー内のパーティションを介して MapReduce を実装し、各パーティションのすべての道路セグメントの長さを合計します。
このアプリケーションは、すべてのパーティションの長さを合計する代わりに、パーティションの小さなサブセットをサンプリングし、すべての長さの合計をサンプリングレートで割り、すべてのパーティションの合計長を推定します。 これにより、妥当な見積もりが短時間で作成されます。
注
執筆時点では、 HERE カタログには約 5,900 万 km のジオメトリがあります。
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.actor.CoordinatedShutdown.UnknownReason
import com.here.hrn.HRN
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.scaladsl.{DataClient, Partition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.pipeline.PipelineContext
import com.here.schema.rib.v2.topology_geometry_partition.TopologyGeometryPartition
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import com.here.platform.data.client.spark.SparkSupport._
object SparkPipelineScala {
private val logger = LoggerFactory.getLogger(SparkPipelineScala.getClass)
private val sampleFraction = 1.0 / 1000.0
def main(args: Array[String]): Unit = {
val sparkContext: SparkContext = new SparkContext(new SparkConf().setAppName("SparkPipeline"))
val pipelineContext = new PipelineContext
val hereMapContent = pipelineContext.config.inputCatalogs("hereMapContent")
val hereMapContentVersion = pipelineContext.job.get.inputCatalogs("hereMapContent").version
val outputCatalog = pipelineContext.config.outputCatalog
val config: Config = ConfigFactory.empty
.withValue("here.platform.data-client.endpoint-locator.discovery-service-env",
ConfigValueFactory.fromAnyRef("custom"))
.withValue(
"here.platform.data-client.endpoint-locator.discovery-service-url",
ConfigValueFactory.fromAnyRef("https://api-lookup.data.api.platform.here.com")
)
val appName = "SparkPipelineExampleScala"
implicit lazy val actorSystem: ActorSystem = ActorSystem.create(appName, config)
try {
val topologyLayerMetadata: RDD[Partition] =
queryMetadata(hereMapContent,
hereMapContentVersion,
"topology-geometry",
sparkContext,
actorSystem)
.sample(withReplacement = true, sampleFraction)
val topologyPartitions: RDD[TopologyGeometryPartition] =
topologyLayerMetadata.map(readTopologyGeometry(hereMapContent))
val roadLengths: RDD[Double] =
topologyPartitions.map(_.segment.map(_.length).sum)
val totalMeters: Double =
roadLengths.reduce(_ + _) / sampleFraction
logger.info(f"The estimated length of all roads in the map is: $totalMeters%.0fm")
logger.info(s"The configured output catalog is: $outputCatalog")
} finally {
val shutdown = CoordinatedShutdown(actorSystem).run(UnknownReason)
Await.result(shutdown, Duration.Inf)
}
}
private def queryMetadata(catalog: HRN,
catalogVersion: Long,
layerName: String,
sparkContext: SparkContext,
actorSystem: ActorSystem): RDD[Partition] = {
val query = DataClient(actorSystem).queryApi(catalog)
val partitions = query.getPartitionsAsIterator(catalogVersion, layerName)
sparkContext.parallelize(partitions.awaitResult.toStream)
}
private def readTopologyGeometry(catalog: HRN)(partition: Partition) =
TopologyGeometryPartition.parseFrom(read(catalog)(partition))
private def read(catalog: HRN)(partition: Partition) =
DataEngine(DataClientSparkContextUtils.context.actorSystem)
.readEngine(catalog)
.getDataAsBytes(partition)
.awaitResult()
}
import akka.actor.ActorSystem;
import akka.actor.CoordinatedShutdown;
import com.google.protobuf.InvalidProtocolBufferException;
import com.here.hrn.HRN;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.pipeline.PipelineContext;
import com.here.schema.rib.v2.TopologyGeometry;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass.TopologyGeometryPartition;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SparkPipelineJava {
private static final double sampleFraction = 1.0 / 1000.0;
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(SparkPipelineJava.class);
JavaSparkContext sparkContext =
new JavaSparkContext(new SparkConf().setAppName("SparkPipeline"));
PipelineContext pipelineContext = new PipelineContext();
HRN hereMapContent = pipelineContext.getConfig().getInputCatalogs().get("hereMapContent");
Long hereMapContentVersion =
pipelineContext.getJob().get().getInputCatalogs().get("hereMapContent").version();
HRN outputCatalog = pipelineContext.getConfig().getOutputCatalog();
Config config = ConfigFactory.empty();
config =
config.withValue(
"here.platform.data-client.endpoint-locator.discovery-service-env",
ConfigValueFactory.fromAnyRef("custom"));
config =
config.withValue(
"here.platform.data-client.endpoint-locator.discovery-service-url",
ConfigValueFactory.fromAnyRef("https://api-lookup.data.api.platform.here.com"));
ActorSystem sparkActorSystem = ActorSystem.create("SparkPipelineExampleJava", config);
try {
JavaRDD<Partition> topologyLayerMetadata =
queryMetadata(
hereMapContent,
hereMapContentVersion,
"topology-geometry",
sparkContext,
sparkActorSystem)
.sample(true, sampleFraction);
TopologyGeometryReader readTopologyGeometry = new TopologyGeometryReader(hereMapContent);
JavaRDD<TopologyGeometryPartition> topologyGeometryPartition =
topologyLayerMetadata.map(readTopologyGeometry::read);
JavaRDD<Double> roadLengths =
topologyGeometryPartition.map(
partition ->
partition
.getSegmentList()
.stream()
.map(TopologyGeometry.Segment::getLength)
.mapToDouble(Double::doubleValue)
.sum());
Double totalMeters = roadLengths.reduce(Double::sum) / sampleFraction;
logger.info(
String.format("The estimated length of all roads in the map is: %.0fm", totalMeters));
logger.info(String.format("The configured output catalog is: %s", outputCatalog));
} finally {
CoordinatedShutdown.get(sparkActorSystem)
.runAll(CoordinatedShutdown.unknownReason())
.toCompletableFuture()
.join();
}
}
private static JavaRDD<Partition> queryMetadata(
HRN catalog,
Long catalogVersion,
String layerName,
JavaSparkContext sparkContext,
ActorSystem sparkActorSystem) {
QueryApi query = DataClient.get(sparkActorSystem).queryApi(catalog);
ArrayList<Partition> partitions = new ArrayList<>();
query
.getPartitionsAsIterator(catalogVersion, layerName, Collections.emptySet())
.toCompletableFuture()
.join()
.forEachRemaining(partitions::add);
return sparkContext.parallelize(partitions);
}
}
class TopologyGeometryReader implements Serializable {
private HRN catalog;
TopologyGeometryReader(HRN catalog) {
this.catalog = catalog;
}
TopologyGeometryPartition read(Partition partition) throws InvalidProtocolBufferException {
return TopologyGeometryPartition.parseFrom(readRaw(partition));
}
private byte[] readRaw(Partition partition) {
return DataEngine.get(DataClientSparkContextUtils.context().actorSystem())
.readEngine(catalog)
.getDataAsBytes(partition)
.toCompletableFuture()
.join();
}
}
この pipeline-config.conf
ファイルでは、 HERE リソースネーム for HERE Map Content が heremapcontent
パイプラインの入力カタログ、および出力カタログの HERE リソースネーム として宣言されています。 このパイプラインは出力カタログを記述しないため、出力 HERE リソースネーム は単なるダミー値にすぎません。
本番パイプラインでは、出力 HERE リソースネーム は、アプリまたは共有グループ(あるいはその両方)が書き込み権限を持つ既存のカタログを参照します。 これらの権限の管理の詳細について は、このドキュメントを参照してください。
pipeline.config {
//Please use "hrn:here-cn:data::olp-cn-here:dummy" on China environment
output-catalog {hrn = "hrn:here:data::olp-here:dummy"}
input-catalogs {
//Please, use hrn:here-cn:data::olp-cn-here:here-map-content-china-2 on China Environment
hereMapContent {hrn = "hrn:here:data::olp-here:rib-2"}
}
}
このチュートリアルでは、パブリックカタログ( HERE Map Content カタログ)を使用します。 カタログは、まずプロジェクトにリンクして、プロジェクト内で使用する必要があります。 これを行うには、{{YOUR_PROJECT_HRN}}
を「プロジェクトでの作業の整理」チュートリアルの説明に従って作成したプロジェクトの実際の HERE リソースネームで置き換え、次のコマンドを実行します。
olp project resource link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:rib-2
コマンドが成功すると、 CLI は次のメッセージを返します。
Project resource hrn:here:data::olp-here:rib-2 has been linked.
この pipeline-job.conf
ファイルは heremapcontent
、入力およびダミー出力のバージョンを宣言します。
pipeline.job.catalog-versions {
output-catalog {base-version = -1}
input-catalogs {
hereMapContent {
processing-type = "reprocess"
// Please, use "version = 0" on China Environment
version = 4
}
}
}
アプリケーションを実行します
アプリケーションをローカルで実行するには、次のコマンドを実行します。
mvn compile exec:java \
-Dexec.mainClass=SparkPipelineScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=SparkPipelineJava \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
-Dhere.platform.data-client.request-signer.credentials.here-account.*
パラメーターのセットを考慮してください。 credentials.properties
ファイルから {{YOUR_PROJECT_HRN}}
データ クライアント ライブラリにデータを渡すには、これらのパラメーターを指定します。 データ クライアント ライブラリの初期化の詳細について は、「 Java システムのプロパティを使用して資格情報を設定する」を参照してください。
HERE から、このジョブをパイプラインで実行できます。
OLP CLI 開発者ガイドのパイプラインコマンド
sample
メタデータでの呼び出しと totalMeters
、変数の最終結果のスケーリングを削除します。 これにより、プログラムが概算見積書から並列プログラムに変換され、カタログ内のすべての値が数分で合計されます。
また、 KM の合計数に関する情報を、 generic
パーティション分割された 1 つの text\plain
レイヤーと、カタログ情報を含む 1 つのパーティションに公開することもできます。 カタログの作成およびレイヤーの設定の詳細については、「プロジェクトでの作業の整理」のチュートリアルおよびコマンド ライン インターフェース開発者ガイドを参照してください。