バッチアプリケーションでカタログから読み取ります

目的: HERE Map Content の道路形状の合計長さを推定します。

複雑さ: 初心者向け

所要時間: 30 分

前提条件: プロジェクトでの作業を整理します

ソースコード: ダウンロード

パイプラインでジョブを展開する目的で HERE Workspace で開発する場合、 2 つのランタイム環境から選択できます。

  • バッチを使用して、 Spark ベースのアプリケーションを実行できます。
  • ストリームを使用して、 Flinkベースのアプリケーションを実行できます。

この例 では、マップ内に存在する道路ジオメトリの合計長を推定するために、 HERE Map Content カタログtopology-geometryレイヤーからデータをダウンロードする単純な Spark バッチアプリケーションを示します。

topology-geometry このレイヤーには、 HERE Map Content トポロジーと道路セグメントの形状が含まれています。 このレイヤーの空間パーティション分割方式はです HereTileHereTile パーティション分割の詳細については 、このドキュメントを参照してください。

各セグメントには length 、その合計長をメートル単位で表す属性も含まれています。

まず、を使用 queryMetadata して、レイヤーのパーティションのリストが含まれているレイヤーのメタデータをダウンロードし、使用可能なパーティションの約 1/1000 のランダムなサンプルを選択します。

選択したパーティションごとに、関連するデータをダウンロードし、各パーティションで使用できるすべての読み取り桁数を合計します。 RDD これにより、倍精度浮動小数点数型の結果が 1 つの数値に削減され、選択したタイルに存在するすべての値が合計されます。

Maven でプロジェクトを設定します

Spark を使用してパイプラインで実行されるアプリケーションを開発するには sdk-batch-bom_2.12 、をアプリケーションの親ポムとして使用します。

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

Scala と Java の依存関係を調整します。

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.schema.rib</groupId>
        <artifactId>topology-geometry_v2_scala_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.schema.rib</groupId>
        <artifactId>topology-geometry_v2_java</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.hrn</groupId>
        <artifactId>hrn_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>

アプリケーションを実装します

このアプリケーションは 、topology-geometryレイヤー内のパーティションを介して MapReduce を実装し、各パーティションのすべての道路セグメントの長さを合計します。

このアプリケーションは、すべてのパーティションの長さを合計する代わりに、パーティションの小さなサブセットをサンプリングし、すべての長さの合計をサンプリングレートで割り、すべてのパーティションの合計長を推定します。 これにより、妥当な見積もりが短時間で作成されます。

執筆時点では、 HERE カタログには約 5,900 万 km のジオメトリがあります。

