getRealTimeTrafficFlow() メソッドは、HERE Traffic API クライアントを作成し、この項の冒頭で説明したパラメータを使用してリアルタイムのトラフィックフローを取得します。HERE Traffic API には認証が必要です。デフォルトでは、$HOME/.here/credentials.properties に保存されている資格情報が使用されます。詳細については、データ クライアント ベース ライブラリのドキュメントを参照してください。
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.here.hrn.HRN;
import com.here.platform.data.client.base.javadsl.BaseClient;
import com.here.platform.data.client.base.javadsl.BaseClientJava;
import com.here.platform.data.client.base.ols.generated.javadsl.api.traffic.RealTimeTrafficApi;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.CurrentFlow;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.FlowItem;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.FlowResponse;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.JLocationShape;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.WriteEngine;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.model.PendingPartition;
import com.here.platform.data.client.model.geojson.Feature;
import com.here.platform.data.client.model.geojson.FeatureCollection;
import com.here.platform.data.client.model.geojson.LineString;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.location.core.geospatial.GeoCoordinate;
import com.here.platform.location.integration.herecommons.geospatial.HereTileLevel;
import com.here.platform.location.integration.herecommons.geospatial.javadsl.HereTileResolver;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * Fetches real-time traffic flow, converts it to GeoJSON grouped by HERE tile, and publishes one
 * partition per tile to the {@code realtime-traffic} layer of the pipeline's output catalog.
 */
public class RealTimeTrafficApiJava {

  /** ID of the output layer that receives the GeoJSON partitions. */
  private static final String LAYER_ID = "realtime-traffic";

  /** HERE tile level used to derive the partition (tile) ID of every coordinate. */
  private static final int TILE_LEVEL = 15;

  /**
   * Runs the three steps of the tutorial: fetch the flow, convert it, upload it.
   *
   * @param args unused
   * @throws JsonProcessingException if a feature collection cannot be serialized
   * @throws ExecutionException if publishing the batch fails
   * @throws InterruptedException if the thread is interrupted while waiting for the upload
   */
  public static void main(String[] args)
      throws JsonProcessingException, ExecutionException, InterruptedException {
    // get real time traffic flow
    FlowResponse flowResponse = getRealTimeTrafficFlow("circle:52.51652,13.37885;r=3000");

    // convert the retrieved data to GeoJson and split them in tiles
    List<PartitionData> partitionData = prepareGeoJsonData(flowResponse.getResults());

    // publish data to output catalog
    PipelineContext pipelineContext = new PipelineContext();
    uploadPartitions(pipelineContext.getConfig().getOutputCatalog(), partitionData);
  }

  /**
   * Get real-time traffic flow from the RealTimeTrafficApi.
   *
   * @param geo geospatial filter passed as the {@code in} parameter of the flow request
   * @return the flow response, requested with {@code shape} location referencing and the {@code
   *     deepCoverage} advanced feature
   */
  private static FlowResponse getRealTimeTrafficFlow(String geo) {
    BaseClient client = BaseClientJava.instance();
    RealTimeTrafficApi trafficApi = new RealTimeTrafficApi(client);
    return trafficApi
        .getFlow()
        .withIn(geo)
        .withLocationReferencing(Collections.singletonList("shape"))
        .withAdvancedFeatures(Collections.singletonList("deepCoverage"))
        .build()
        .toEntity();
  }

