/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.here.hrn.HRN
import com.here.platform.data.client.base.scaladsl.BaseClient
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.VersionDependency
import com.here.platform.data.client.scaladsl.{DataClient, NewPartition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.LayerDataFrameReader._
import com.here.platform.data.client.spark.SparkSupport._
import com.here.platform.location.core.geospatial.GeoCoordinate
import com.here.platform.location.core.mapmatching.OnRoad
import com.here.platform.location.integration.optimizedmap.OptimizedMapLayers
import com.here.platform.location.integration.optimizedmap.dcl2.OptimizedMapCatalog
import com.here.platform.location.integration.optimizedmap.mapmatching.PathMatchers
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Row, SparkSession}

/**
 * Holds the Optimized Map layers used for path matching.
 *
 * Since it is not convenient to recreate an instance of BaseClient, its connection pool
 * and the OptimizedMapCatalog caches, they are stored in this singleton and used from it.
 */
object OptimizedMapLayersSingleton {

  /**
   * Optimized Map layers of the `location` input catalog, at the version recorded for
   * `location` in the pipeline job. Built on first access, then reused by every caller
   * in the same JVM (lazy val of a Scala object).
   *
   * NOTE(review): `pipelineContext.job.get` fails when the pipeline context carries no
   * job description — confirm this is the intended failure mode.
   */
  lazy val optimizedMapLayers: OptimizedMapLayers = {
    val pipelineContext = new PipelineContext
    val optimizedMapCatalogHrn = pipelineContext.config.inputCatalogs("location")
    val optimizedMapCatalogVersion = pipelineContext.job.get.inputCatalogs("location").version

    OptimizedMapCatalog
      .from(optimizedMapCatalogHrn)
      .usingBaseClient(BaseClient())
      .newInstance
      .version(optimizedMapCatalogVersion)
  }
}

/**
* Read sensor data from an SDII archive and output the map-matched paths
* to a GeoJSON output catalog with the same tiling scheme.
*/
object BatchPathMatchingScala {

  private val pipelineContext = new PipelineContext

  private val outputCatalogHrn = pipelineContext.config.outputCatalog
  private val outputLayer = "matched-trips"

  private val sensorDataArchiveHrn = pipelineContext.config.inputCatalogs("sensor")
  private val sensorDataArchiveVersionLayerName = "sample-index-layer"

  private val locationCatalogHrn = pipelineContext.config.inputCatalogs("location")
  // NOTE(review): `job.get` fails when the pipeline context carries no job description.
  private val locationCatalogVersion = pipelineContext.job.get.inputCatalogs("location").version