Scala
Java
/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.actor.CoordinatedShutdown.UnknownReason
import com.here.hrn.HRN
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.scaladsl.{DataClient, Partition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.pipeline.PipelineContext
import com.here.schema.rib.v2.topology_geometry_partition.TopologyGeometryPartition
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Import the .awaitResult method
import com.here.platform.data.client.spark.SparkSupport._

object SparkPipelineScala {

  private val logger = LoggerFactory.getLogger(SparkPipelineScala.getClass)
  private val sampleFraction = 1.0 / 1000.0

  def main(args: Array[String]): Unit = {

    val sparkContext: SparkContext = new SparkContext(new SparkConf().setAppName("SparkPipeline"))

    val pipelineContext = new PipelineContext
    val hereMapContent = pipelineContext.config.inputCatalogs("hereMapContent")
    val hereMapContentVersion = pipelineContext.job.get.inputCatalogs("hereMapContent").version
    val outputCatalog = pipelineContext.config.outputCatalog

    val config: Config = ConfigFactory.empty
      .withValue("here.platform.data-client.endpoint-locator.discovery-service-env",
                 ConfigValueFactory.fromAnyRef("custom"))
      .withValue(
        "here.platform.data-client.endpoint-locator.discovery-service-url",
        // Please use https://api-lookup.data.api.platform.hereolp.cn URL for China Environment
        // We should define a custom URL, specific to China Environment, for a discovery service
        // endpoint that allows discovering various Data Service APIs like publish, metadata, query, etc.
        ConfigValueFactory.fromAnyRef("https://api-lookup.data.api.platform.here.com")
      )

    val appName = "SparkPipelineExampleScala"
    implicit lazy val actorSystem: ActorSystem = ActorSystem.create(appName, config)
    try {
      val topologyLayerMetadata: RDD[Partition] =
        queryMetadata(hereMapContent,
                      hereMapContentVersion,
                      "topology-geometry",
                      sparkContext,
                      actorSystem)
          .sample(withReplacement = true, sampleFraction)

      val topologyPartitions: RDD[TopologyGeometryPartition] =
        topologyLayerMetadata.map(readTopologyGeometry(hereMapContent))

      // gather all the topology segment lengths (in meters)
      val roadLengths: RDD[Double] =
        topologyPartitions.map(_.segment.map(_.length).sum)

      // sum up all the lengths and extrapolate the results in m
      // divided by sampleProbability to extrapolate the value to the global sample
      val totalMeters: Double =
        roadLengths.reduce(_ + _) / sampleFraction

      // Note that the default pipeline logging level is "warn". If
      // you are running this on a pipeline,
      // be sure to set the logging level accordingly
      // in order to see this message in the Splunk logs.
      // For more details, please see
      // For users using platform.here.com:
      // https://here-tech.skawa.fun/documentation/pipeline/topics/pipeline-logging.html
      // https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/log-commands.html
      // For users using platform.hereolp.cn:
      // https://here-tech.skawa.fun/cn/documentation/pipeline/topics/pipeline-logging.html
      // https://here-tech.skawa.fun/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/log-commands.html

      logger.info(f"The estimated length of all roads in the map is: $totalMeters%.0fm")

      // In a production pipeline the output will be written to the output catalog
      logger.info(s"The configured output catalog is: $outputCatalog")

    } finally {
      val shutdown = CoordinatedShutdown(actorSystem).run(UnknownReason)
      Await.result(shutdown, Duration.Inf)
    }
  }

  // Loads the list of available partitions from a catalog layer of a given version as an RDD
  private def queryMetadata(catalog: HRN,
                            catalogVersion: Long,
                            layerName: String,
                            sparkContext: SparkContext,
                            actorSystem: ActorSystem): RDD[Partition] = {
    val query = DataClient(actorSystem).queryApi(catalog)
    val partitions = query.getPartitionsAsIterator(catalogVersion, layerName)
    sparkContext.parallelize(partitions.awaitResult.toStream)
  }

  // Download and decode the data for one partition
  private def readTopologyGeometry(catalog: HRN)(partition: Partition) =
    TopologyGeometryPartition.parseFrom(read(catalog)(partition))

  // Download the raw data for one partition
  private def read(catalog: HRN)(partition: Partition) =
    DataEngine(DataClientSparkContextUtils.context.actorSystem)
      .readEngine(catalog)
      .getDataAsBytes(partition)
      .awaitResult()
}

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import akka.actor.ActorSystem;
import akka.actor.CoordinatedShutdown;
import com.google.protobuf.InvalidProtocolBufferException;
import com.here.hrn.HRN;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.pipeline.PipelineContext;
import com.here.schema.rib.v2.TopologyGeometry;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass.TopologyGeometryPartition;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkPipelineJava {

  private static final double sampleFraction = 1.0 / 1000.0;

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(SparkPipelineJava.class);
    JavaSparkContext sparkContext =
        new JavaSparkContext(new SparkConf().setAppName("SparkPipeline"));

    PipelineContext pipelineContext = new PipelineContext();
    HRN hereMapContent = pipelineContext.getConfig().getInputCatalogs().get("hereMapContent");
    Long hereMapContentVersion =
        pipelineContext.getJob().get().getInputCatalogs().get("hereMapContent").version();
    HRN outputCatalog = pipelineContext.getConfig().getOutputCatalog();

    Config config = ConfigFactory.empty();
    config =
        config.withValue(
            "here.platform.data-client.endpoint-locator.discovery-service-env",
            ConfigValueFactory.fromAnyRef("custom"));
    config =
        config.withValue(
            "here.platform.data-client.endpoint-locator.discovery-service-url",
            // Please use https://api-lookup.data.api.platform.hereolp.cn URL for China Environment
            // We should define a custom URL, specific to China Environment, for a discovery service
            // endpoint that allows discovering various Data Service APIs like publish, metadata,
            // query, etc.
            ConfigValueFactory.fromAnyRef("https://api-lookup.data.api.platform.here.com"));

    ActorSystem sparkActorSystem = ActorSystem.create("SparkPipelineExampleJava", config);

    try {
      JavaRDD<Partition> topologyLayerMetadata =
          queryMetadata(
                  hereMapContent,
                  hereMapContentVersion,
                  "topology-geometry",
                  sparkContext,
                  sparkActorSystem)
              .sample(true, sampleFraction);

      TopologyGeometryReader readTopologyGeometry = new TopologyGeometryReader(hereMapContent);
      JavaRDD<TopologyGeometryPartition> topologyGeometryPartition =
          topologyLayerMetadata.map(readTopologyGeometry::read);

      // gather all the topology segment lengths (in meters)
      JavaRDD<Double> roadLengths =
          topologyGeometryPartition.map(
              partition ->
                  partition
                      .getSegmentList()
                      .stream()
                      .map(TopologyGeometry.Segment::getLength)
                      .mapToDouble(Double::doubleValue)
                      .sum());

      // sum up all the lengths and extrapolate the results in m
      // divided by sampleProbability to extrapolate the value to the global sample
      Double totalMeters = roadLengths.reduce(Double::sum) / sampleFraction;

      // Note that the default pipeline logging level is "warn". If
      // you are running this on a pipeline,
      // be sure to set the logging level accordingly
      // in order to see this message in the Splunk logs.
      // For more details, please see
      // For users using platform.here.com:
      // https://here-tech.skawa.fun/documentation/pipeline/topics/pipeline-logging.html
      // https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/log-commands.html
      // For users using platform.hereolp.cn:
      // https://here-tech.skawa.fun/cn/documentation/pipeline/topics/pipeline-logging.html
      // https://here-tech.skawa.fun/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/log-commands.html
      logger.info(
          String.format("The estimated length of all roads in the map is: %.0fm", totalMeters));

      // In a production pipeline the output will be written to your output catalog
      logger.info(String.format("The configured output catalog is: %s", outputCatalog));

    } finally {
      CoordinatedShutdown.get(sparkActorSystem)
          .runAll(CoordinatedShutdown.unknownReason())
          .toCompletableFuture()
          .join();
    }
  }

  // Loads the list of available partitions from a catalog layer of a given version as an RDD
  private static JavaRDD<Partition> queryMetadata(
      HRN catalog,
      Long catalogVersion,
      String layerName,
      JavaSparkContext sparkContext,
      ActorSystem sparkActorSystem) {
    QueryApi query = DataClient.get(sparkActorSystem).queryApi(catalog);
    ArrayList<Partition> partitions = new ArrayList<>();
    query
        .getPartitionsAsIterator(catalogVersion, layerName, Collections.emptySet())
        .toCompletableFuture()
        .join()
        .forEachRemaining(partitions::add);
    return sparkContext.parallelize(partitions);
  }
}