  /**
   * Convert the real-time traffic flow response to GeoJson and split it by tiles.
   *
   * <p>Every flow item becomes one LineString feature. The feature is stored once in each tile
   * that contains at least one of its points. Flow items without any shape point produce no
   * output.
   *
   * @param flowItems flow items returned by the traffic API
   * @return one entry per tile, holding the serialized feature collection of that tile
   * @throws JsonProcessingException if a feature collection cannot be serialized
   */
  private static List<PartitionData> prepareGeoJsonData(List<FlowItem> flowItems)
      throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);

    // the resolver does not depend on the flow item, so it is created only once
    HereTileResolver tileResolver = new HereTileResolver(new HereTileLevel(TILE_LEVEL));

    Map<Long, List<Feature>> tileIdToFeatureList = new HashMap<>();

    // go through the flow items and create features from the data
    for (FlowItem flowItem : flowItems) {
      // get coordinates from the location to create the geometry from (GeoJSON order: lng, lat)
      List<List<Double>> coordinates = new ArrayList<>();
      flowItem
          .getLocation()
          .getShape()
          .orElse(new JLocationShape.Builder().build())
          .getLinks()
          .stream()
          .flatMap(link -> link.getPoints().stream())
          .forEach(point -> coordinates.add(Arrays.asList(point.getLng(), point.getLat())));

      if (coordinates.isEmpty()) {
        // nothing to draw and no tile to assign the item to
        continue;
      }

      // create lineString geometry
      LineString lineString = new LineString.Builder().withCoordinates(coordinates).build();

      // create feature using the lineString geometry and flow properties, such as speed,
      // jamFactor, etc
      Feature feature =
          new Feature.Builder()
              .withId(UUID.randomUUID().toString())
              .withGeometry(lineString)
              .withProperties(getGeoJsonProperties(flowItem.getCurrentFlow()))
              .build();

      // collect the distinct tiles touched by the line; several points usually share a tile and
      // the feature must be written to that tile only once
      Set<Long> tileIds = new LinkedHashSet<>();
      lineString
          .getCoordinates()
          .forEach(
              list -> {
                double latitude = list.get(1);
                double longitude = list.get(0);
                GeoCoordinate coordinate = new GeoCoordinate(latitude, longitude);
                long tileId = tileResolver.fromCoordinate(coordinate);
                tileIds.add(tileId);
              });

      for (Long tileId : tileIds) {
        tileIdToFeatureList.computeIfAbsent(tileId, k -> new ArrayList<>()).add(feature);
      }
    }

    // create partition data from the features
    List<PartitionData> customDataList = new ArrayList<>(tileIdToFeatureList.size());
    for (Map.Entry<Long, List<Feature>> entry : tileIdToFeatureList.entrySet()) {
      Long tileId = entry.getKey();
      FeatureCollection featureCollection =
          new FeatureCollection.Builder().withFeatures(entry.getValue()).build();
      PartitionData customData =
          new PartitionData(
              Long.toString(tileId), LAYER_ID, objectMapper.writeValueAsBytes(featureCollection));
      customDataList.add(customData);
    }
    return customDataList;
  }

  /**
   * Upload data to the versioned layer using Data Client. Blocks until the batch publication has
   * completed.
   *
   * @param catalog HRN of the catalog to publish to
   * @param partitions partition IDs and payloads to upload to {@code LAYER_ID}
   * @throws ExecutionException if the publication fails
   * @throws InterruptedException if the thread is interrupted while waiting for the publication
   */
  public static void uploadPartitions(HRN catalog, List<PartitionData> partitions)
      throws ExecutionException, InterruptedException {
    ActorSystem actorSystem = DataClientSparkContextUtils.context().actorSystem();

    // create writeEngine for the catalog the data is published to
    WriteEngine writeEngine = DataEngine.get(actorSystem).writeEngine(catalog);

    // parallelism defines how many parallel requests would be made to upload the data
    int parallelism = 10;

    // create a list partitions to upload
    List<PendingPartition> partitionList = new ArrayList<>(partitions.size());
    for (PartitionData partitionData : partitions) {
      NewPartition newPartition =
          new NewPartition.Builder()
              .withPartition(partitionData.partition)
              .withData(partitionData.data)
              .withLayer(LAYER_ID)
              .build();
      partitionList.add(newPartition);
    }

    // upload partitions to the catalog and wait for the result
    writeEngine
        .publishBatch2(
            parallelism,
            Optional.of(Collections.singletonList(LAYER_ID)),
            Collections.emptyList(),
            Source.from(partitionList))
        .toCompletableFuture()
        .get();
  }

  /**
   * Generate the real-time traffic flow properties for the Feature object.
   *
   * @param currentFlow flow values of one flow item
   * @return GeoJSON properties; absent optional numbers default to {@code 0.0} and an absent
   *     traversability defaults to {@code "empty"}
   */
  private static Map<String, Object> getGeoJsonProperties(CurrentFlow currentFlow) {
    Map<String, Object> properties = new HashMap<>();
    properties.put("style", generateFillColor(currentFlow.getJamFactor()));
    properties.put("speed", currentFlow.getSpeed().orElse(0.0));
    properties.put("speedUncapped", currentFlow.getSpeedUncapped().orElse(0.0));
    properties.put("freeFlow", currentFlow.getFreeFlow());
    properties.put("jamFactor", currentFlow.getJamFactor());
    properties.put("confidence", currentFlow.getConfidence().orElse(0.0));
    properties.put("traversability", currentFlow.getTraversability().orElse("empty"));
    return properties;
  }

  /**
   * Generate RGB encoded color. For high Jam Factor color have more red component, for low Jam
   * Factor - more green component. Each color component can have value between 0 and 255, blue
   * component not used.
   *
   * @param jamFactor jam factor of the flow item
   * @return style map with the {@code color} and {@code weight} entries
   */
  private static Map<String, Object> generateFillColor(double jamFactor) {
    int green;
    int red;
    if (jamFactor <= 3) {
      green = 255;
      red = (int) (255 * (jamFactor * 10) / 100 * 2);
    } else {
      green = (int) (255 * ((100 - (jamFactor * 10)) / 100));
      red = 255;
    }

    // an out-of-range jam factor must not produce an invalid rgb() component
    red = Math.max(0, Math.min(255, red));
    green = Math.max(0, Math.min(255, green));

    Map<String, Object> style = new HashMap<>();
    style.put("color", "rgb(" + red + ", " + green + ", 0)");
    style.put("weight", "5");
    return style;
  }

  /** Class to store the metadata and real-time traffic flow data */
  @SuppressWarnings("serial")
  static class PartitionData implements Serializable {
    /** Partition (tile) ID as a decimal string. */
    final String partition;

    /** ID of the layer the partition belongs to. */
    final String layerId;

    /** Serialized GeoJSON feature collection. */
    final byte[] data;

    PartitionData(String partition, String layerId, byte[] data) {
      this.partition = partition;
      this.layerId = layerId;
      this.data = data;
    }
  }
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import akka.stream.scaladsl.Source
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.here.hrn.HRN
import com.here.platform.data.client.base.ols.generated.codecs.traffic.JsonSupport._
import com.here.platform.data.client.base.ols.generated.scaladsl.api.traffic.RealTimeTrafficApi
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.{
  CurrentFlow,
  FlowItem,
  FlowResponse
}
import com.here.platform.data.client.base.scaladsl.BaseClient
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.geojson.{Feature, FeatureCollection, LineString}
import com.here.platform.data.client.scaladsl.NewPartition
import com.here.platform.data.client.spark.DataClientSparkContextUtils.context.actorSystem
import com.here.platform.location.core.geospatial.GeoCoordinate
import com.here.platform.location.integration.herecommons.geospatial.{
  HereTileLevel,
  HereTileResolver
}
import com.here.platform.pipeline.PipelineContext