  /**
   * Entry point: read the sensor data, map-match it and publish the resulting GeoJSON.
   * The Spark session is stopped even if one of the steps throws.
   */
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("BatchPathMatchingPipeline").getOrCreate()
    try {
      val sensorData = getSensorData(sparkSession)
      val matchedPaths = matchPaths(sparkSession, sensorData)
      publish(matchedPaths)
    } finally {
      sparkSession.stop()
    }
  }

  /**
   * Retrieves the contents of the sensor data archive partitions.
   *
   * @return one [[SensorData]] row per `idx_tile_id`, holding the `positionEstimate` list of
   *         every message of that tile converted to [[GeoCoordinate]]s
   */
  private def getSensorData(sparkSession: SparkSession): Dataset[SensorData] = {
    import sparkSession.implicits._

    // Extracting input data.
    // There are two different options for the query parameter: `hour_from` and `tile_id`.
    // You can either make it `hour_from>0` to get all available messages or make it
    // `tile_id==$tile_id` to get messages by the specific partition.
    // Possible tile_ids = [377893756, 377894443, 377894442, ...].
    // For CN environment = [389695267, 389696772, 389695401, ...].
    // You can implement more complex queries with RSQL
    // (https://here-tech.skawa.fun/documentation/data-client-library/dev_guide/client/rsql.html)
    val sdiiMessages = sparkSession
      .readLayer(sensorDataArchiveHrn, sensorDataArchiveVersionLayerName)
      .query("hour_from>0")
      .load()

    sdiiMessages
      .select($"idx_tile_id".as("tileId"), $"path.positionEstimate".as("fullPositionEstimate"))
      .withColumn("positionEstimate", convertToGeoCoordinate($"fullPositionEstimate"))
      .groupBy("tileId")
      .agg(collect_list("positionEstimate").as("positionEstimateList"))
      .as[SensorData]
  }

  /**
   * Map match each path inside each sensor archive tile
   * (https://en.wikipedia.org/wiki/Map_matching), keeping only on-road results, and convert
   * the matched paths of every tile to a GeoJSON FeatureCollection (one LineString per path).
   *
   * @return (tileId, GeoJSON FeatureCollection) pairs, one per input tile
   */
  private def matchPaths(sparkSession: SparkSession,
                         sensorData: Dataset[SensorData]): Dataset[(Long, String)] = {
    import sparkSession.implicits._

    /**
     * Convert the trips to GeoJSON; each trip is converted to a LineString feature.
     * Paths with fewer than two matched positions are skipped: RFC 7946 (section 3.1.4)
     * requires a LineString to have two or more positions.
     */
    def convertToGeoJSON(
        matchedTrips: Dataset[(Long, Seq[Seq[GeoCoordinate]])]): Dataset[(Long, String)] = {
      // Local value so the Spark closure captures a plain Int.
      val minLineStringPositions = 2
      matchedTrips.map {
        case (tileId, paths) =>
          val features = paths
            .filter(_.size >= minLineStringPositions)
            .map { coordinates =>
              // GeoJSON positions are written as [longitude, latitude].
              val lineCoordinates = coordinates
                .map(c => s"[${c.longitude}, ${c.latitude}]")
                .mkString("[", ",", "]")
              s"""{ "type": "Feature", "geometry": { "type": "LineString", "coordinates": $lineCoordinates}, "properties": {} }"""
            }
          tileId -> features.mkString("""{ "type": "FeatureCollection", "features": [""", ",", "]}")
      }
    }

    val matchedTrips: Dataset[(Long, Seq[Seq[GeoCoordinate]])] =
      sensorData.mapPartitions { sensorDataIterator =>
        // One path matcher per Spark partition, built on the layers shared via the singleton.
        val optimizedMap = OptimizedMapLayersSingleton.optimizedMapLayers
        val pathMatcher =
          PathMatchers(optimizedMap).carPathMatcherWithoutTransitions[GeoCoordinate]

        sensorDataIterator.map { sensorDataElement =>
          sensorDataElement.tileId -> sensorDataElement.positionEstimateList.map { path =>
            pathMatcher.matchPath(path).results.collect {
              case OnRoad(matched) =>
                GeoCoordinate(matched.nearest.latitude, matched.nearest.longitude)
            }
          }
        }
      }

    convertToGeoJSON(matchedTrips)
  }

  /**
   * UDF converting an array of `positionEstimate` structs into [[GeoCoordinate]]s, reading
   * the `latitude_deg` and `longitude_deg` fields of each struct.
   */
  val convertToGeoCoordinate: UserDefinedFunction = udf { (positionEstimateRows: Seq[Row]) =>
    positionEstimateRows.map { positionEstimate =>
      val lat = positionEstimate.getAs[Double]("latitude_deg")
      val lon = positionEstimate.getAs[Double]("longitude_deg")
      GeoCoordinate(lat, lon)
    }
  }

  /**
   * Publish the GeoJSON partitions to the output catalog using Data Client Library.
   *
   * The batch is started and completed on the driver; each Spark partition uploads its tiles
   * with its own worker clients and adds them to that same batch.
   *
   * @param geoJsonByTile (tileId, GeoJSON) pairs; the tile id is used as the partition name
   */
  private def publish(geoJsonByTile: Dataset[(Long, String)]): Unit = {
    // JSON exchanged between systems must be UTF-8 (RFC 8259, section 8.1), so the payload
    // must not depend on the JVM's default charset.
    import java.nio.charset.StandardCharsets

    val masterActorSystem = DataClientSparkContextUtils.context.actorSystem

    // Start commit on master.
    val masterPublishApi = DataClient(masterActorSystem).publishApi(outputCatalogHrn)

    // Get the latest catalog version of the output catalog.
    val baseVersion = masterPublishApi.getBaseVersion().awaitResult()

    // Fill in the direct and indirect dependencies for the output catalog, given the direct
    // inputs. This is good practice, especially if you intend to use the scheduler to
    // determine when the pipeline should run (i.e., by using the --with-scheduler option
    // when creating the pipeline version). See:
    // For users using platform.here.com:
    // https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    // For users using platform.hereolp.cn:
    // https://here-tech.skawa.fun/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    val dependencies = gatherDependencies(locationCatalogHrn, locationCatalogVersion)

    // Start a publication batch on top of most recent catalog version.
    val batchToken =
      masterPublishApi.startBatch2(baseVersion, Some(Seq(outputLayer)), dependencies).awaitResult()

    // Send partitions to workers, and upload data and metadata.
    // The explicit parameter type selects the Scala `Iterator[T] => Unit` overload.
    geoJsonByTile.foreachPartition { (partitions: Iterator[(Long, String)]) =>
      val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
      val workerPublishApi = DataClient(workerActorSystem).publishApi(outputCatalogHrn)
      val workerWriteEngine = DataEngine(workerActorSystem).writeEngine(outputCatalogHrn)

      // Iterator.map is lazy: each tile is uploaded as publishToBatch consumes the iterator.
      val committedPartitions = partitions.map {
        case (tileId, geoJson) =>
          val newPartition = NewPartition(
            partition = tileId.toString,
            layer = outputLayer,
            data = NewPartition.ByteArrayData(geoJson.getBytes(StandardCharsets.UTF_8)))
          workerWriteEngine.put(newPartition).awaitResult()
      }

      workerPublishApi.publishToBatch(batchToken, committedPartitions).awaitResult()
      ()
    }

    masterPublishApi.completeBatch(batchToken).awaitResult()
  }

  /**
   * Gather the dependencies for the output catalog: a direct dependency on the location
   * library catalog, plus that catalog's own dependencies marked as indirect.
   */
  private def gatherDependencies(locationHrn: HRN, locationVersion: Long) = {
    val sparkActorSystem = DataClientSparkContextUtils.context.actorSystem
    val locationLibraryQuery = DataClient(sparkActorSystem).queryApi(locationHrn)
    val locationLibraryDeps =
      locationLibraryQuery.getVersion(locationVersion).awaitResult().dependencies

    // The output catalog depends directly on the location library catalog,
    // and indirectly on its respective dependencies.
    val indirectDeps = locationLibraryDeps.map(_.copy(direct = false))
    indirectDeps ++ Seq(VersionDependency(locationHrn, locationVersion, direct = true))
  }

  /** Class used for encoding data in Dataset: the paths of one tile, each a list of coordinates. */
  case class SensorData(tileId: Long, positionEstimateList: Seq[Seq[GeoCoordinate]])
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.udf;

