HERE platform サービスを使用します
目的: Spark アプリケーションの HERE Traffic API クライアントを使用して、マップ上のリアルタイムのトラフィックフローを取得して表示します
複雑さ: 初心者向け
所要時間: 30 分
前提条件: 資格情報を取得し、資格情報を確認します
ソースコード: ダウンロード
このチュートリアルでは 、 Spark アプリケーションで HERE Traffic API クライアントを使用する方法について説明します。 HERE Traffic API クライアント はデータ クライアント ベース ライブラリの一部です。 データ クライアント ベース ライブラリには、プロジェクトで使用して HERE platform にアクセスできる Scala/Java ライブラリが用意されています。 HERE platform データ API や 、Geocode API
、 Routing API
、RevGeocode API
HERE Traffic API
などの HERE platform サービスへの、低レベルでステートレスかつスレッドセーフなプログラムによるアクセスを提供 します。 詳細については 、データ クライアント ベース ライブラリのマニュアルを参照してください。
HERE Traffic API は、次のような RESTful API です。
- JSON 形式のリアルタイムのトラフィックフローデータへのアクセスを提供します。このデータには、各リクエストで定義されているリージョンの速度およびジャムファクタに関する情報が含まれます。 Traffic API v7 は、フローに関連する道路セグメントのジオメトリなどの追加データも提供できます。
- 各交通事故のタイプと場所、ステータス、開始時間と終了時間、およびその他の関連データを含む、 JSON 形式の交通事故に関する集約情報を提供します。 このデータは、ルート計算を動的に最適化するのに役立ちます。
このチュートリアルでは、リアルタイムのトラフィックフローのスナップショットを取得し、バージョン付レイヤーにデータをアップロードして、地図上に結果を表示します。 軽負荷の道路は、緑の線で示され、中負荷がかかります ( 黄色の線が表示され、高負荷の道路は赤い線が表示されます ) 。
このチュートリアルでは、次のトピックについて説明します。
- Maven プロジェクトを設定します
- Traffic API クライアントを使用します
- アプリケーションを実装します
- アプリケーションをローカルで実行します
- プラットフォームでアプリケーションを実行します
Maven プロジェクトを設定します
リアルタイムの交通状況を取得するには、チュートリアルの冒頭でソースコードをダウンロードして任意のフォルダーに保存するか、次のようにプロジェクトのフォルダー構造を最初から作成します。
here-realtime-traffic-client
└── src
└── main
├── java
└── resources
└── scala
この操作は、次の bash
1 つのコマンドで実行できます。
mkdir -p here-realtime-traffic/src/main/{java,resources,scala}
Maven POM ファイルは Maven 設定の確認 チュートリアルのファイルと似ていますが、親の POM および依存関係のセクションが更新されています。
親 POM sdk-batch-bom_2.12
は、このチュートリアルがローカル環境とプラットフォーム環境の両方で Spark アプリケーションを実行するように設計されているためです。
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.54.3</version>
<relativePath/>
</parent>
次の依存関係が使用されます。
-
com.here.platform.data.client.base:ols-traffic-v7_${scala.compat.version}
HERE Traffic API クライアントを使用します。 -
com.here.platform.data.client:local-support_${scala.compat.version}
ローカルカタログにデータを書き込みます。 -
com.here.platform.data.client:spark-support_${scala.compat.version}
プラットフォームのカタログにデータを書き込むには、次の手順に従います。 -
org.apache.spark:spark-core_${scala.compat.version}
Java/Scala Spark アプリケーションを実行します。 -
com.here.platform.pipeline:pipeline-interface_${scala.compat.version}
から入力カタログに関する情報を取得PipelineContext
します。 -
com.here.platform.location:location-integration-here-commons_${scala.compat.version}
特定のジオコードの パーティション タイル ID を計算するためのツールが含まれています。 詳細について は、「 パーティション タイル ID の計算」を参照してください。 -
com.here.hrn:hrn_${scala.compat.version}
カタログ HRNS を解析します。
依存関係 :
<dependencies>
<dependency>
<groupId>com.here.platform.data.client.base</groupId>
<artifactId>ols-traffic-v7_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>spark-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>local-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.location</groupId>
<artifactId>location-integration-here-commons_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.hrn</groupId>
<artifactId>hrn_${scala.compat.version}</artifactId>
</dependency>
</dependencies>
Maven プロジェクトを作成したら、次のステップでアプリケーションのコードを記述して実行します。
Traffic API クライアントを使用します
前に説明したように、このチュートリアルでは、 HERE Traffic API からデータを取得してマップ上のデータを表示する簡単な Spark アプリケーションの作成方法を示します。 取得したすべてのデータがバージョン付レイヤーに追加 され、ローカルデータ インスペクターで表示されます。 詳細について は、「 CLI を使用したローカルでの開発とテスト」を参照してください。
HERE Real-Time Traffic は、コネクテッド カー プローブ、道路センサー、ライブ オペレーション センターなど、複数のソースからの高精度のデータで構成されています。
リアルタイムトラフィックには、次の 2 種類のトラフィックデータが含まれています。
- フロー - 道路のセグメントに沿った走行速度および渋滞についての情報。 フローデータは毎分更新されます。
- インシデント - トラフィックのフローに影響を与えるイベント、またはドライバーが把握することが重要となる可能性のあるイベントに関する情報。 インシデントデータは 2 分ごとに更新されます。
リアルタイムの交通量は世界 70 か国以上に及びます。 カバレッジの詳細について は、「トラフィックカバレッジ」を参照してください。
HERE Traffic API は、次のエンドポイントを提供します。
-
Real-Time Flow Information
- 地理空間エリアのリアルタイムのトラフィックフローデータを取得します。 -
Real-Time Incident Information
- 地理空間エリアの交通事故を取得します。 -
Real-Time Incident Information for the given ID
- 特定の ID の交通事故を取得します。
HERE Traffic API エンドポイントの詳細について は、『 Traffic API リファレンス 』を参照してください。
Real-Time Flow Information
エンドポイントを使用して、地理空間エリアのリアルタイムのトラフィックフローデータを取得します。
基本的なフロー リクエストは、地理空間フィルター (in
) と返される位置参照のタイプ (locationReferencing
) で構成されます。
レスポンスには、リクエストされたエリア内にある道路セグメントのトラフィックフロー情報が含まれます。
地理空間フィルタ in
は circle
、bounding box
、corridor
、またはのいずれかで、 それぞれ独自の形式を持つことができます。 以下のアプリケーションでは latitude
longitude
、およびradius
のポイントによって指定されたcircle
を使用します 。in=circle:52.51652,13.37885;r=3000
。 このパラメータでは、ブランデンブルクゲートの中心と半径 3km の円を定義します 地理空間のフィルタリングタイプの詳細について は、「地理空間のフィルタリング」を参照してください。
に locationReferencing
は tmc
、olr
、shape
、の 1 つ以上 を指定できます。 位置参照は位置を記述します。 位置は、地球表面の特定の点、曲線、または 2 次元の形状にすることができます。 場所が用途で使用される場合、道路、建物、山、水域などの特定の人為的または地理的な特徴を指します。 このチュートリアルで shape
は、位置参照を使用します。 これにより、地図上のデータを簡単に視覚化できます。 型を参照する場所の詳細について は、「場所の参照」を参照してください。
このエンドポイントへのリクエストは、フローアイテムの配列を返し ます。各フローアイテムには、location
要素とcurrentFlow
要素が 1 つずつ含まれています。 各場所に はメートル単位のlength
があり description
、通常は通りの名前です。また、locationReferencing
パラメーターによって要求された場所に応じて、場所の利用可能な場所参照があります。 この場合 latitudes
は、 AND です longitudes
。 各フローアイテムのcurrentFlow
要素に は、フローアイテムのspeed
、jam factor
、およびtraversability
の情報が含まれています。
以下のコード スニペットでは、上記のパラメータを使用してリアルタイムのフロー情報を取得する方法を示します。
FlowResponse response = trafficApi
.getFlow()
.withIn("circle:52.51652,13.37885;r=3000")
.withLocationReferencing(Collections.singletonList("shape"))
.withAdvancedFeatures(Collections.singletonList("deepCoverage"))
.build()
.toEntity();
val response = trafficApi
.getFlow(in = "circle:52.51652,13.37885;r=3000",
locationReferencing = List("shape"),
advancedFeatures = List("deepCoverage")
)
.toEntity()
API クライアントについて理解できたら、アプリケーションのレビューに進むことができます。
アプリケーションを実装します
この Spark アプリケーションの実装について見てみましょう。 以下のコード スニペットでは、主なメソッドロジックが次の 3 つの方法で実装されていることがわかります。
getRealTimeTrafficFlow()
prepareGeoJsonData()
uploadPartitions()
これらのメソッドの実装について詳しく見ていきましょう。
getRealTimeTrafficFlow()
この方式は、リアルタイムのトラフィックフローを取得するアプリケーションの主要な部分です。 上記の他のすべての後続のメソッドは、このメソッドから受信したデータを処理します。 処理を続行せ com.here.platform.data.client.base:ols-traffic-v7_${scala.compat.version}
ずにリアルタイムのトラフィックフローのみを取得する場合は、上記の依存関係を追加して、このメソッドをアプリケーションに追加するだけで済みます。
getRealTimeTrafficFlow()
このメソッドは、 HERE Traffic API client
この項の冒頭で説明したパラメータを使用して、を作成し、リアルタイムのトラフィックフローを取得します。 に HERE Traffic API
は認証が必要です。 デフォルト $HOME/.here/credentials.properties
では、に保存されている資格情報が使用されます。 詳細については 、データ クライアント ベース ライブラリのドキュメントを参照してください。
このprepareGeoJsonData()
メソッド は、HereTileResolver
を使用してトラフィックデータをパーティションに分割し(詳細については、「 パーティション タイル ID の計算」を参照)、各タイルごとにデータを GeoJSON に変換します。 jamFactor
プロパティによって、データの表示スタイルが異なります。 軽負荷の道路は、緑の線で示されます。中負荷の道路は黄色の線で示され、高負荷の道路は赤色の線で示されます。 詳細について は、「 GeoJSON データとスタイル GeoJSON の視覚化」を参照してください。 このメソッドでは 、 GeoJSON を表すクラスLineString
、Geometry
、Feature
、FeatureCollection
を使用します。 詳細については 、データ クライアント ライブラリのドキュメントを参照してください。
このuploadPartitions()
メソッド は、データ クライアント ライブラリを使用して GeoJSON データをバージョン管理されたカタログに送信します。
アプリケーションの動作の詳細については、次のコードのコメントを参照してください。
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.here.hrn.HRN;
import com.here.platform.data.client.base.javadsl.BaseClient;
import com.here.platform.data.client.base.javadsl.BaseClientJava;
import com.here.platform.data.client.base.ols.generated.javadsl.api.traffic.RealTimeTrafficApi;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.CurrentFlow;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.FlowItem;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.FlowResponse;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.JLocationShape;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.WriteEngine;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.model.PendingPartition;
import com.here.platform.data.client.model.geojson.Feature;
import com.here.platform.data.client.model.geojson.FeatureCollection;
import com.here.platform.data.client.model.geojson.LineString;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.location.core.geospatial.GeoCoordinate;
import com.here.platform.location.integration.herecommons.geospatial.HereTileLevel;
import com.here.platform.location.integration.herecommons.geospatial.javadsl.HereTileResolver;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;
public class RealTimeTrafficApiJava {
private static final String LAYER_ID = "realtime-traffic";
public static void main(String[] args)
throws JsonProcessingException, ExecutionException, InterruptedException {
// get real time traffic flow
FlowResponse flowResponse = getRealTimeTrafficFlow("circle:52.51652,13.37885;r=3000");
// convert the retrieved data to GeoJson and split them in tiles
List<PartitionData> partitionData = prepareGeoJsonData(flowResponse.getResults());
// publish data to output catalog
PipelineContext pipelineContext = new PipelineContext();
uploadPartitions(pipelineContext.getConfig().getOutputCatalog(), partitionData);
}
/** Get real-time traffic flow from the RealTimeTrafficApi */
private static FlowResponse getRealTimeTrafficFlow(String geo) {
BaseClient client = BaseClientJava.instance();
RealTimeTrafficApi trafficApi = new RealTimeTrafficApi(client);
return trafficApi
.getFlow()
.withIn(geo)
.withLocationReferencing(Collections.singletonList("shape"))
.withAdvancedFeatures(Collections.singletonList("deepCoverage"))
.build()
.toEntity();
}
/** Convert the real-time traffic flow response to GeoJson and splitting them by tiles */
private static List<PartitionData> prepareGeoJsonData(List<FlowItem> flowItems)
throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
List<PartitionData> customDataList = new ArrayList<>();
Map<Long, List<Feature>> tileIdToFeatureList = new HashMap<>();
// go through the flow items and create features from the data
flowItems.forEach(
flowItem -> {
List<List<Double>> coordinates = new ArrayList<>();
// get coordinates from the location to create the geometry from
flowItem
.getLocation()
.getShape()
.orElse(new JLocationShape.Builder().build())
.getLinks()
.stream()
.flatMap(link -> link.getPoints().stream())
.forEach(point -> coordinates.add(Arrays.asList(point.getLng(), point.getLat())));
// create lineString geometry
LineString lineString = new LineString.Builder().withCoordinates(coordinates).build();
// create feature using the lineString geometry and flow properties, such as speed,
// jamFactor, etc
Feature feature =
new Feature.Builder()
.withId(UUID.randomUUID().toString())
.withGeometry(lineString)
.withProperties(getGeoJsonProperties(flowItem.getCurrentFlow()))
.build();
// find tile ID for the given coordinate and save feature per each tile ID
lineString
.getCoordinates()
.forEach(
list -> {
double latitude = list.get(1);
double longitude = list.get(0);
GeoCoordinate coordinate = new GeoCoordinate(latitude, longitude);
long tileId =
new HereTileResolver(new HereTileLevel(15)).fromCoordinate(coordinate);
List<Feature> featureList =
tileIdToFeatureList.computeIfAbsent(tileId, (k) -> new ArrayList<>());
featureList.add(feature);
});
});
// create partition data from the features
for (Map.Entry<Long, List<Feature>> entry : tileIdToFeatureList.entrySet()) {
Long tileId = entry.getKey();
FeatureCollection featureCollection =
new FeatureCollection.Builder().withFeatures(entry.getValue()).build();
PartitionData customData =
new PartitionData(
Long.toString(tileId), LAYER_ID, objectMapper.writeValueAsBytes(featureCollection));
customDataList.add(customData);
}
return customDataList;
}
/** Upload data to the versioned layer using Data Client */
public static void uploadPartitions(HRN catalog, List<PartitionData> partitions)
throws ExecutionException, InterruptedException {
ActorSystem actorSystem = DataClientSparkContextUtils.context().actorSystem();
// create writeEngine for source catalog
WriteEngine writeEngine = DataEngine.get(actorSystem).writeEngine(catalog);
// parallelism defines how many parallel requests would be made to fetch the data
int parallelism = 10;
// create a list partitions to upload
ArrayList<PendingPartition> partitionList = new ArrayList<>();
partitions.forEach(
partitionData -> {
NewPartition newPartition =
new NewPartition.Builder()
.withPartition(partitionData.partition)
.withData(partitionData.data)
.withLayer(LAYER_ID)
.build();
partitionList.add(newPartition);
});
// upload partitions to the catalog
writeEngine
.publishBatch2(
parallelism,
Optional.of(Collections.singletonList(LAYER_ID)),
Collections.emptyList(),
Source.from(partitionList))
.toCompletableFuture()
.get();
}
/** Generate the real-time traffic flow properties for the Feature object */
private static Map<String, Object> getGeoJsonProperties(CurrentFlow currentFlow) {
Map<String, Object> properties = new HashMap<>();
properties.put("style", generateFillColor(currentFlow.getJamFactor()));
properties.put("speed", currentFlow.getSpeed().orElse(0.0));
properties.put("speedUncapped", currentFlow.getSpeedUncapped().orElse(0.0));
properties.put("freeFlow", currentFlow.getFreeFlow());
properties.put("jamFactor", currentFlow.getJamFactor());
properties.put("confidence", currentFlow.getConfidence().orElse(0.0));
properties.put("traversability", currentFlow.getTraversability().orElse("empty"));
return properties;
}
/**
* Generate RGB encoded color. For high Jam Factor color have more red component, for low Jam
* Factor - more green component. Each color component can have value between 0 and 255, blue
* component not used.
*/
private static Map<String, Object> generateFillColor(double jamFactor) {
Map<String, Object> style = new HashMap<>();
int green;
int red;
if (jamFactor <= 3) {
green = 255;
red = (int) (255 * (jamFactor * 10) / 100 * 2);
} else {
green = (int) (255 * ((100 - (jamFactor * 10)) / 100));
red = 255;
}
style.put("color", "rgb(" + red + ", " + green + ", 0)");
style.put("weight", "5");
return style;
}
/** Class to store the metadata and real-time traffic flow data */
@SuppressWarnings("serial")
static class PartitionData implements Serializable {
String partition;
String layerId;
byte[] data;
PartitionData(String partition, String layerId, byte[] data) {
this.partition = partition;
this.layerId = layerId;
this.data = data;
}
}
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import akka.stream.scaladsl.Source
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.here.hrn.HRN
import com.here.platform.data.client.base.ols.generated.codecs.traffic.JsonSupport._
import com.here.platform.data.client.base.ols.generated.scaladsl.api.traffic.RealTimeTrafficApi
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.{
CurrentFlow,
FlowItem,
FlowResponse
}
import com.here.platform.data.client.base.scaladsl.BaseClient
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.geojson.{Feature, FeatureCollection, LineString}
import com.here.platform.data.client.scaladsl.NewPartition
import com.here.platform.data.client.spark.DataClientSparkContextUtils.context.actorSystem
import com.here.platform.location.core.geospatial.GeoCoordinate
import com.here.platform.location.integration.herecommons.geospatial.{
HereTileLevel,
HereTileResolver
}
import com.here.platform.pipeline.PipelineContext
import java.util
import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext.Implicits.global
object RealTimeTrafficApiScala {
private val LAYER_ID = "realtime-traffic"
def main(args: Array[String]): Unit = {
// get real time traffic flow
val flowResponse = getRealTimeTrafficFlow("circle:52.51652,13.37885;r=3000")
// convert the retrieved data to GeoJson and split them in tiles
val partitionData = prepareGeoJsonData(flowResponse.getResults)
// publish data to output catalog
val pipelineContext = new PipelineContext
uploadPartitions(pipelineContext.config.outputCatalog, partitionData)
}
/**
* Get real-time traffic flow from the RealTimeTrafficApi
*/
def getRealTimeTrafficFlow(str: String): FlowResponse = {
val client = BaseClient()
val trafficApi = client.of[RealTimeTrafficApi]
trafficApi
.getFlow(
in = str,
locationReferencing = List("shape"),
advancedFeatures = List("deepCoverage")
)
.toEntity()
}
/**
* Convert the real-time traffic flow response to GeoJson
* and splitting them by tiles
*/
def prepareGeoJsonData(flowItems: util.List[FlowItem]): List[PartitionData] = {
val objectMapper = new ObjectMapper()
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
objectMapper.registerModule(new DefaultScalaModule)
val customDataList = ListBuffer[PartitionData]()
val tileIdToFeatureList = scala.collection.mutable.Map[Long, ListBuffer[Feature]]()
// go through the flow items and create features from the data
flowItems.forEach(item => {
val coordinates = ListBuffer(ListBuffer[Double]())
// get coordinates from the location to create the geometry from
item.getLocation.shape.get.links
.flatMap(link => link.points)
.foreach(point => {
coordinates += ListBuffer(point.getLng, point.getLat)
})
// create lineString geometry
coordinates.remove(0)
val lineString = LineString(
coordinates = Option(coordinates.map(_.toList).toList)
)
// create feature using the lineString geometry and flow properties, such as speed,
// jamFactor, etc
val feature = Feature.apply(
id = Option(java.util.UUID.randomUUID.toString),
geometry = Option(lineString),
properties = Option(getGeoJsonProperties(item.getCurrentFlow).toMap)
)
// find tile ID for the given coordinate and save feature per each tile ID
lineString.getCoordinates.forEach(list => {
val latitude = list.get(1)
val longitude = list.get(0)
val coordinate = GeoCoordinate.apply(latitude, longitude)
val tileId = new HereTileResolver(HereTileLevel(15)).fromCoordinate(coordinate).value
val featureList = tileIdToFeatureList.getOrElseUpdate(tileId, ListBuffer())
featureList += feature
})
})
// create partition data from the features
tileIdToFeatureList
.entrySet()
.forEach(entry => {
val tileId = entry.getKey
val featureCollection = FeatureCollection.apply(features = entry.getValue.toList)
val customData = PartitionData.apply(
tileId.toString,
LAYER_ID,
objectMapper.writeValueAsBytes(featureCollection)
)
customDataList += customData
})
customDataList.toList
}
/**
* Upload data to the versioned layer using Data Client
*/
def uploadPartitions(catalog: HRN, partitionsData: Seq[PartitionData]): Unit = {
// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalog)
val partitionList = ListBuffer[NewPartition]()
// create a list partitions to upload
partitionsData.foreach(partitionsData => {
val partition = NewPartition(
partition = partitionsData.partition,
layer = LAYER_ID,
data = NewPartition.ByteArrayData(partitionsData.data)
)
partitionList += partition
})
// upload partitions to the catalog
writeEngine.publishBatch2(parallelism = 10,
layerIds = Some(Seq(LAYER_ID)),
dependencies = Seq.empty,
partitions = Source.apply(partitionList.toList))
}
/**
* Generate the real-time traffic flow properties for the Feature object
*/
def getGeoJsonProperties(currentFlow: CurrentFlow): scala.collection.mutable.Map[String, Any] = {
val properties = scala.collection.mutable.Map[String, Any]()
properties += ("style" -> generateFillColor(currentFlow.getJamFactor))
properties += ("speed" -> currentFlow.getSpeed.orElse(0.0))
properties += ("speedUncapped" -> currentFlow.getSpeedUncapped.orElse(0.0))
properties += ("freeFlow" -> currentFlow.getFreeFlow)
properties += ("jamFactor" -> currentFlow.getJamFactor)
properties += ("confidence" -> currentFlow.getConfidence.orElse(0.0))
properties += ("traversability" -> currentFlow.getTraversability.orElse("empty"))
properties
}
/**
* Generate RGB encoded color. For high Jam Factor color have more red component, for low Jam
* Factor - more green component. Each color component can have value between 0 and 255, blue
* component not used.
*/
private def generateFillColor(jamFactor: Double) = {
val style = scala.collection.mutable.Map[String, Any]()
var green = 0
var red = 0
if (jamFactor <= 3) {
green = 255
red = (255 * (jamFactor * 10) / 100 * 2).toInt
} else {
green = (255 * ((100 - (jamFactor * 10)) / 100)).toInt
red = 255
}
val color = "rgb(" + red.toString + ", " + green.toString + ", 0)"
style += ("color" -> color)
style += ("weight" -> "5")
style
}
/**
* Class to store the metadata and real-time traffic flow data
*/
case class PartitionData(partition: String, layer: String, data: Array[Byte])
}
コードが完了すると、リソースを準備してアプリケーションを実行できます。
アプリケーションをローカルで実行します
アプリケーションを実行するには、リソースを準備する必要があります。つまり、バージョン付レイヤーを使用してカタログを作成します。
このチュートリアルでは、アプリケーションをローカルで実行します。したがって、ローカルカタログを作成するだけで十分です。 これらの名前はローカルマシンに含まれているため、レルム内での命名の競合の影響を受けません。また、任意の名前を使用できます。
チュートリアルでは入力カタログを使用しませ んが、設定ファイルのinput-catalog
フィールドに入力するためにを作成する必要があります。作成しないと、無効なカタログ HERE リソースネーム についてエラーが発生します。
チュートリアルフォルダーのルートから次の OLP CLI コマンドを実行して、ローカル入力カタログを作成します。
olp local catalog create batch-catalog batch-catalog --summary "Input catalog" --description "Input catalog"
チュートリアルフォルダーのルートから次の OLP CLI コマンドを実行して、バージョン付レイヤーを使用してローカル出力カタログを作成します。
olp local catalog create output-batch-catalog output-batch-catalog --config output-catalog-configuration.json
output-catalog-configuration.json
ファイルの構造は次のとおりです。
{
"id": "realtime-traffic-output",
"name": "Real time traffic output catalog",
"summary": "Catalog with real time traffic",
"description": "Catalog with real time traffic",
"layers": [
{
"id": "realtime-traffic",
"name": "realtime-traffic",
"summary": "RealTimeTraffic",
"description": "RealTimeTraffic",
"contentType": "application/geo+json",
"layerType": "versioned",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "heretile",
"tileLevels": [15]
}
}
]
}
バージョン付レイヤーのパラメーターを詳しく見てみましょう。 プロパティ "contentType": "application/geo+json"
は、レイヤーに GeoJSON 形式のデータが含まれていることを示します。 プロパティ : scheme:heretile
データが 4 つのツリーとして保存されることを意味します。 データは 15
ズームレベルで保存されます。 ローカルデータ インスペクターでは、追加のレンダリングプラグインなしで、このタイリングスキーマおよび GeoJSON を表示できます。 カタログを作成した後、次のコマンドを使用して、ダウンロードしたチュートリアルのルートからアプリケーションを実行します。
mvn compile exec:java -q -D"exec.mainClass"="RealTimeTrafficApiJava" \
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local \
-Dspark.master=local[*] \
-Dpipeline-config.file=local-pipeline-config.conf
mvn compile exec:java -q -D"exec.mainClass"="RealTimeTrafficApiScala" \
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local \
-Dspark.master=local[*] \
-Dpipeline-config.file=local-pipeline-config.conf
このコマンドには、次のパラメータがあります。
-
exec.mainClass
–アプリケーションを実行するためのエントリポイント。 -
here.platform.data-client.endpoint-locator.discovery-service-env=local
–ローカルカタログのみを使用するようにデータ クライアント ライブラリ を設定します。 -
spark.master=local[*]
–マシン上に論理コアがあるのと同じ数のワーカースレッドで実行されるローカル Spark を構成します。 -
pipeline-config.file=local-pipeline-config.conf
- 入力および出力カタログに関する情報を含む設定ファイル。
アプリケーションが正常に終了すると、コンソールでバージョン付レイヤー に追加されたデータを確認できます。
GeoJSON データがカタログにパブリッシュされたら、次のコマンドを使用して結果を表示します。
olp local catalog layer inspect hrn:local:data:::output-batch-catalog realtime-traffic
このコマンドを実行すると、リアルタイムのトラフィックフローを示すブラウザタブが開きます。
このアプリケーションをプラットフォームで実行できるようになりました。
プラットフォームでアプリケーションを実行します
プラットフォームでアプリケーションを実行する前に、そのアプリケーションのすべてのリソースを準備する必要があります。
プロジェクトでアプリケーションを実行します。 これを行うには、 OLP CLI を使用してプロジェクトを作成します。
olp project create $PROJECT_ID $PROJECT_NAME
OLP CLI から次のメッセージが返されます。
Project YOUR_PROJECT_HRN has been created.
このYOUR_PROJECT_HRN
値を $PROJECT_HRN
console 変数に保存すると、コマンドの実行が簡単になります。
HERE Traffic サービスを使用するので、 HERE Traffic クライアントがサービスの基本 URL を解決できるように、このサービスをプロジェクトにリンクする必要があります。 これを行うには、次のコマンドを使用して、サービスをプロジェクトにリンクします。
olp project resource link $PROJECT_HRN hrn:here:service::olp-here:traffic-api-7
次のステップでは、前に作成したプロジェクトに入力および出力カタログを作成します。 --scope
このパラメーターは、 OLP CLI でプロジェクトを指定するために使用されます。
入力カタログは当社のアプリケーションでは使用されていません。 このカタログでは、pipeline-config.conf
ファイルのinput-catalog
プロパティを指定する必要があります。
チュートリアルフォルダーのルートから次のコマンドを実行して、入力カタログを作成します。
olp catalog create $CATALOG_ID $CATALOG_NAME --summary "Input catalog" --description "Input catalog" --scope $PROJECT_HRN
OLP CLI から次のメッセージが返されます。
Catalog YOUR_INPUT_CATALOG_HRN has been created.
ファイルを使用して出力カタログを作成する必要 output-catalog-configuration.json
があります。 チュートリアルフォルダーのルートから次のコマンドを実行して、出力カタログを作成します。
olp catalog create $CATALOG_ID $CATALOG_NAME --config output-catalog-configuration.json --scope $PROJECT_HRN
次の手順では、前に作成した出力カタログを使用するようにソースコードを設定します。 pipeline-config.conf
ファイルのINPUT_CATALOG_HRN
およびOUTPUT_CATALOG_HRN
プレースホルダーを、上記で作成したカタログの HRNS に置き換えます。
プラットフォームに展開するには、アプリケーションに Fat JAR のすべての依存関係をパッケージ化する必要があります。 java/Scala SDK は platform
、ファット JAR を生成するプロファイルを提供します。 次のコマンドは、プロファイルを使用して Fat JAR を生成します。
mvn -Pplatform clean package
real-time-traffic-tutorial-<tutorial-version>-platform.jar
Fat JAR は ターゲット フォルダに作成する必要があります。
これで、アプリケーションで使用されているすべてのリソースと、それらを使用するように設定されたアプリケーションが、プラットフォームに展開できるようになりました。
次の OLP CLI コマンドを使用して、プロジェクト範囲にパイプライン を作成してみましょう。
olp pipeline create $PIPELINE_ID --scope $PROJECT_HRN
OLP CLI から次のメッセージが返されます。
Pipeline YOUR_PIPELINE_ID has been created.
次のステップでは、パイプライン テンプレートを作成します。
独自のパイプライン テンプレート を作成するには、テンプレート 名 batch
ランタイム環境 を指定する必要があります。このチュートリアルでは、 Spark アプリケーションを実行し、 batch
プラットフォーム 上の環境、 mvn package -Pplatform
コマンドで作成されたファット JAR 、メインクラス、パイプライン が属するプロジェクトを使用します。 および入力カタログ ID がパイプライン バージョンの設定で必要です。
チュートリアルフォルダーのルートから次の OLP CLI コマンドを実行して、パイプライン テンプレート を作成しましょう。
olp pipeline template create traffic-template \
batch-3.0 target/real-time-traffic-tutorial-<tutorial-version>-platform.jar \
RealTimeTrafficApiJava --input-catalog-ids pipeline-config.conf \
--scope $PROJECT_HRN
olp pipeline template create traffic-template \
batch-3.0 target/real-time-traffic-tutorial-<tutorial-version>-platform.jar \
RealTimeTrafficApiScala --input-catalog-ids pipeline-config.conf \
--scope $PROJECT_HRN
次のステップでは、パイプラインバージョンを作成します。
プロジェクト範囲にパイプラインバージョンを作成しましょう。
olp pipeline version create test-spark-version $PIPELINE_ID $PIPELINE_TEMPLATE_ID pipeline-config.conf --scope $PROJECT_HRN
注
レルムで請求タグが必要な場合 --billing-tag <your-billing-tag>
は、パラメータを使用します。
バッチ パイプライン 、パイプライン テンプレート 、およびパイプライン バージョンを作成したら、パイプライン バージョンをアクティブ化することで、プラットフォーム でアプリケーションを実行できます。
パイプライン バージョンをアクティブ化するには、次の OLP CLI コマンドを実行します。
olp pipeline version activate $PIPELINE_ID $PIPELINE_VERSION_ID --scope $PROJECT_HRN
パイプラインがアクティブ化された後 Completed
、パイプラインが状態に達するまで待機する必要があります。 このためには、次の OLP CLI コマンドを実行します。
olp pipeline version wait $PIPELINE_ID $PIPELINE_VERSION_ID \
--job-state completed --timeout 600 \
--scope $PROJECT_HRN
パイプライン Completed
が状態になったら、アプリケーションの結果を確認できます。
このためには、次の OLP CLI コマンドを実行します。
olp catalog layer inspect $OUTPUT_CATALOG_HRN realtime-traffic
このコマンドを実行すると、リアルタイムのトラフィックフローを示すブラウザタブが開きます。
結論
このチュートリアルでは、 Java/Scala Spark アプリケーションの HERE Traffic API クライアントを使用してリアルタイムのトラフィックフローのスナップショットを取得し、マップに表示する方法を学習しました。
詳細情報
このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。
- HERE Traffic API の詳細については 、『 HERE Traffic API v7 開発者ガイド』を参照してください
- HERE Traffic API エンドポイントの詳細については 、『 HERE Traffic API リファレンス 』を参照してください
- プロジェクトでのデータクライアントベースの使用方法の詳細 については、データ クライアント ベース ライブラリのドキュメントを参照してください。
- Spark アプリケーションの開発方法の詳細については、『Apache Spark プログラミングガイド』を参照してください。
- OLP CLI コマンドの詳細 については、コマンド ライン インターフェースのドキュメントを参照してください。
- ローカルデータ インスペクターの詳細について は、のマニュアルを参照してください。
- Spark と HERE platform データを接続するユーティリティの詳細について は、データ クライアント ライブラリのドキュメントを参照してください。