<dependencies>
  <dependency>
    <groupId>com.here.platform.pipeline</groupId>
    <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.platform.data.client</groupId>
    <artifactId>spark-support_${scala.compat.version}</artifactId>
  </dependency>
  <!-- To enable support of local catalogs -->
  <dependency>
    <groupId>com.here.platform.data.client</groupId>
    <artifactId>local-support_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.hrn</groupId>
    <artifactId>hrn_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.schema.rib</groupId>
    <artifactId>topology-geometry_v2_scala_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.schema.rib</groupId>
    <artifactId>topology-geometry_v2_java</artifactId>
  </dependency>
  <dependency>
    <groupId>com.thesamet.scalapb</groupId>
    <artifactId>sparksql-scalapb_${scala.compat.version}</artifactId>
    <version>0.10.4</version>
  </dependency>
  <!-- To test the Scala version of the application -->
  <dependency>
    <groupId>org.scalatest</groupId>
    <artifactId>scalatest_${scala.compat.version}</artifactId>
    <version>3.0.1</version>
    <scope>test</scope>
  </dependency>
  <!-- To test the Java version of the application -->
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
</dependencies>
Create the local output catalog

First, we develop a pipeline that reads from HERE Map Content and writes to a local catalog. Later, we also create a local copy of the HERE Map Content catalog, resulting in a fully offline development environment.

You can now create a pipeline-config.conf file that contains the input catalog (HERE Map Content) and the newly created local catalog as the output catalog:
pipeline.config {

  // Make sure the local catalog has been created first:
  // olp local catalog create node-count "Node Count" --config local-development-workflow-output-catalog.json
  output-catalog { hrn = "hrn:local:data:::node-count" }

  input-catalogs {
    here-map-content { hrn = "hrn:here:data::olp-here:rib-2" }
  }
}
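The --config file referenced in the comment above describes the catalog to create. A minimal sketch of local-development-workflow-output-catalog.json, assuming a versioned node-count layer partitioned by HERE Tile at level 12 to match what the pipeline writes (the names, summaries, and content type below are illustrative):

{
  "id": "node-count",
  "name": "Node Count",
  "summary": "Node count per level 14 tile",
  "description": "Number of topology nodes per level 14 tile, encoded as GeoJSON",
  "layers": [
    {
      "id": "node-count",
      "name": "node-count",
      "summary": "Node count",
      "description": "Node count, encoded as GeoJSON",
      "layerType": "versioned",
      "partitioning": { "scheme": "heretile", "tileLevels": [12] },
      "volume": { "volumeType": "durable" },
      "contentType": "application/vnd.geo+json"
    }
  ]
}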
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.olp.util.geo.BoundingBox
import com.here.olp.util.quad.factory.HereQuadFactory
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  VersionedDataConverter,
  VersionedRowMetadata
}
import com.here.platform.pipeline.PipelineContext
import com.here.schema.rib.v2.topology_geometry_partition.TopologyGeometryPartition
import com.typesafe.config.ConfigBeanFactory
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scalapb.spark.Implicits._

object SparkConnectorLocalScala {

  // We expose two public methods, `main` and `run`. `main` reads the pipeline context and the
  // configuration from `pipeline-config.conf` and `application.conf`, while `run` accepts them
  // as parameters. This makes it convenient to test the logic later using `run` with different
  // configuration parameters, without having to pass them through the classpath.
  def main(args: Array[String]): Unit = {
    // Read the default pipeline context
    val pipelineContext = new PipelineContext

    // Read the bounding box configured in `application.conf`
    val bbox = ConfigBeanFactory.create(
      pipelineContext.applicationConfig.getConfig("tutorial.boundingBox"),
      classOf[BoundingBox])

    run(pipelineContext, bbox)
  }

  def run(pipelineContext: PipelineContext, bbox: BoundingBox): Unit = {
    // Defines the input / output catalogs to be read / written to
    val inputHrn = pipelineContext.config.inputCatalogs("here-map-content")
    val outputHrn = pipelineContext.config.outputCatalog

    // Input / output layers
    val topologyGeometryLayer = "topology-geometry"
    val outputLayer = "node-count"

    val sparkSession: SparkSession =
      SparkSession.builder().appName("SparkTopologyNodeCount").getOrCreate()

    // Read the input data as a Dataset of topology partitions
    val topologyGeometryData: Dataset[TopologyGeometryPartition] = sparkSession
      .readLayer(inputHrn, topologyGeometryLayer)
      .query(
        s"mt_partition=inboundingbox=(${bbox.getNorth},${bbox.getSouth},${bbox.getEast},${bbox.getWest})")
      .option("olp.connector.force-raw-data", value = true)
      .load()
      .select("data")
      .as[Array[Byte]]
      .map(TopologyGeometryPartition.parseFrom)

    // This is needed for implicit conversions - moved down to avoid conflicts with scalapb.spark.Implicits._
    import sparkSession.implicits._

    // Compute the output partitions
    val nodeCountPartitions: Dataset[NodeCountPartition] = topologyGeometryData.map { partition =>
      val quads = partition.node
        .flatMap(_.geometry)
        .map(
          g => HereQuadFactory.INSTANCE.getMapQuadByLocation(g.latitude, g.longitude, 14).getLongKey
        )
        .groupBy(identity)
        .map { case (tileId, seq) => NodeCount(tileId, seq.size) }
      NodeCountPartition(partition.partitionName, quads.toSeq)
    }

    // Encode the output partitions, convert to DataFrame and publish the output data
    nodeCountPartitions
      .map(p => (p.partitionName, p.toGeoJson.getBytes))
      .toDF("mt_partition", "data")
      .writeLayer(outputHrn, outputLayer)
      .withDataConverter(new VersionedDataConverter {
        def serializeGroup(rowMetadata: VersionedRowMetadata,
                           rows: Iterator[Row]): GroupedData[VersionedRowMetadata] =
          GroupedData(rowMetadata, rows.next().getAs[Array[Byte]]("data"))
      })
      .save()

    sparkSession.stop()
  }

  // Class representing the number of nodes in a level 14 sub-tile
  case class NodeCount(tileId: Long, count: Int) {
    def toGeoJson: String = {
      val box = HereQuadFactory.INSTANCE.getMapQuadByLongKey(tileId).getBoundingBox
      val e = box.getEast
      val w = box.getWest
      val n = box.getNorth
      val s = box.getSouth
      val color = {
        // from black (0 nodes) to red (500 nodes)
        s"rgb(${(count.min(500).toDouble / 500 * 255).toInt},0,0)"
      }
      """{"type":"Feature",""" +
        s""""geometry":{"type":"Polygon","coordinates":[[[$w,$s],[$e,$s],[$e,$n],[$w,$n],[$w,$s]]]},""" +
        s""""properties":{"description":{"tileId":$tileId,"count":$count},"style":{"color":"$color"}}}"""
    }
  }

  // Class representing the decoded content of an output partition
  case class NodeCountPartition(partitionName: String, counts: Seq[NodeCount]) {
    def toGeoJson: String =
      s"""{"type":"FeatureCollection","features":[${counts.map(_.toGeoJson).mkString(",")}]}"""
  }
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.olp.util.geo.BoundingBox;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import com.here.schema.rib.v2.TopologyGeometry.Node;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass.TopologyGeometryPartition;
import com.typesafe.config.ConfigBeanFactory;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkConnectorLocal {

  // Class representing the number of nodes in a level 14 sub-tile
  public static class NodeCount {
    private long tileId;
    private int count;

    public NodeCount(long tileId, int count) {
      this.tileId = tileId;
      this.count = count;
    }

    public String toGeoJson() {
      BoundingBox box = HereQuadFactory.INSTANCE.getMapQuadByLongKey(tileId).getBoundingBox();
      double e = box.getEast();
      double w = box.getWest();
      double n = box.getNorth();
      double s = box.getSouth();
      // from black (0 nodes) to red (500 nodes)
      String color = String.format("rgb(%d,0,0)", (int) (Math.min(count, 500) / 500.0 * 255));
      return String.format(
          "{\"type\":\"Feature\","
              + "\"geometry\":{\"type\":\"Polygon\",\"coordinates\":[[[%f, %f], [%f, %f], [%f, %f], [%f, %f], [%f, %f]]]},"
              + "\"properties\":{\"description\":{\"tileId\":%d,\"count\":%d},\"style\":{\"color\":\"%s\"}}}",
          w, s, e, s, e, n, w, n, w, s, tileId, count, color);
    }
  }

  // Class representing the decoded content of an output partition
  public static class NodeCountPartition {
    private String partitionName;
    private List<NodeCount> counts;

    public NodeCountPartition(String partitionName, List<NodeCount> counts) {
      this.partitionName = partitionName;
      this.counts = counts;
    }

    public String toGeoJson() {
      return String.format(
          "{\"type\":\"FeatureCollection\",\"features\":[%s]}",
          counts.stream().map(NodeCount::toGeoJson).collect(Collectors.joining(",")));
    }

    public String getPartitionName() {
      return partitionName;
    }
  }

  // Class representing an encoded output partition
  public static class OutputData implements Serializable {
    private String mt_partition;
    private byte[] data;

    public OutputData(String mt_partition, byte[] data) {
      this.mt_partition = mt_partition;
      this.data = data;
    }

    // Minimal bean interface - required to use Encoders.bean
    public String getMt_partition() {
      return mt_partition;
    }

    public void setMt_partition(String mt_partition) {
      this.mt_partition = mt_partition;
    }

    public byte[] getData() {
      return data;
    }

    public void setData(byte[] data) {
      this.data = data;
    }
  }

  // We expose two static methods, `main` and `run`. `main` reads the pipeline context and the
  // configuration from `pipeline-config.conf` and `application.conf`, while `run` accepts them
  // as parameters. This makes it convenient to test the logic later using `run` with different
  // configuration parameters, without having to pass them through the classpath.
  public static void main(String[] args) {
    // Read the default pipeline context
    PipelineContext pipelineContext = new PipelineContext();

    // Read the bounding box configured in `application.conf`
    BoundingBox boundingBox =
        ConfigBeanFactory.create(
            pipelineContext.applicationConfig().getConfig("tutorial.boundingBox"),
            BoundingBox.class);

    run(pipelineContext, boundingBox);
  }

  public static void run(PipelineContext pipelineContext, BoundingBox bbox) {
    // Defines the input / output catalogs to be read / written to
    HRN inputHrn = pipelineContext.getConfig().getInputCatalogs().get("here-map-content");
    HRN outputHrn = pipelineContext.getConfig().getOutputCatalog();

    // Input / output layers
    String topologyGeometryLayer = "topology-geometry";
    String outputLayer = "node-count";

    SparkSession sparkSession =
        SparkSession.builder().appName("SparkTopologyNodeCount").getOrCreate();

    // Read the input data as a Dataset of topology partitions
    Dataset<TopologyGeometryPartitionOuterClass.TopologyGeometryPartition> topologyGeometryData =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(inputHrn, topologyGeometryLayer)
            .query(
                String.format(
                    "mt_partition=inboundingbox=(%f,%f,%f,%f)",
                    bbox.getNorth(), bbox.getSouth(), bbox.getEast(), bbox.getWest()))
            .option("olp.connector.force-raw-data", true)
            .load()
            .select("data")
            .as(Encoders.BINARY())
            .map(
                (MapFunction<byte[], TopologyGeometryPartition>) TopologyGeometryPartition::parseFrom,
                Encoders.kryo(TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.class));

    // Compute the output partitions
    Dataset<NodeCountPartition> nodeCountPartitions =
        topologyGeometryData.map(
            (MapFunction<TopologyGeometryPartition, NodeCountPartition>)
                partition -> {
                  List<NodeCount> counts =
                      partition
                          .getNodeList()
                          .stream()
                          .map(Node::getGeometry)
                          .map(
                              g ->
                                  HereQuadFactory.INSTANCE
                                      .getMapQuadByLocation(g.getLatitude(), g.getLongitude(), 14)
                                      .getLongKey())
                          .collect(Collectors.groupingBy(t -> t))
                          .entrySet()
                          .stream()
                          .map(e -> new NodeCount(e.getKey(), e.getValue().size()))
                          .collect(Collectors.toList());
                  return new NodeCountPartition(partition.getPartitionName(), counts);
                },
            Encoders.kryo(NodeCountPartition.class));

    // Encode the output partitions, convert to DataFrame and publish the output data
    JavaLayerDataFrameWriter.create(
            nodeCountPartitions
                .map(
                    (MapFunction<NodeCountPartition, OutputData>)
                        p -> new OutputData(p.getPartitionName(), p.toGeoJson().getBytes()),
                    Encoders.bean(OutputData.class))
                .toDF())
        .writeLayer(outputHrn, outputLayer)
        .withDataConverter(
            new VersionedDataConverter() {
              @Override
              public GroupedData<VersionedRowMetadata> serializeGroup(
                  VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
                return new GroupedData<>(rowMetadata, rows.next().getAs("data"));
              }
            })
        .save();

    sparkSession.stop();
  }
}
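To make the workflow fully offline, you can snapshot the relevant slice of HERE Map Content into a local catalog copy using the OLP CLI. A sketch of the copy command is shown below; the exact flags are an assumption on our part, so check olp local catalog copy create --help for the authoritative syntax:

olp local catalog copy create here-map-content "HERE Map Content" \
    --source hrn:here:data::olp-here:rib-2 \
    --layers topology-geometry \
    --filter "boundingbox=(52.67551,52.33826,13.76116,13.08835)"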
This creates the local catalog hrn:local:data:::here-map-content. It copies the latest version of the HERE Map Content catalog, initializing and copying only the topology-geometry layer, and it includes only the partitions contained in the specified bounding box. A local catalog copy is a special local catalog that keeps track of its source catalog, so you can refresh the local copy at any time by running:
olp local catalog copy update hrn:local:data:::here-map-content
You can now change the pipeline configuration to use this local copy as input:
pipeline.config {

  output-catalog { hrn = "hrn:local:data:::node-count" }

  input-catalogs {
    // Notice how we use a local catalog here - instead of the HERE Map Content catalog on the
    // platform.
    here-map-content { hrn = "hrn:local:data:::here-map-content" }
  }
}
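To run unit tests fully locally, configure the Data Client Library in src/test/resources/application.conf so that all catalog accesses go to the local environment and local catalogs are kept in memory only: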
here.platform.data-client {
  # force local environment for all catalog accesses
  endpoint-locator.discovery-service-env = local
}

here.platform.data-local {
  # use in-memory local catalogs
  memory-mode = true
}
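The tests below create a local input catalog populated with randomly generated topology nodes, run the pipeline under test against it, and verify that the node counts published to the output catalog match the expected ones. First, the Scala version: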
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.hrn.HRN
import com.here.olp.util.geo.BoundingBox
import com.here.olp.util.quad.factory.HereQuadFactory
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model._
import com.here.platform.data.client.scaladsl.{DataClient, NewPartition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils.context._
import com.here.platform.data.client.spark.SparkSupport._
import com.here.platform.pipeline.{PipelineConfig, PipelineContext}
import com.here.schema.geometry.v2.geometry.Point
import com.here.schema.rib.v2.topology_geometry.Node
import com.here.schema.rib.v2.topology_geometry_partition.TopologyGeometryPartition
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Inspectors, Matchers}

import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.Random

@RunWith(classOf[JUnitRunner])
class SparkConnectorLocalScalaTest extends FlatSpec with Matchers with Inspectors {

  // Specify the bounding box used by our pipeline under test.
  private val testBoundingBox = new BoundingBox(52.67551, 52.33826, 13.76116, 13.08835)

  // Utilities to create a random topology node within a bounding box.
  private val random = new Random(42)

  private def randomInRange(min: Double, max: Double): Double =
    min + (max - min) * random.nextDouble()

  private def randomNode(boundingBox: BoundingBox): Node =
    Node(
      geometry = Some(
        Point(
          latitude = randomInRange(boundingBox.getSouth, boundingBox.getNorth),
          longitude = randomInRange(boundingBox.getWest, boundingBox.getEast))))

  // Utility to create a catalog with a single HERE Tile versioned layer. The catalog will be local
  // because we use `here.platform.data-client.endpoint-locator.discovery-service-env=local` in
  // src/test/resources/application.conf. It will be kept in memory only, because we also use
  // `here.platform.data-local.memory-mode=true`.
  private def createCatalog(catalogIdPrefix: String, layerId: String): HRN =
    DataClient()
      .adminApi()
      .createCatalog(
        WritableCatalogConfiguration(
          catalogIdPrefix + "-" + UUID.randomUUID().toString,
          catalogIdPrefix,
          "summary",
          "description",
          Seq(
            WritableLayer(
              layerId,
              layerId,
              "summary",
              "description",
              LayerTypes.Versioned,
              HereTilePartitioning(List(12)),
              DurableVolume,
              contentType = "application/octet-stream"))))
      .awaitResult()

  // Utility to publish some test data in a catalog.
  private def writeData(hrn: HRN, layer: String, data: Map[String, Array[Byte]]): Unit = {
    val partitions = data.map {
      case (partitionId, data) =>
        NewPartition(
          partitionId,
          layer,
          NewPartition.ByteArrayData(data))
    }
    DataEngine()
      .writeEngine(hrn)
      .publishBatch2(4, Some(Seq(layer)), Nil, partitions.iterator)
      .awaitResult()
  }

  // Utility to read the data from the latest version of a catalog.
  private def readData(hrn: HRN, layer: String): Map[String, Array[Byte]] = {
    val readEngine = DataEngine().readEngine(hrn)
    val queryApi = DataClient().queryApi(hrn)
    queryApi
      .getLatestVersion()
      .flatMap {
        case None => fail("Catalog is empty")
        case Some(v) => queryApi.getPartitionsAsIterator(v, layer)
      }
      .awaitResult()
      .map(p => p.partition -> readEngine.getDataAsBytes(p).awaitResult())
      .toMap
  }

  // Utility to create an input test catalog given the expected outcome. `expectedCounts` is a map
  // containing, for each level 14 tile ID, the number of nodes contained in that sub-tile.
  // This method creates the requested number of topology nodes in each level-14 tile, encodes them
  // in a topology partition and publishes them in a freshly created test catalog.
  private def createTestInputCatalog(expectedCounts: Map[Long, Int]): HRN = {
    val hrn = createCatalog("input", "topology-geometry")
    val partitions: Map[String, Array[Byte]] = expectedCounts
      .map {
        case (tileId, count) =>
          val boundingBox = HereQuadFactory.INSTANCE.getMapQuadByLongKey(tileId).getBoundingBox
          val nodes = (1L to count).map(_ => randomNode(boundingBox))
          tileId -> nodes
      }
      .groupBy {
        case (tileId, _) =>
          HereQuadFactory.INSTANCE.getMapQuadByLongKey(tileId).getAncestor(12).getLongKey
      }
      .map {
        case (tileId, nodes) =>
          tileId.toString -> TopologyGeometryPartition(
            partitionName = tileId.toString,
            node = nodes.values.reduce(_ ++ _)).toByteArray
      }
    writeData(hrn, "topology-geometry", partitions)
    hrn
  }

  // Utility to create an output catalog for the pipeline.
  private def createTestOutputCatalog(): HRN = createCatalog("output", "node-count")

  // Utility to retrieve the actual node count per level 14 tile in the output catalog. It reads the
  // latest version of the catalog, retrieves the data, decodes it and returns the node count
  // encoded in each level 14 tile. This is used, after a run of the pipeline under test, to verify
  // that the node count is the expected one.
  private def readActualNodeCount(hrn: HRN): Map[Long, Int] = {
    // We use regular expressions to extract the tileId of the level 14 tiles and their node count.
    // Using a proper JSON parser is left as an exercise.
    val descriptionRegex = """"description":\s*\{"tileId":\s*(\d+),\s*"count":\s*(\d+)}""".r.unanchored
    readData(hrn, "node-count").flatMap {
      case (_, data) =>
        val geoJson = new String(data)
        descriptionRegex.findAllMatchIn(geoJson).map(m => m.group(1).toLong -> m.group(2).toInt)
    }
  }

  // Randomly generated expected result. For each level 14 tile in `testBoundingBox` we generate a
  // random non-negative node count. We filter out zeros because those are by design not encoded by
  // the pipeline under test.
  private val expectedNodeCount: Map[Long, Int] = {
    HereQuadFactory.INSTANCE
      .iterableBoundingBoxToMapQuad(testBoundingBox, 14)
      .asScala
      .map { quad =>
        quad.getLongKey -> (random.nextInt.abs % 100)
      }
      .filter(_._2 != 0)
      .toMap
  }

  "SparkConnectorLocalScala" should "correctly compute the number of nodes in each sub-tile" in {
    // Create the input catalog and publish the test data.
    val inputHrn = createTestInputCatalog(expectedNodeCount)
    // Create an empty output catalog.
    val outputHrn = createTestOutputCatalog()

    val pipelineContext =
      PipelineContext(PipelineConfig(outputHrn, Map("here-map-content" -> inputHrn)))

    // Run the pipeline under test.
    SparkConnectorLocalScala.run(pipelineContext, testBoundingBox)

    // Check the output data.
    readActualNodeCount(outputHrn) should contain theSameElementsAs expectedNodeCount
  }
}
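The equivalent Java test follows: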
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import static junit.framework.TestCase.assertEquals;

import com.here.hrn.HRN;
import com.here.olp.util.geo.BoundingBox;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.ReadEngine;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.model.*;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.pipeline.PipelineConfig;
import com.here.platform.pipeline.PipelineContext;
import com.here.schema.geometry.v2.GeometryOuterClass;
import com.here.schema.rib.v2.TopologyGeometry;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.junit.Test;

public class SparkConnectorLocalTest {

  // Simple Pair class used to collect maps with Java streams.
  static class Pair<K, V> {
    private K key;
    private V value;

    public Pair(K key, V value) {
      this.key = key;
      this.value = value;
    }

    public K getKey() {
      return key;
    }

    public V getValue() {
      return value;
    }
  }

  // Specify the bounding box used by our pipeline under test.
  private BoundingBox testBoundingBox = new BoundingBox(52.67551, 52.33826, 13.76116, 13.08835);

  // Utilities to create a random topology node within a bounding box.
  private Random random = new Random(42);

  private double randomInRange(double min, double max) {
    return min + (max - min) * random.nextDouble();
  }

  private TopologyGeometry.Node randomNode(BoundingBox boundingBox) {
    return TopologyGeometry.Node.newBuilder()
        .setGeometry(
            GeometryOuterClass.Point.newBuilder()
                .setLatitude(randomInRange(boundingBox.getSouth(), boundingBox.getNorth()))
                .setLongitude(randomInRange(boundingBox.getWest(), boundingBox.getEast()))
                .build())
        .build();
  }

  // Utility to create a catalog with a single HERE Tile versioned layer. The catalog will be local
  // because we use `here.platform.data-client.endpoint-locator.discovery-service-env=local` in
  // src/test/resources/application.conf. It will be kept in memory only, because we also use
  // `here.platform.data-local.memory-mode=true`.
  private HRN createCatalog(String catalogIdPrefix, String layerId) {
    return await(
        DataClient.get(DataClientSparkContextUtils.context().actorSystem())
            .adminApi()
            .createCatalog(
                new WritableCatalogConfiguration.Builder()
                    .withId(catalogIdPrefix + "-" + UUID.randomUUID().toString())
                    .withName("name")
                    .withSummary("summary")
                    .withDescription("description")
                    .withLayers(
                        Collections.singletonList(
                            new WritableLayer.Builder()
                                .withId(layerId)
                                .withName("name")
                                .withSummary("summary")
                                .withDescription("description")
                                .withLayerType(LayerTypes.Versioned())
                                .withPartitioning(
                                    Partitioning.HereTile()
                                        .withTileLevels(Collections.singletonList(12))
                                        .build())
                                .withVolume(Volumes.Durable())
                                .withContentType("application/octet-stream")))
                    .build()));
  }

  // Utility to publish some test data in a catalog.
  private void writeData(HRN hrn, String layer, Map<String, byte[]> data) {
    Iterator<NewPartition> partitions =
        data.entrySet()
            .stream()
            .map(
                e ->
                    new NewPartition.Builder()
                        .withPartition(e.getKey())
                        .withLayer(layer)
                        .withData(e.getValue())
                        .build())
            .collect(Collectors.toList())
            .iterator();
    await(
        DataEngine.get(DataClientSparkContextUtils.context().actorSystem())
            .writeEngine(hrn)
            .publishBatch2(
                4,
                Optional.of(Collections.singletonList(layer)),
                Collections.emptyList(),
                partitions));
  }

  // Utility to read the data from the latest version of a catalog.
  private Map<String, byte[]> readData(HRN hrn, String layer) {
    ReadEngine readEngine =
        DataEngine.get(DataClientSparkContextUtils.context().actorSystem()).readEngine(hrn);
    QueryApi queryApi =
        DataClient.get(DataClientSparkContextUtils.context().actorSystem()).queryApi(hrn);
    Iterable<Partition> partitions =
        () ->
            await(
                queryApi
                    .getLatestVersion(OptionalLong.empty())
                    .thenApply(
                        v -> v.orElseThrow(() -> new RuntimeException("Output catalog is empty")))
                    .thenCompose(
                        v -> queryApi.getPartitionsAsIterator(v, layer, Collections.emptySet())));
    return StreamSupport.stream(partitions.spliterator(), false)
        .map(p -> new Pair<>(p.getPartition(), await(readEngine.getDataAsBytes(p))))
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }

  // Utility to create an input test catalog given the expected outcome. `expectedCounts` is a map
  // containing, for each level 14 tile ID, the number of nodes contained in that sub-tile.
  // This method creates the requested number of topology nodes in each level-14 tile, encodes them
  // in a topology partition and publishes them in a freshly created test catalog.
  private HRN createInputCatalog(Map<Long, Integer> expectedNodeCount) {
    HRN hrn = createCatalog("input", "topology-geometry");
    Map<String, byte[]> partitions =
        expectedNodeCount
            .entrySet()
            .stream()
            .map(
                e -> {
                  BoundingBox boundingBox =
                      HereQuadFactory.INSTANCE.getMapQuadByLongKey(e.getKey()).getBoundingBox();
                  List<TopologyGeometry.Node> nodes =
                      IntStream.rangeClosed(1, e.getValue())
                          .mapToObj(notused -> randomNode(boundingBox))
                          .collect(Collectors.toList());
                  return new Pair<>(e.getKey(), nodes);
                })
            .collect(
                Collectors.groupingBy(
                    p ->
                        HereQuadFactory.INSTANCE
                            .getMapQuadByLongKey(p.getKey())
                            .getAncestor(12)
                            .getLongKey()))
            .entrySet()
            .stream()
            .map(
                e -> {
                  byte[] data =
                      TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.newBuilder()
                          .setPartitionName(String.valueOf(e.getKey()))
                          .addAllNode(
                              e.getValue()
                                  .stream()
                                  .flatMap(p -> p.getValue().stream())
                                  .collect(Collectors.toList()))
                          .build()
                          .toByteArray();
                  return new Pair<>(String.valueOf(e.getKey()), data);
                })
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    writeData(hrn, "topology-geometry", partitions);
    return hrn;
  }

  // Utility to create an output catalog for the pipeline.
  private HRN createOutputCatalog() {
    return createCatalog("output", "node-count");
  }

  private <T> T await(CompletionStage<T> stage) {
    try {
      return stage.toCompletableFuture().get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  // Utility to retrieve the actual node count per level 14 tile in the output catalog. It reads the
  // latest version of the catalog, retrieves the data, decodes it and returns the node count
  // encoded in each level 14 tile. This is used, after a run of the pipeline under test, to verify
  // that the node count is the expected one.
  private Map<Long, Integer> readActualNodeCount(HRN hrn) {
    // We use regular expressions to extract the tileId of the level 14 tiles and their node count.
    // Using a proper JSON parser is left as an exercise.
    Pattern descriptionRegex =
        Pattern.compile("\"description\":\\s*\\{\"tileId\":\\s*(\\d+),\\s*\"count\":\\s*(\\d+)}");
    return readData(hrn, "node-count")
        .entrySet()
        .stream()
        .flatMap(
            e -> {
              String geoJson = new String(e.getValue());
              Matcher matcher = descriptionRegex.matcher(geoJson);
              List<Pair<Long, Integer>> matches = new ArrayList<>();
              while (matcher.find()) {
                matches.add(
                    new Pair<>(
                        Long.parseLong(matcher.group(1)), Integer.parseInt(matcher.group(2))));
              }
              return matches.stream();
            })
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }

  private Map<Long, Integer> initializeExpectedNodeCount() {
    return StreamSupport.stream(
            HereQuadFactory.INSTANCE
                .iterableBoundingBoxToMapQuad(testBoundingBox, 14)
                .spliterator(),
            false)
        .map(quad -> new Pair<>(quad.getLongKey(), Math.abs(random.nextInt()) % 100))
        .filter(pair -> pair.getValue() != 0)
        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
  }

  // Randomly generated expected result. For each level 14 tile in `testBoundingBox` we generate a
  // random non-negative node count. We filter out zeros because those are by design not encoded by
  // the pipeline under test.
  private Map<Long, Integer> expectedNodeCount = initializeExpectedNodeCount();

  @Test
  public void correctNodeCountTest() {
    // Create the input catalog and publish the test data.
    HRN inputHrn = createInputCatalog(expectedNodeCount);
    // Create an empty output catalog.
    HRN outputHrn = createOutputCatalog();

    PipelineContext pipelineContext =
        new PipelineContext(
            new PipelineConfig(outputHrn, Collections.singletonMap("here-map-content", inputHrn)),
            Optional.empty());

    // Run the pipeline under test.
    SparkConnectorLocal.run(pipelineContext, testBoundingBox);

    // Check the output data.
    assertEquals(readActualNodeCount(outputHrn), expectedNodeCount);
  }
}
To run the tests with Maven, execute:
mvn test -Dspark.master=local[*]
Deploy to the platform

Once the application works as expected, you can build a fat JAR as follows:
mvn -Pplatform clean package
and deploy it via the Pipeline API, without any changes to the code. For more details on deploying pipelines, see the Pipelines Developer Guide.
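For orientation, a deployment with the OLP CLI can look roughly like the sketch below. All names, the runtime environment identifier, and the JAR path are hypothetical, and the exact command arguments are assumptions; follow the Pipelines Developer Guide and the CLI help for the authoritative workflow:

# Register a pipeline, a template based on the fat JAR, and a pipeline version
olp pipeline create node-count-pipeline
olp pipeline template create node-count-template batch-3.0 \
    target/local-development-workflow-<version>-platform.jar \
    SparkConnectorLocalScala --input-catalog-ids here-map-content
olp pipeline version create node-count-version <pipeline-id> <template-id> pipeline-config.conf

# Activate the version to run the pipeline
olp pipeline version activate <pipeline-id> <pipeline-version-id>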