import akka.actor.ActorSystem;
import akka.japi.Pair;
import com.here.hrn.HRN;
import com.here.platform.data.client.base.javadsl.BaseClient;
import com.here.platform.data.client.base.javadsl.BaseClientJava;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.WriteEngine;
import com.here.platform.data.client.javadsl.CommitPartition;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.javadsl.PublishApi;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.model.BatchToken;
import com.here.platform.data.client.model.VersionDependency;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.location.core.geospatial.GeoCoordinate;
import com.here.platform.location.core.mapmatching.MatchResult;
import com.here.platform.location.core.mapmatching.NoTransition;
import com.here.platform.location.core.mapmatching.OnRoad;
import com.here.platform.location.core.mapmatching.javadsl.MatchResults;
import com.here.platform.location.core.mapmatching.javadsl.PathMatcher;
import com.here.platform.location.inmemory.graph.Vertex;
import com.here.platform.location.integration.optimizedmap.OptimizedMapLayers;
import com.here.platform.location.integration.optimizedmap.dcl2.javadsl.OptimizedMapCatalog;
import com.here.platform.location.integration.optimizedmap.mapmatching.javadsl.PathMatchers;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.JavaConverters;
import scala.collection.mutable.WrappedArray;

/**
 * Read sensor data from an SDII archive and output the map-matched paths to a GeoJSON output
 * catalog with the same tiling scheme.
 */
