Path Matching Sensor Data to GeoJSON with Spark

Objective: Path match sensor data in standalone Spark using the Location Library.

Complexity: Intermediate

Time to complete: 45 minutes

Prerequisites: Organize your work in projects

Source code: Download

This example shows how to perform the operations described below.

The input catalog contains path coordinates recorded from GPS devices.

Figure 1. Input

Where map matching is possible, the output catalog contains the same trips matched to the HERE Road Network.

Figure 2. Output

Set up the Maven project

Create the following folder structure for the project:

batch-path-matching
└── src
    └── main
        ├── java
        └── scala

You can do this with a single bash command:

mkdir -p batch-path-matching/src/main/{java,scala}

The Maven POM file is similar to the one in the Verify Maven Settings example, with updated parent POM and dependencies sections.

Parent POM:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

Dependencies:

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.location</groupId>
        <artifactId>location-integration-optimized-map-dcl2_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.hrn</groupId>
        <artifactId>hrn_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>
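
The ${scala.compat.version} placeholder in the artifact IDs resolves to the Scala binary version (2.12 here, matching the sdk-batch-bom_2.12 parent). If your build does not inherit this property from the parent POM, you can define it yourself; a minimal sketch:

<properties>
    <!-- Only needed if the property is not inherited from the parent BOM. -->
    <scala.compat.version>2.12</scala.compat.version>
</properties>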

Modify your credentials to include the token scope

Add the HERE token scope to the credentials.properties file so that all subsequent commands run within the project you created:

olp credentials update default --scope {{YOUR_PROJECT_HRN}}
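
After this command runs, the credentials.properties file should contain a line similar to the following (the exact formatting may vary with your CLI version):

here.token.scope = {{YOUR_PROJECT_HRN}}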

When you have completed the tutorial, remove the line containing here.token.scope from the credentials.properties file. You can do this with the OLP CLI:

olp credentials update default --scope empty

Create the output catalog

Use this configuration file to create the output catalog that will contain the matched trips, replacing {{YOUR_CATALOG_ID}} and {{YOUR_USERNAME}} with your own values:

{
  "id": "{{YOUR_CATALOG_ID}}",
  "name": "{{YOUR_USERNAME}} Path Matching Tutorial",
  "summary": "Berlin sample matched path in GeoJSON",
  "description": "Berlin sample matched path in GeoJSON",
  "layers": [
    {
      "id": "matched-trips",
      "name": "GeoJSON matched trips",
      "summary": "GeoJSON matched trips",
      "description": "GeoJSON matched trips",
      "contentType": "application/vnd.geo+json",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "heretile",
        "tileLevels": [14]
      },
      "coverage": {
        "adminAreas": [
          "DE"
        ]
      }
    }
  ]
}

Create the catalog in your project, with a GeoJSON layer at level 14 (the same as the input catalog):

olp catalog create {{YOUR_CATALOG_ID}} "{{YOUR_USERNAME}} Matched Paths" \
    --config output-catalog.json

The CLI returns the following:

Catalog {{YOUR_CATALOG_HRN}} has been created.

If your realm requires billing tags, update the configuration file by adding a billingTags: ["YOUR_BILLING_TAG"] property to the layer section.
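
For example, the affected layer entry would then contain the following (showing only the relevant keys; YOUR_BILLING_TAG is a placeholder for a tag valid in your realm):

{
  "id": "matched-trips",
  "billingTags": ["YOUR_BILLING_TAG"]
}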

Implement the path matching application

The body of main() contains the direct calls into the Location Library for the path matching logic. Helper methods implement the marshalling of input and output data via the Data Client Library.

Scala
Java

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN
import com.here.platform.data.client.base.scaladsl.BaseClient
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.VersionDependency
import com.here.platform.data.client.scaladsl.{DataClient, NewPartition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.LayerDataFrameReader._
import com.here.platform.data.client.spark.SparkSupport._
import com.here.platform.location.core.geospatial.GeoCoordinate
import com.here.platform.location.core.mapmatching.OnRoad
import com.here.platform.location.integration.optimizedmap.OptimizedMapLayers
import com.here.platform.location.integration.optimizedmap.dcl2.OptimizedMapCatalog
import com.here.platform.location.integration.optimizedmap.mapmatching.PathMatchers
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Since it is not convenient to recreate the BaseClient instance, its connection pool,
// and the OptimizedMapCatalog caches, they are stored in a singleton and accessed from it.
object OptimizedMapLayersSingleton {
  lazy val optimizedMapLayers: OptimizedMapLayers = {
    val pipelineContext = new PipelineContext
    val optimizedMapCatalogHrn = pipelineContext.config.inputCatalogs("location")
    val optimizedMapCatalogVersion = pipelineContext.job.get.inputCatalogs("location").version
    OptimizedMapCatalog
      .from(optimizedMapCatalogHrn)
      .usingBaseClient(BaseClient())
      .newInstance
      .version(optimizedMapCatalogVersion)
  }
}

/**
  * Read sensor data from an SDII archive and output the map-matched paths
  * to a GeoJSON output catalog with the same tiling scheme.
  */
object BatchPathMatchingScala {

  private val pipelineContext = new PipelineContext

  private val outputCatalogHrn = pipelineContext.config.outputCatalog
  private val outputLayer = "matched-trips"

  private val sensorDataArchiveHrn = pipelineContext.config.inputCatalogs("sensor")
  private val sensorDataArchiveVersionLayerName = "sample-index-layer"

  private val locationCatalogHrn = pipelineContext.config.inputCatalogs("location")
  private val locationCatalogVersion = pipelineContext.job.get.inputCatalogs("location").version

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("BatchPathMatchingPipeline").getOrCreate()

    val sensorData = getSensorData(sparkSession)
    val matchedPaths = matchPaths(sparkSession, sensorData)
    publish(matchedPaths)

    sparkSession.stop()
  }

  /**
    * Retrieves the contents of the sensor data archive partitions.
    */
  private def getSensorData(sparkSession: SparkSession): Dataset[SensorData] = {
    import sparkSession.implicits._

    // Extracting input data.
    // There are two different options for the query parameter: `hour_from` and `tile_id`.
    // You can either make it `hour_from>0` to get all available messages or make it
    // `tile_id==$tile_id` to get messages by the specific partition.
    // Possible tile_ids = [377893756, 377894443, 377894442, ...].
    // For CN environment = [389695267, 389696772, 389695401, ...].
    // You can implement more complex queries with RSQL
    // (https://here-tech.skawa.fun/documentation/data-client-library/dev_guide/client/rsql.html)
    val sdiiMessages = sparkSession
      .readLayer(sensorDataArchiveHrn, sensorDataArchiveVersionLayerName)
      .query("hour_from>0")
      .load()

    val sensorData = sdiiMessages
      .select($"idx_tile_id" as "tileId", $"path.positionEstimate" as "fullPositionEstimate")
      .withColumn("positionEstimate", convertToGeoCoordinate($"fullPositionEstimate"))
      .groupBy("tileId")
      .agg(collect_list("positionEstimate") as "positionEstimateList")
      .as[SensorData]

    sensorData
  }

  /**
    * Map match each path inside each sensor archive tile
    * (https://en.wikipedia.org/wiki/Map_matching)
    * and return a list of lists of coordinates.
    * Each element in the top-level list represents a path as a list of coordinates.
    * Convert each path to GeoJson.
    */
  private def matchPaths(sparkSession: SparkSession,
                         sensorData: Dataset[SensorData]): Dataset[(Long, String)] = {
    import sparkSession.implicits._

    /**
      * Convert the trips to GeoJson; each trip is converted to a linestring.
      */
    def convertToGeoJSON(
        matchedTrips: Dataset[(Long, Seq[Seq[GeoCoordinate]])]): Dataset[(Long, String)] =
      matchedTrips.map { matchedTrip =>
        val features = matchedTrip._2
          .map(
            coordinates =>
              """{ "type": "Feature", "geometry": """ +
                """{ "type": "LineString", "coordinates": """ +
                coordinates.map(c => s"[${c.longitude}, ${c.latitude}]").mkString("[", ",", "]")
                + "}" + """, "properties": {} }""")
        val featureCollection =
          features.mkString("""{ "type": "FeatureCollection", "features": [""", ",", "]}")
        (matchedTrip._1, featureCollection)
      }

    val matchedTrips = sensorData.mapPartitions { sensorDataIterator =>
      val optimizedMap = OptimizedMapLayersSingleton.optimizedMapLayers
      val pathMatcher =
        PathMatchers(optimizedMap).carPathMatcherWithoutTransitions[GeoCoordinate]
      sensorDataIterator.map { sensorDataElement =>
        sensorDataElement.tileId ->
          sensorDataElement.positionEstimateList
            .map {
              pathMatcher
                .matchPath(_)
                .results
                .flatMap {
                  case OnRoad(matched) =>
                    Some(GeoCoordinate(matched.nearest.latitude, matched.nearest.longitude))
                  case _ => None
                }
            }
      }
    }

    convertToGeoJSON(matchedTrips)
  }

  val convertToGeoCoordinate: UserDefinedFunction = udf((positionEstimateRows: Seq[Row]) => {
    positionEstimateRows.map { positionEstimate =>
      val lat = positionEstimate.getAs[Double]("latitude_deg")
      val lon = positionEstimate.getAs[Double]("longitude_deg")
      GeoCoordinate(lat, lon)
    }
  })

  /**
    * Publish the GeoJSON partitions to the output catalog using Data Client Library.
    */
  private def publish(geoJsonByTile: Dataset[(Long, String)]): Unit = {
    val masterActorSystem =
      DataClientSparkContextUtils.context.actorSystem

    // Start commit on master.
    val masterPublishApi = DataClient(masterActorSystem).publishApi(outputCatalogHrn)

    // Get the latest catalog version of the output catalog.
    val baseVersion = masterPublishApi.getBaseVersion().awaitResult()

    // Fill in the direct and indirect dependencies for the output catalog, given the direct inputs.
    // This is good practice, especially if you intend to use the scheduler to
    // determine when the pipeline should run (i.e., by using the --with-scheduler option
    // when creating the pipeline version):
    // For users using platform.here.com:
    // https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    // For users using platform.hereolp.cn:
    // https://here-tech.skawa.fun/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    val dependencies =
      gatherDependencies(locationCatalogHrn, locationCatalogVersion)

    // Start a publication batch on top of the most recent catalog version.
    val batchToken =
      masterPublishApi.startBatch2(baseVersion, Some(Seq(outputLayer)), dependencies).awaitResult()

    // Send partitions to workers, and upload data and metadata.
    geoJsonByTile.foreachPartition({ partitions: Iterator[(Long, String)] =>
      val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
      val workerPublishApi = DataClient(workerActorSystem).publishApi(outputCatalogHrn)
      val workerWriteEngine = DataEngine(workerActorSystem).writeEngine(outputCatalogHrn)
      val committedPartitions =
        partitions.map {
          case (tileId: Long, geoJson: String) =>
            val newPartition =
              NewPartition(
                partition = tileId.toString,
                layer = outputLayer,
                data = NewPartition.ByteArrayData(geoJson.getBytes)
              )
            workerWriteEngine.put(newPartition).awaitResult()
        }
      workerPublishApi
        .publishToBatch(batchToken, committedPartitions)
        .awaitResult()
      ()
    })
    masterPublishApi.completeBatch(batchToken).awaitResult()
  }

  /**
    * Gather the dependencies for the output catalog which depends on the location library catalog.
    */
  private def gatherDependencies(locationHrn: HRN, locationVersion: Long) = {
    val sparkActorSystem = DataClientSparkContextUtils.context.actorSystem
    val locationLibraryQuery = DataClient(sparkActorSystem).queryApi(locationHrn)
    val locationLibraryDeps =
      locationLibraryQuery.getVersion(locationVersion).awaitResult().dependencies

    // The output catalog depends directly on the location library
    // catalog, and indirectly on its respective dependencies.
    locationLibraryDeps.map(_.copy(direct = false)) ++
      Seq(VersionDependency(locationHrn, locationVersion, direct = true))
  }

  /**
    * Class used for encoding data in Dataset.
    */
  case class SensorData(tileId: Long, positionEstimateList: Seq[Seq[GeoCoordinate]])
}
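
For reference, the core matching step can also be exercised on a single path outside of Spark. The following is a minimal sketch using the same Location Library calls as the listing above; the coordinates are made-up sample points near Berlin, and OptimizedMapLayersSingleton is reused from the listing:

import com.here.platform.location.core.geospatial.GeoCoordinate
import com.here.platform.location.core.mapmatching.OnRoad
import com.here.platform.location.integration.optimizedmap.mapmatching.PathMatchers

// Match one GPS path against the road network.
val optimizedMap = OptimizedMapLayersSingleton.optimizedMapLayers
val pathMatcher =
  PathMatchers(optimizedMap).carPathMatcherWithoutTransitions[GeoCoordinate]

// Illustrative raw GPS points only.
val rawPath = Seq(
  GeoCoordinate(52.5200, 13.4050),
  GeoCoordinate(52.5205, 13.4062),
  GeoCoordinate(52.5211, 13.4075)
)

// Keep only the on-road projections, exactly as matchPaths does above.
val matchedPath: Seq[GeoCoordinate] = pathMatcher
  .matchPath(rawPath)
  .results
  .collect { case OnRoad(matched) =>
    GeoCoordinate(matched.nearest.latitude, matched.nearest.longitude)
  }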



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.udf;

import akka.actor.ActorSystem;
import akka.japi.Pair;
import com.here.hrn.HRN;
import com.here.platform.data.client.base.javadsl.BaseClient;
import com.here.platform.data.client.base.javadsl.BaseClientJava;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.WriteEngine;
import com.here.platform.data.client.javadsl.CommitPartition;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.javadsl.PublishApi;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.model.BatchToken;
import com.here.platform.data.client.model.VersionDependency;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.location.core.geospatial.GeoCoordinate;
import com.here.platform.location.core.mapmatching.MatchResult;
import com.here.platform.location.core.mapmatching.NoTransition;
import com.here.platform.location.core.mapmatching.OnRoad;
import com.here.platform.location.core.mapmatching.javadsl.MatchResults;
import com.here.platform.location.core.mapmatching.javadsl.PathMatcher;
import com.here.platform.location.inmemory.graph.Vertex;
import com.here.platform.location.integration.optimizedmap.OptimizedMapLayers;
import com.here.platform.location.integration.optimizedmap.dcl2.javadsl.OptimizedMapCatalog;
import com.here.platform.location.integration.optimizedmap.mapmatching.javadsl.PathMatchers;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.JavaConverters;
import scala.collection.mutable.WrappedArray;

/**
 * Read sensor data from an SDII archive and output the map-matched paths to a GeoJSON output
 * catalog with the same tiling scheme.
 */
public class BatchPathMatchingJava {

  private static final PipelineContext pipelineContext = new PipelineContext();

  private static final HRN OUTPUT_CATALOG_HRN = pipelineContext.getConfig().getOutputCatalog();
  private static final String OUTPUT_LAYER = "matched-trips";

  private static final HRN SENSOR_DATA_ARCHIVE_HRN =
      pipelineContext.getConfig().getInputCatalogs().get("sensor");
  private static final String SENSOR_DATA_ARCHIVE_VERSION_LAYER_NAME = "sample-index-layer";

  private static final HRN LOCATION_CATALOG_HRN =
      pipelineContext.getConfig().getInputCatalogs().get("location");
  private static final Long LOCATION_CATALOG_VERSION =
      pipelineContext.getJob().get().getInputCatalogs().get("location").version();

  // Since it is not convenient to recreate the BaseClient instance, its connection pool,
  // and the OptimizedMapCatalog caches, they are stored in a singleton and accessed from it.
  private enum OptimizedMapLayersSingleton {
    INSTANCE;

    private final OptimizedMapLayers optimizedMap;

    OptimizedMapLayersSingleton() {
      BaseClient baseClient = BaseClientJava.instance();
      optimizedMap =
          OptimizedMapCatalog.from(LOCATION_CATALOG_HRN)
              .usingBaseClient(baseClient)
              .newInstance()
              .version(LOCATION_CATALOG_VERSION);
    }

    private OptimizedMapLayers optimizedMap() {
      return optimizedMap;
    }
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    SparkSession sparkSession =
        SparkSession.builder().appName("BatchPathMatchingPipeline").getOrCreate();

    Dataset<SensorData> sensorData = getSensorData(sparkSession);
    JavaRDD<Pair<Long, String>> matchedPaths = matchPaths(sensorData);
    publish(matchedPaths);

    sparkSession.stop();
  }

  /** Retrieves the contents of the sensor data archive partitions. */
  private static Dataset<SensorData> getSensorData(SparkSession sparkSession) {

    // Extracting input data.
    // There are two different options for the query parameter: `hour_from` and `tile_id`.
    // You can either make it `hour_from>0` to get all available messages or make it
    // `tile_id==$tile_id` to get messages by the specific partition.
    // Possible tile_ids = [377893756, 377894443, 377894442, ...].
    // For CN environment = [389695267, 389696772, 389695401, ...].
    // You can implement more complex queries with RSQL
    // (https://here-tech.skawa.fun/documentation/data-client-library/dev_guide/client/rsql.html)
    Dataset<Row> sdiiMessages =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(SENSOR_DATA_ARCHIVE_HRN, SENSOR_DATA_ARCHIVE_VERSION_LAYER_NAME)
            .query("hour_from>0")
            .load();

    // Define structure of List of coordinate pairs - Spark uses it to encode data
    ArrayType schema = DataTypes.createArrayType(DataTypes.StringType);

    // UDF to extract only coordinates from the complex structure of path
    UserDefinedFunction positionEstimateToCoordinatePair =
        udf(
            (UDF1<WrappedArray<Row>, List<String>>)
                positionEstimateList ->
                    JavaConverters.seqAsJavaList(positionEstimateList)
                        .stream()
                        .map(
                            positionEstimate -> {
                              Double lat = positionEstimate.getAs("latitude_deg");
                              Double lon = positionEstimate.getAs("longitude_deg");
                              return lat + "," + lon;
                            })
                        .collect(Collectors.toList()),
            schema);

    Encoder<SensorData> sensorDataEncoder = Encoders.bean(SensorData.class);
    // Extract list of paths (each path represented as a list of coordinates)
    return sdiiMessages
        .select(
            col("idx_tile_id").as("tileId"),
            col("path.positionEstimate").as("fullPositionEstimate"))
        .withColumn(
            "positionEstimate", positionEstimateToCoordinatePair.apply(col("fullPositionEstimate")))
        .groupBy("tileId")
        .agg(collect_list("positionEstimate").as("positionEstimateList"))
        .as(sensorDataEncoder);
  }

  /**
   * Map match each path inside each sensor archive tile
   * (https://en.wikipedia.org/wiki/Map_matching) and return a list of lists of (lon,lat) string
   * pairs. Each element in the top-level list represents a path as a list of coordinates. Convert
   * each path to GeoJson.
   */
  private static JavaRDD<Pair<Long, String>> matchPaths(Dataset<SensorData> sensorData) {
    JavaRDD<Pair<Long, List<List<GeoCoordinate>>>> matchedTrips =
        sensorData
            .javaRDD()
            .mapPartitions(
                tiles -> {
                  OptimizedMapLayers optimizedMap =
                      OptimizedMapLayersSingleton.INSTANCE.optimizedMap();
                  PathMatcher<GeoCoordinate, Vertex, NoTransition> pathMatcher =
                      new PathMatchers(optimizedMap).carPathMatcherWithoutTransitions();
                  Iterable<SensorData> tileIterable = () -> tiles;
                  return StreamSupport.stream(tileIterable.spliterator(), false)
                      .map(
                          sensorDataElement ->
                              new Pair<>(
                                  sensorDataElement.getTileId(),
                                  getCoordinatesMatchedToPath(
                                      pathMatcher, sensorDataElement.getPositionEstimateList())))
                      .iterator();
                });

    return convertToGeoJSON(matchedTrips);
  }

  /** Use the path matcher to get a new list of lists of GeoCoordinates, each list representing a path. */
  private static List<List<GeoCoordinate>> getCoordinatesMatchedToPath(
      PathMatcher<GeoCoordinate, Vertex, NoTransition> pathMatcher,
      List<List<String>> positionEstimatePathList) {
    return positionEstimatePathList
        .stream()
        .map(BatchPathMatchingJava::convertToGeoCoordinates)
        .map(
            positionEstimatePath ->
                pathMatcher
                    .matchPath(positionEstimatePath)
                    .results()
                    .stream()
                    .map(BatchPathMatchingJava::matchResultOnRoad)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
        .collect(Collectors.toList());
  }

  /** Convert a list of string pairs representing coordinates into actual GeoCoordinates. */
  private static List<GeoCoordinate> convertToGeoCoordinates(List<String> coordinatePairs) {
    return coordinatePairs
        .stream()
        .map(
            coordinatePair -> {
              String[] coordinates = coordinatePair.split(",", 2);
              double latitude = Double.parseDouble(coordinates[0]);
              double longitude = Double.parseDouble(coordinates[1]);
              return new GeoCoordinate(latitude, longitude);
            })
        .collect(Collectors.toList());
  }

  /** Construct new GeoCoordinates for on-road matches. */
  private static GeoCoordinate matchResultOnRoad(MatchResult<Vertex> matchResult) {
    if (MatchResults.isOnRoad(matchResult)) {
      OnRoad<Vertex> onRoad = (OnRoad<Vertex>) matchResult;
      GeoCoordinate nearest = onRoad.elementProjection().nearest();
      return new GeoCoordinate(nearest.latitude(), nearest.longitude());
    }
    return null;
  }

  /** Convert the trips to GeoJson; each trip is converted to a linestring. */
  private static JavaRDD<Pair<Long, String>> convertToGeoJSON(
      JavaRDD<Pair<Long, List<List<GeoCoordinate>>>> matchedTrips) {
    return matchedTrips.map(
        matchedTrip -> {
          String featureCollection =
              matchedTrip
                  .second()
                  .stream()
                  .map(BatchPathMatchingJava::mapToFeature)
                  .collect(
                      Collectors.joining(
                          ",", "{ \"type\": \"FeatureCollection\", \"features\": [", "]}"));
          return new Pair<>(matchedTrip.first(), featureCollection);
        });
  }

  /** Convert path to GeoJson Feature */
  private static String mapToFeature(List<GeoCoordinate> coordinates) {
    StringJoiner coordJoiner = new StringJoiner(",", "[", "]");
    coordinates
        .stream()
        .map(c -> "[" + c.getLongitude() + ", " + c.getLatitude() + "]")
        .forEach(coordJoiner::add);
    return "{ \"type\": \"Feature\", \"geometry\": "
        + "{ \"type\": \"LineString\", \"coordinates\": "
        + coordJoiner
        + "}"
        + ", \"properties\": {} }";
  }

  /** Publish the GeoJSON partitions to the output catalog using Data Client Library. */
  private static void publish(JavaRDD<Pair<Long, String>> geoJsonByTile)
      throws InterruptedException, ExecutionException {
    ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();

    // Start commit on master.
    PublishApi masterPublishApi = DataClient.get(masterActorSystem).publishApi(OUTPUT_CATALOG_HRN);

    // Get the latest catalog version of the output catalog.
    OptionalLong baseVersion = masterPublishApi.getBaseVersion().toCompletableFuture().get();

    // Fill in the direct and indirect dependencies for the output catalog, given the direct inputs.
    // This is good practice, especially if you intend to use the scheduler to
    // determine when the pipeline should run (i.e., by using the --with-scheduler option
    // when creating the pipeline version):
    // For users using platform.here.com:
    // https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    // For users using platform.hereolp.cn:
    // https://here-tech.skawa.fun/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    List<VersionDependency> dependencies =
        gatherDependencies(LOCATION_CATALOG_HRN, LOCATION_CATALOG_VERSION);

    // Start a publication batch on top of the most recent catalog version.
    BatchToken batchToken =
        masterPublishApi
            .startBatch2(
                baseVersion, Optional.of(Collections.singletonList(OUTPUT_LAYER)), dependencies)
            .toCompletableFuture()
            .get();

    // Send partitions to workers, and upload data and metadata.
    geoJsonByTile.foreachPartition(
        partitions -> {
          ActorSystem workerActorSystem = DataClientSparkContextUtils.context().actorSystem();
          PublishApi workerPublishApi =
              DataClient.get(workerActorSystem).publishApi(OUTPUT_CATALOG_HRN);
          WriteEngine workerWriteEngine =
              DataEngine.get(workerActorSystem).writeEngine(OUTPUT_CATALOG_HRN);
          ArrayList<CommitPartition> commitPartitions = new ArrayList<>();

          while (partitions.hasNext()) {
            Pair<Long, String> content = partitions.next();
            NewPartition newPartition =
                new NewPartition.Builder()
                    .withPartition(content.first().toString())
                    .withLayer(OUTPUT_LAYER)
                    .withData(content.second().getBytes())
                    .build();
            commitPartitions.add(workerWriteEngine.put(newPartition).toCompletableFuture().join());
          }

          workerPublishApi
              .publishToBatch(batchToken, commitPartitions.iterator())
              .toCompletableFuture()
              .join();
        });

    // Complete the commit.
    masterPublishApi.completeBatch(batchToken).toCompletableFuture().join();
  }

  /**
   * Gather the dependencies for the output catalog which depends on the location library catalog.
   */
  private static List<VersionDependency> gatherDependencies(HRN locationHrn, Long locationVersion) {
    ActorSystem sparkActorSystem = DataClientSparkContextUtils.context().actorSystem();
    QueryApi locationLibraryQuery = DataClient.get(sparkActorSystem).queryApi(locationHrn);
    List<VersionDependency> locationDeps =
        locationLibraryQuery
            .getVersion(locationVersion)
            .toCompletableFuture()
            .join()
            .getDependencies();

    List<VersionDependency> retval =
        locationDeps
            .stream()
            .map(dep -> new VersionDependency(dep.hrn(), dep.version(), false))
            .collect(Collectors.toList());
    retval.add(new VersionDependency(locationHrn, locationVersion, true));
    return retval;
  }

  /**
   * Class used for encoding data in Dataset. Contains tileId and list of lists of pairs, each list
   * representing a path and each pair representing coordinate.
   */
  public static class SensorData implements Serializable {

    private Long tileId;
    private List<List<String>> positionEstimateList;

    public Long getTileId() {
      return tileId;
    }

    public void setTileId(Long tileId) {
      this.tileId = tileId;
    }

    public List<List<String>> getPositionEstimateList() {
      return positionEstimateList;
    }

    public void setPositionEstimateList(List<List<String>> positionEstimateList) {
      this.positionEstimateList = positionEstimateList;
    }
  }
}

Declare the catalog inputs to the path matching application

The following pipeline configuration files declare the catalog inputs to the path matching application.

Replace the value of {{YOUR_CATALOG_HRN}} with the HRN of your output catalog.

pipeline-config.conf

pipeline.config {

  output-catalog {hrn = "{{YOUR_CATALOG_HRN}}"}

  input-catalogs {
    location {hrn = "hrn:here:data::olp-here:here-optimized-map-for-location-library-2"}
    sensor {hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2"}
  }
}
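
The keys location and sensor under input-catalogs are the identifiers that the application passes to PipelineContext; a minimal sketch of the lookup, using the same calls as the listing above:

import com.here.platform.pipeline.PipelineContext

val pipelineContext = new PipelineContext
// The map keys must match the identifiers in pipeline-config.conf.
val locationHrn = pipelineContext.config.inputCatalogs("location")
val sensorHrn = pipelineContext.config.inputCatalogs("sensor")
val outputHrn = pipelineContext.config.outputCatalog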

The SDII Sensor Data Sample catalog and the HERE Optimized Map for Location Library catalog are used as input data catalogs, so link them to your project. To do this, replace {{YOUR_PROJECT_HRN}} in the following commands with the HRN of your project, and run them:


olp project resource link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:olp-sdii-sample-berlin-2
olp project resource link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:here-optimized-map-for-location-library-2

The CLI returns the following messages:


Project resource hrn:here:data::olp-here:olp-sdii-sample-berlin-2 has been linked.
Project resource hrn:here:data::olp-here:here-optimized-map-for-location-library-2 has been linked.

pipeline-job.conf

// NOTE: If you are running the application in a pipeline with the scheduler,
// i.e., using the --with-scheduler option when creating the pipeline version
// (https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create)
// then this file is not needed, as the scheduler will pick up the latest versions
// of the input catalogs.
pipeline.job.catalog-versions {

  output-catalog {base-version = -1}

  input-catalogs {
    location {
      processing-type = "reprocess"
      version = 3204
    }
    sensor {
      processing-type = "reprocess"
      version = 1
    }
  }
}
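
At run time, the application resolves the pinned input versions through the job context. Note that job is an Option that is only populated when a job configuration is supplied, which is why the listing above calls .get on it:

import com.here.platform.pipeline.PipelineContext

// Reads the version pinned for the "location" catalog in pipeline-job.conf.
val locationVersion: Long =
  new PipelineContext().job.get.inputCatalogs("location").version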

For more information about pipeline configuration files, see the Pipeline API documentation.

Compile and run locally

To run the application locally, execute the following command:

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=BatchPathMatchingScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dspark.master="local[*]"
    

mvn compile exec:java \
    -Dexec.mainClass=BatchPathMatchingJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dspark.master="local[*]"
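
After a successful run, you can verify that GeoJSON partitions were published to the output layer, for example with the OLP CLI (assuming your CLI version provides the partition listing command):

olp catalog layer partition list {{YOUR_CATALOG_HRN}} matched-trips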

Further information

For more details on the topics covered in this tutorial, see the following sources. The Code Examples page of the Developer Guide also lists more detailed code examples that use the Location Library.

You can also build this simple path matching application as a fat JAR file:

mvn -Pplatform clean package

You can then deploy it via the Pipeline API. For more information about deploying pipelines, see the Pipelines Developer Guide.