import java.util
import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object RealTimeTrafficApiScala {

  /** ID of the output layer that receives the GeoJSON partitions. */
  private val LAYER_ID = "realtime-traffic"

  /** HERE tile level used to derive the partition (tile) ID of every coordinate. */
  private val TILE_LEVEL = 15

  /**
    * Runs the three steps of the tutorial: fetch the flow, convert it, upload it.
    */
  def main(args: Array[String]): Unit = {
    // get real time traffic flow
    val flowResponse = getRealTimeTrafficFlow("circle:52.51652,13.37885;r=3000")

    // convert the retrieved data to GeoJson and split them in tiles
    val partitionData = prepareGeoJsonData(flowResponse.getResults)

    // publish data to output catalog
    val pipelineContext = new PipelineContext
    uploadPartitions(pipelineContext.config.outputCatalog, partitionData)
  }

  /**
    * Get real-time traffic flow from the RealTimeTrafficApi.
    *
    * @param str geospatial filter passed as the `in` parameter of the flow request
    * @return the flow response, requested with `shape` location referencing and the
    *         `deepCoverage` advanced feature
    */
  def getRealTimeTrafficFlow(str: String): FlowResponse = {
    val client = BaseClient()
    val trafficApi = client.of[RealTimeTrafficApi]
    trafficApi
      .getFlow(
        in = str,
        locationReferencing = List("shape"),
        advancedFeatures = List("deepCoverage")
      )
      .toEntity()
  }

  /**
    * Convert the real-time traffic flow response to GeoJson
    * and split it by tiles.
    *
    * Every flow item becomes one LineString feature. The feature is stored once in each tile
    * that contains at least one of its points. Flow items without a shape (or without any
    * shape point) produce no output instead of failing.
    *
    * @param flowItems flow items returned by the traffic API
    * @return one entry per tile, holding the serialized feature collection of that tile
    */
  def prepareGeoJsonData(flowItems: util.List[FlowItem]): List[PartitionData] = {
    val objectMapper = new ObjectMapper()
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
    objectMapper.registerModule(new DefaultScalaModule)

    // the resolver does not depend on the flow item, so it is created only once
    val tileResolver = new HereTileResolver(HereTileLevel(TILE_LEVEL))

    val tileIdToFeatureList = scala.collection.mutable.Map[Long, ListBuffer[Feature]]()

    // go through the flow items and create features from the data
    flowItems.forEach(item => {
      // get coordinates from the location to create the geometry from (GeoJSON order: lng, lat);
      // a missing shape simply yields no coordinates
      val coordinates: List[List[Double]] = item.getLocation.shape.toList
        .flatMap(shape => shape.links)
        .flatMap(link => link.points)
        .map(point => List[Double](point.getLng, point.getLat))

      if (coordinates.nonEmpty) {
        // create lineString geometry
        val lineString = LineString(coordinates = Option(coordinates))

        // create feature using the lineString geometry and flow properties, such as speed,
        // jamFactor, etc
        val feature = Feature.apply(
          id = Option(java.util.UUID.randomUUID.toString),
          geometry = Option(lineString),
          properties = Option(getGeoJsonProperties(item.getCurrentFlow).toMap)
        )

        // find the distinct tiles touched by the line; several points usually share a tile and
        // the feature must be written to that tile only once
        val tileIds = coordinates
          .map(lngLat => {
            val coordinate = GeoCoordinate.apply(lngLat(1), lngLat(0))
            tileResolver.fromCoordinate(coordinate).value
          })
          .distinct

        tileIds.foreach(tileId => {
          val featureList = tileIdToFeatureList.getOrElseUpdate(tileId, ListBuffer())
          featureList += feature
        })
      }
    })

    // create partition data from the features
    tileIdToFeatureList.toList.map {
      case (tileId, features) =>
        val featureCollection = FeatureCollection.apply(features = features.toList)
        PartitionData.apply(
          tileId.toString,
          LAYER_ID,
          objectMapper.writeValueAsBytes(featureCollection)
        )
    }
  }

  /**
    * Upload data to the versioned layer using Data Client. Blocks until the batch publication
    * has completed, so that a failed upload surfaces as an exception instead of being dropped.
    *
    * @param catalog        HRN of the catalog to publish to
    * @param partitionsData partition IDs and payloads to upload to `LAYER_ID`
    */
  def uploadPartitions(catalog: HRN, partitionsData: Seq[PartitionData]): Unit = {
    // create writeEngine for a catalog
    val writeEngine = DataEngine().writeEngine(catalog)

    // create a list partitions to upload
    val partitionList = partitionsData
      .map(
        partitionData =>
          NewPartition(
            partition = partitionData.partition,
            layer = LAYER_ID,
            data = NewPartition.ByteArrayData(partitionData.data)
        ))
      .toList

    // upload partitions to the catalog
    val upload = writeEngine.publishBatch2(
      parallelism = 10,
      layerIds = Some(Seq(LAYER_ID)),
      dependencies = Seq.empty,
      partitions = Source.apply(partitionList)
    )

    // wait for the result; NOTE(review): assumes publishBatch2 returns a scala Future - confirm
    Await.result(upload, Duration.Inf)
  }

  /**
    * Generate the real-time traffic flow properties for the Feature object.
    *
    * @param currentFlow flow values of one flow item
    * @return GeoJSON properties; absent optional numbers default to `0.0` and an absent
    *         traversability defaults to `"empty"`
    */
  def getGeoJsonProperties(currentFlow: CurrentFlow): scala.collection.mutable.Map[String, Any] = {
    val properties = scala.collection.mutable.Map[String, Any]()
    properties += ("style" -> generateFillColor(currentFlow.getJamFactor))
    properties += ("speed" -> currentFlow.getSpeed.orElse(0.0))
    properties += ("speedUncapped" -> currentFlow.getSpeedUncapped.orElse(0.0))
    properties += ("freeFlow" -> currentFlow.getFreeFlow)
    properties += ("jamFactor" -> currentFlow.getJamFactor)
    properties += ("confidence" -> currentFlow.getConfidence.orElse(0.0))
    properties += ("traversability" -> currentFlow.getTraversability.orElse("empty"))
    properties
  }

  /**
    * Generate RGB encoded color. For high Jam Factor color have more red component, for low Jam
    * Factor - more green component. Each color component can have value between 0 and 255, blue
    * component not used.
    *
    * @param jamFactor jam factor of the flow item
    * @return style map with the `color` and `weight` entries
    */
  private def generateFillColor(jamFactor: Double): scala.collection.mutable.Map[String, Any] = {
    val (rawRed, rawGreen) =
      if (jamFactor <= 3) {
        ((255 * (jamFactor * 10) / 100 * 2).toInt, 255)
      } else {
        (255, (255 * ((100 - (jamFactor * 10)) / 100)).toInt)
      }

    // an out-of-range jam factor must not produce an invalid rgb() component
    val red = math.max(0, math.min(255, rawRed))
    val green = math.max(0, math.min(255, rawGreen))

    val color = "rgb(" + red.toString + ", " + green.toString + ", 0)"
    val style = scala.collection.mutable.Map[String, Any]()
    style += ("color" -> color)
    style += ("weight" -> "5")
    style
  }

  /**
    * Class to store the metadata and real-time traffic flow data
    */
  case class PartitionData(partition: String, layer: String, data: Array[Byte])
}
{"id":"realtime-traffic-output","name":"Real time traffic output catalog","summary":"Catalog with real time traffic","description":"Catalog with real time traffic","layers":[{"id":"realtime-traffic","name":"realtime-traffic","summary":"RealTimeTraffic","description":"RealTimeTraffic","contentType":"application/geo+json","layerType":"versioned","volume":{"volumeType":"durable"},"partitioning":{"scheme":"heretile","tileLevels":[15]}}]}
プラットフォームにデプロイするには、アプリケーションとそのすべての依存関係を Fat JAR にパッケージ化する必要があります。Java/Scala SDK は、Fat JAR を生成するための platform プロファイルを提供しています。次のコマンドは、このプロファイルを使用して Fat JAR を生成します。
mvn -Pplatform clean package
Fat JAR である real-time-traffic-tutorial-<tutorial-version>-platform.jar が target フォルダに作成されているはずです。