public class BatchPathMatchingJava {

  private static final PipelineContext pipelineContext = new PipelineContext();
  private static final HRN OUTPUT_CATALOG_HRN = pipelineContext.getConfig().getOutputCatalog();
  private static final String OUTPUT_LAYER = "matched-trips";
  private static final HRN SENSOR_DATA_ARCHIVE_HRN =
      pipelineContext.getConfig().getInputCatalogs().get("sensor");
  private static final String SENSOR_DATA_ARCHIVE_VERSION_LAYER_NAME = "sample-index-layer";
  private static final HRN LOCATION_CATALOG_HRN =
      pipelineContext.getConfig().getInputCatalogs().get("location");
  // NOTE(review): getJob().get() fails when the pipeline context carries no job description.
  private static final Long LOCATION_CATALOG_VERSION =
      pipelineContext.getJob().get().getInputCatalogs().get("location").version();

  /** RFC 7946 (section 3.1.4): a GeoJSON LineString needs two or more positions. */
  private static final int MIN_LINE_STRING_POSITIONS = 2;

  // Since it is not convenient to recreate an instance of BaseClient, its connection pool
  // and the OptimizedMapCatalog caches, they are stored in a singleton and used from it.
  private enum OptimizedMapLayersSingleton {
    INSTANCE;

    private final OptimizedMapLayers optimizedMap;

    OptimizedMapLayersSingleton() {
      BaseClient baseClient = BaseClientJava.instance();
      optimizedMap =
          OptimizedMapCatalog.from(LOCATION_CATALOG_HRN)
              .usingBaseClient(baseClient)
              .newInstance()
              .version(LOCATION_CATALOG_VERSION);
    }

    private OptimizedMapLayers optimizedMap() {
      return optimizedMap;
    }
  }