//// Download and decode the data for one partition
class TopologyGeometryReader implements Serializable {
  private HRN catalog;

  TopologyGeometryReader(HRN catalog) {
    this.catalog = catalog;
  }

  TopologyGeometryPartition read(Partition partition) throws InvalidProtocolBufferException {
    return TopologyGeometryPartition.parseFrom(readRaw(partition));
  }

  private byte[] readRaw(Partition partition) {
    return DataEngine.get(DataClientSparkContextUtils.context().actorSystem())
        .readEngine(catalog)
        .getDataAsBytes(partition)
        .toCompletableFuture()
        .join();
  }
}

パイプラインを設定します

この pipeline-config.conf ファイルでは、 HERE リソースネーム for HERE Map Content が heremapcontent パイプラインの入力カタログ、および出力カタログの HERE リソースネーム として宣言されています。 このパイプラインは出力カタログを記述しないため、出力 HERE リソースネーム は単なるダミー値にすぎません。

本番パイプラインでは、出力 HERE リソースネーム は、アプリまたは共有グループ(あるいはその両方)が書き込み権限を持つ既存のカタログを参照します。 これらの権限の管理の詳細について は、このドキュメントを参照してください。

pipeline.config {

  //Please use "hrn:here-cn:data::olp-cn-here:dummy" on China environment
  output-catalog {hrn = "hrn:here:data::olp-here:dummy"}

  input-catalogs {
    //Please, use hrn:here-cn:data::olp-cn-here:here-map-content-china-2 on China Environment
    hereMapContent {hrn = "hrn:here:data::olp-here:rib-2"}
  }
}

このチュートリアルでは、パブリックカタログ( HERE Map Content カタログ)を使用します。 カタログは、まずプロジェクトにリンクして、プロジェクト内で使用する必要があります。 これを行うには、{{YOUR_PROJECT_HRN}}「プロジェクトでの作業の整理」チュートリアルの説明に従って作成したプロジェクトの実際の HERE リソースネームで置き換え、次のコマンドを実行します。


olp project resource link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:rib-2

コマンドが成功すると、 CLI は次のメッセージを返します。


Project resource hrn:here:data::olp-here:rib-2 has been linked.

この pipeline-job.conf ファイルは heremapcontent 、入力およびダミー出力のバージョンを宣言します。

pipeline.job.catalog-versions {

  output-catalog {base-version = -1}

  input-catalogs {
    hereMapContent {
      processing-type = "reprocess"
      // Please, use "version = 0" on China Environment
      version = 4
    }
  }
}

アプリケーションを実行します

アプリケーションをローカルで実行するには、次のコマンドを実行します。

Scala を実行します
Java を実行します

mvn compile exec:java \
    -Dexec.mainClass=SparkPipelineScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=SparkPipelineJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}

-Dhere.platform.data-client.request-signer.credentials.here-account.* パラメーターのセットを考慮してください。 credentials.properties ファイルから {{YOUR_PROJECT_HRN}} データ クライアント ライブラリにデータを渡すには、これらのパラメーターを指定します。 データ クライアント ライブラリの初期化の詳細について は、「 Java システムのプロパティを使用して資格情報を設定する」を参照してください。

詳細情報

HERE から、このジョブをパイプラインで実行できます。

OLP CLI 開発者ガイドのパイプラインコマンド

sample メタデータでの呼び出しと totalMeters 、変数の最終結果のスケーリングを削除します。 これにより、プログラムが概算見積書から並列プログラムに変換され、カタログ内のすべての値が数分で合計されます。

また、 KM の合計数に関する情報を、 generic パーティション分割された 1 つの text\plain レイヤーと、カタログ情報を含む 1 つのパーティションに公開することもできます。 カタログの作成およびレイヤーの設定の詳細については、「プロジェクトでの作業の整理」のチュートリアルおよびコマンド ライン インターフェース開発者ガイドを参照してください。

」に一致する結果は 件です

    」に一致する結果はありません