  /**
   * Entry point: read the sensor data, map-match it and publish the resulting GeoJSON. The
   * Spark session is stopped even if one of the steps throws.
   */
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    SparkSession sparkSession =
        SparkSession.builder().appName("BatchPathMatchingPipeline").getOrCreate();
    try {
      Dataset<SensorData> sensorData = getSensorData(sparkSession);
      JavaRDD<Pair<Long, String>> matchedPaths = matchPaths(sensorData);
      publish(matchedPaths);
    } finally {
      sparkSession.stop();
    }
  }

  /**
   * Retrieves the contents of the sensor data archive partitions, grouped by tile. Each
   * coordinate is encoded as a "lat,lon" string.
   */
  private static Dataset<SensorData> getSensorData(SparkSession sparkSession) {
    // Extracting input data.
    // There are two different options for the query parameter: `hour_from` and `tile_id`.
    // You can either make it `hour_from>0` to get all available messages or make it
    // `tile_id==$tile_id` to get messages by the specific partition.
    // Possible tile_ids = [377893756, 377894443, 377894442, ...].
    // For CN environment = [389695267, 389696772, 389695401, ...].
    // You can implement more complex queries with RSQL
    // (https://here-tech.skawa.fun/documentation/data-client-library/dev_guide/client/rsql.html)
    Dataset<Row> sdiiMessages =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(SENSOR_DATA_ARCHIVE_HRN, SENSOR_DATA_ARCHIVE_VERSION_LAYER_NAME)
            .query("hour_from>0")
            .load();

    // Define structure of List of coordinate pairs - Spark uses it to encode data
    ArrayType schema = DataTypes.createArrayType(DataTypes.StringType);

    // UDF to extract only coordinates from the complex structure of path
    UserDefinedFunction positionEstimateToCoordinatePair =
        udf(
            (UDF1<WrappedArray<Row>, List<String>>)
                positionEstimateList ->
                    JavaConverters.seqAsJavaList(positionEstimateList).stream()
                        .map(
                            positionEstimate -> {
                              Double lat = positionEstimate.getAs("latitude_deg");
                              Double lon = positionEstimate.getAs("longitude_deg");
                              return lat + "," + lon;
                            })
                        .collect(Collectors.toList()),
            schema);

    Encoder<SensorData> sensorDataEncoder = Encoders.bean(SensorData.class);

    // Extract list of paths (each path represented as a list of coordinates)
    return sdiiMessages
        .select(
            col("idx_tile_id").as("tileId"),
            col("path.positionEstimate").as("fullPositionEstimate"))
        .withColumn(
            "positionEstimate",
            positionEstimateToCoordinatePair.apply(col("fullPositionEstimate")))
        .groupBy("tileId")
        .agg(collect_list("positionEstimate").as("positionEstimateList"))
        .as(sensorDataEncoder);
  }

  /**
   * Map match each path inside each sensor archive tile
   * (https://en.wikipedia.org/wiki/Map_matching) and return a list of lists of (lon,lat) string
   * pairs. Each element in the top-level list represents a path as a list of coordinates. Convert
   * each path to GeoJson.
   *
   * @return (tileId, GeoJSON FeatureCollection) pairs, one per input tile
   */
  private static JavaRDD<Pair<Long, String>> matchPaths(Dataset<SensorData> sensorData) {
    JavaRDD<Pair<Long, List<List<GeoCoordinate>>>> matchedTrips =
        sensorData
            .javaRDD()
            .mapPartitions(
                tiles -> {
                  // One path matcher per Spark partition, built on the shared layers.
                  OptimizedMapLayers optimizedMap =
                      OptimizedMapLayersSingleton.INSTANCE.optimizedMap();
                  PathMatcher<GeoCoordinate, Vertex, NoTransition> pathMatcher =
                      new PathMatchers(optimizedMap).carPathMatcherWithoutTransitions();
                  Iterable<SensorData> tileIterable = () -> tiles;
                  return StreamSupport.stream(tileIterable.spliterator(), false)
                      .map(
                          sensorDataElement ->
                              new Pair<>(
                                  sensorDataElement.getTileId(),
                                  getCoordinatesMatchedToPath(
                                      pathMatcher, sensorDataElement.getPositionEstimateList())))
                      .iterator();
                });
    return convertToGeoJSON(matchedTrips);
  }

  /**
   * Using path matcher get new list of lists of GeoCoordinates, each list representing a path.
   * Only on-road match results are kept.
   */
  private static List<List<GeoCoordinate>> getCoordinatesMatchedToPath(
      PathMatcher<GeoCoordinate, Vertex, NoTransition> pathMatcher,
      List<List<String>> positionEstimatePathList) {
    return positionEstimatePathList.stream()
        .map(BatchPathMatchingJava::convertToGeoCoordinates)
        .map(
            positionEstimatePath ->
                pathMatcher.matchPath(positionEstimatePath).results().stream()
                    .map(BatchPathMatchingJava::matchResultOnRoad)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
        .collect(Collectors.toList());
  }

  /** Convert list of "lat,lon" string pairs, representing coordinates, to actual GeoCoordinates. */
  private static List<GeoCoordinate> convertToGeoCoordinates(List<String> coordinatePairs) {
    return coordinatePairs.stream()
        .map(
            coordinatePair -> {
              String[] coordinates = coordinatePair.split(",", 2);
              double latitude = Double.parseDouble(coordinates[0]);
              double longitude = Double.parseDouble(coordinates[1]);
              return new GeoCoordinate(latitude, longitude);
            })
        .collect(Collectors.toList());
  }

  /** Construct new GeoCoordinates for on-road matches; returns null for any other result. */
  private static GeoCoordinate matchResultOnRoad(MatchResult<Vertex> matchResult) {
    if (MatchResults.isOnRoad(matchResult)) {
      OnRoad<Vertex> onRoad = (OnRoad<Vertex>) matchResult;
      GeoCoordinate nearest = onRoad.elementProjection().nearest();
      return new GeoCoordinate(nearest.latitude(), nearest.longitude());
    }
    return null;
  }

  /**
   * Convert the trips to GeoJson; each trip is converted to a linestring. Paths with fewer than
   * {@link #MIN_LINE_STRING_POSITIONS} positions are skipped, since they would not form a valid
   * GeoJSON LineString.
   */
  private static JavaRDD<Pair<Long, String>> convertToGeoJSON(
      JavaRDD<Pair<Long, List<List<GeoCoordinate>>>> matchedTrips) {
    return matchedTrips.map(
        matchedTrip -> {
          String featureCollection =
              matchedTrip.second().stream()
                  .filter(path -> path.size() >= MIN_LINE_STRING_POSITIONS)
                  .map(BatchPathMatchingJava::mapToFeature)
                  .collect(
                      Collectors.joining(
                          ",", "{ \"type\": \"FeatureCollection\", \"features\": [", "]}"));
          return new Pair<>(matchedTrip.first(), featureCollection);
        });
  }

  /** Convert path to GeoJson Feature; positions are written as [longitude, latitude]. */
  private static String mapToFeature(List<GeoCoordinate> coordinates) {
    StringJoiner coordJoiner = new StringJoiner(",", "[", "]");
    coordinates.stream()
        .map(c -> "[" + c.getLongitude() + ", " + c.getLatitude() + "]")
        .forEach(coordJoiner::add);
    return "{ \"type\": \"Feature\", \"geometry\": "
        + "{ \"type\": \"LineString\", \"coordinates\": "
        + coordJoiner
        + "}"
        + ", \"properties\": {} }";
  }

  /**
   * Publish the GeoJSON partitions to the output catalog using Data Client Library. The batch is
   * started and completed on the driver; each Spark partition uploads its tiles with its own
   * worker clients and adds them to that batch.
   */
  private static void publish(JavaRDD<Pair<Long, String>> geoJsonByTile)
      throws InterruptedException, ExecutionException {
    ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();

    // Start commit on master.
    PublishApi masterPublishApi =
        DataClient.get(masterActorSystem).publishApi(OUTPUT_CATALOG_HRN);

    // Get the latest catalog version of the output catalog.
    OptionalLong baseVersion = masterPublishApi.getBaseVersion().toCompletableFuture().get();

    // Fill in the direct and indirect dependencies for the output catalog, given the direct
    // inputs. This is good practice, especially if you intend to use the scheduler to
    // determine when the pipeline should run (i.e., by using the --with-scheduler option
    // when creating the pipeline version). See:
    // For users using platform.here.com:
    // https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    // For users using platform.hereolp.cn:
    // https://here-tech.skawa.fun/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    List<VersionDependency> dependencies =
        gatherDependencies(LOCATION_CATALOG_HRN, LOCATION_CATALOG_VERSION);

    // Start a publication batch on top of most recent catalog version.
    BatchToken batchToken =
        masterPublishApi
            .startBatch2(
                baseVersion, Optional.of(Collections.singletonList(OUTPUT_LAYER)), dependencies)
            .toCompletableFuture()
            .get();

    // Send partitions to workers, and upload data and metadata.
    geoJsonByTile.foreachPartition(
        partitions -> {
          ActorSystem workerActorSystem = DataClientSparkContextUtils.context().actorSystem();
          PublishApi workerPublishApi =
              DataClient.get(workerActorSystem).publishApi(OUTPUT_CATALOG_HRN);
          WriteEngine workerWriteEngine =
              DataEngine.get(workerActorSystem).writeEngine(OUTPUT_CATALOG_HRN);
          ArrayList<CommitPartition> commitPartitions = new ArrayList<>();
          while (partitions.hasNext()) {
            Pair<Long, String> content = partitions.next();
            // JSON exchanged between systems must be UTF-8 (RFC 8259, section 8.1); do not
            // depend on the JVM default charset.
            NewPartition newPartition =
                new NewPartition.Builder()
                    .withPartition(content.first().toString())
                    .withLayer(OUTPUT_LAYER)
                    .withData(content.second().getBytes(StandardCharsets.UTF_8))
                    .build();
            commitPartitions.add(workerWriteEngine.put(newPartition).toCompletableFuture().join());
          }
          workerPublishApi
              .publishToBatch(batchToken, commitPartitions.iterator())
              .toCompletableFuture()
              .join();
        });

    // Complete the commit.
    masterPublishApi.completeBatch(batchToken).toCompletableFuture().join();
  }

  /**
   * Gather the dependencies for the output catalog which depends on the location library catalog:
   * the location catalog itself as a direct dependency, plus its own dependencies as indirect
   * ones.
   */
  private static List<VersionDependency> gatherDependencies(HRN locationHrn, Long locationVersion) {
    ActorSystem sparkActorSystem = DataClientSparkContextUtils.context().actorSystem();
    QueryApi locationLibraryQuery = DataClient.get(sparkActorSystem).queryApi(locationHrn);
    List<VersionDependency> locationDeps =
        locationLibraryQuery
            .getVersion(locationVersion)
            .toCompletableFuture()
            .join()
            .getDependencies();
    // Collectors.toList() makes no guarantee about mutability, so collect into an explicit
    // ArrayList before appending the direct dependency.
    List<VersionDependency> retval =
        locationDeps.stream()
            .map(dep -> new VersionDependency(dep.hrn(), dep.version(), false))
            .collect(Collectors.toCollection(ArrayList::new));
    retval.add(new VersionDependency(locationHrn, locationVersion, true));
    return retval;
  }

  /**
   * Class used for encoding data in Dataset. Contains tileId and list of lists of pairs, each list
   * representing a path and each pair representing coordinate.
   */
  public static class SensorData implements Serializable {
    private Long tileId;
    private List<List<String>> positionEstimateList;

    public Long getTileId() {
      return tileId;
    }

    public void setTileId(Long tileId) {
      this.tileId = tileId;
    }

    public List<List<String>> getPositionEstimateList() {
      return positionEstimateList;
    }

    public void setPositionEstimateList(List<List<String>> positionEstimateList) {
      this.positionEstimateList = positionEstimateList;
    }
  }
}
Project resource hrn:here:data::olp-here:olp-sdii-sample-berlin-2 has been linked.
Project resource hrn:here:data::olp-here:here-optimized-map-for-location-library-2 has been linked.
pipeline-job.conf
// NOTE: If you are running the application in a pipeline with the scheduler,
// i.e., using the --with-scheduler option when creating the pipeline version
// (https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create),
// then this file is not needed, as the scheduler will pick up the latest versions
// of the input catalogs.
pipeline.job.catalog-versions {
  // Version information for the output catalog.
  // NOTE(review): -1 presumably means "no previous output version" — confirm.
  output-catalog {base-version = -1}
  // Pinned input catalog versions. The jobs read the location version through
  // pipelineContext.job...inputCatalogs("location").version.
  input-catalogs {
    // Catalog passed to OptimizedMapCatalog for path matching.
    location {
      processing-type = "reprocess"
      version = 3204}
    // Catalog read as the SDII sensor data archive.
    sensor {
      processing-type = "reprocess"
      version = 1}}}