Read from a catalog in a batch application

Objectives: Estimate the total length of the road geometries in HERE Map Content.

Complexity: Beginner

Time to complete: 30 min

Prerequisites: Organize your work in projects

Source code: Download

When developing an application in the HERE Workspace that you intend to deploy as a pipeline job, you can choose between two runtime environments:

  • You can use batch to run Spark-based applications.
  • You can use stream to run Flink-based applications.

This example demonstrates a simple Spark batch application that downloads data from the topology-geometry layer of the HERE Map Content catalog to estimate the total length of the road geometries in the map.

The topology-geometry layer contains the HERE Map Content topology and the geometry of the road segments. The spatial partitioning scheme for this layer is HereTile. For more information on HereTile partitioning, see this document.

Each segment also contains a length attribute that represents its total length in meters.
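
For example, once the raw bytes of a partition from this layer have been decoded into the generated TopologyGeometryPartition class (as shown later in this tutorial), summing the per-segment lengths is straightforward. The following minimal sketch assumes a hypothetical bytes value holding one downloaded partition payload:

import com.here.schema.rib.v2.topology_geometry_partition.TopologyGeometryPartition

// Minimal sketch: decode one topology-geometry partition payload and sum the
// lengths (in meters) of all road segments it contains. `bytes` is a
// hypothetical Array[Byte] holding raw partition data downloaded from the catalog.
def partitionRoadMeters(bytes: Array[Byte]): Double = {
  val partition = TopologyGeometryPartition.parseFrom(bytes)
  partition.segment.map(_.length).sum
}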

First, download the layer metadata, which contains the list of partitions in the layer, using the queryMetadata helper, and select a random sample of about 1/1000 of the available partitions.

For each selected partition, download the related data and sum the lengths of all segments it contains. Reducing the resulting RDD of doubles then yields a single number: the total length over the selected tiles.
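
The estimation step itself is just sampling plus rescaling. As a minimal sketch with made-up numbers (plain Scala collections instead of the Spark RDDs used later), dividing the sampled sum by the sampling fraction extrapolates it to an estimate for the whole layer:

// Hypothetical per-partition sums of road lengths, in meters, for the sampled tiles.
val sampledPartitionMeters = Seq(12500.0, 48000.0, 7300.0)

val sampleFraction = 1.0 / 1000.0

// (12500 + 48000 + 7300) / 0.001 = 67,800,000 m estimated over all partitions.
val estimatedTotalMeters = sampledPartitionMeters.sum / sampleFraction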

Set up the project in Maven

To develop an application that runs on pipelines with Spark, use sdk-batch-bom_2.12 as the parent POM of your application:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

Adjust the dependencies for Scala and Java:

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.schema.rib</groupId>
        <artifactId>topology-geometry_v2_scala_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.schema.rib</groupId>
        <artifactId>topology-geometry_v2_java</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.hrn</groupId>
        <artifactId>hrn_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>
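
The dependency coordinates above use the ${scala.compat.version} property. It is normally inherited from the SDK BOM parent; if it is not available in your build, you can declare it yourself. The explicit properties block below is an assumption, not part of the original project setup, and uses 2.12 to match the _2.12 BOM:

<properties>
    <!-- Scala binary version used in suffixed artifact IDs such as spark-core_2.12.
         Declare it only if it is not already provided by the parent BOM. -->
    <scala.compat.version>2.12</scala.compat.version>
</properties>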

Implement the application

This application implements MapReduce over partitions in the topology-geometry layer, summing the lengths of all road segments in each partition.

Instead of summing lengths over all partitions, this application samples a small subset of partitions and divides the sum of their lengths by the sampling fraction to estimate the total length over all partitions. This produces a reasonable estimate in a fraction of the time.

Note

At the time of writing, there are approximately 59 million km of geometries in the HERE catalog.

Scala
Java
/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.actor.CoordinatedShutdown.UnknownReason
import com.here.hrn.HRN
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.scaladsl.{DataClient, Partition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.pipeline.PipelineContext
import com.here.schema.rib.v2.topology_geometry_partition.TopologyGeometryPartition
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Import the .awaitResult method
import com.here.platform.data.client.spark.SparkSupport._

object SparkPipelineScala {

  private val logger = LoggerFactory.getLogger(SparkPipelineScala.getClass)
  private val sampleFraction = 1.0 / 1000.0

  def main(args: Array[String]): Unit = {

    val sparkContext: SparkContext = new SparkContext(new SparkConf().setAppName("SparkPipeline"))

    val pipelineContext = new PipelineContext
    val hereMapContent = pipelineContext.config.inputCatalogs("hereMapContent")
    val hereMapContentVersion = pipelineContext.job.get.inputCatalogs("hereMapContent").version
    val outputCatalog = pipelineContext.config.outputCatalog

    val config: Config = ConfigFactory.empty
      .withValue("here.platform.data-client.endpoint-locator.discovery-service-env",
                 ConfigValueFactory.fromAnyRef("custom"))
      .withValue(
        "here.platform.data-client.endpoint-locator.discovery-service-url",
        // This custom URL points to the discovery service endpoint used to look up the
        // various Data Service APIs, such as publish, metadata, and query.
        // For the China environment, use https://api-lookup.data.api.platform.hereolp.cn instead.
        ConfigValueFactory.fromAnyRef("https://api-lookup.data.api.platform.here.com")
      )

    val appName = "SparkPipelineExampleScala"
    implicit lazy val actorSystem: ActorSystem = ActorSystem.create(appName, config)
    try {
      val topologyLayerMetadata: RDD[Partition] =
        queryMetadata(hereMapContent,
                      hereMapContentVersion,
                      "topology-geometry",
                      sparkContext,
                      actorSystem)
          .sample(withReplacement = true, sampleFraction)

      val topologyPartitions: RDD[TopologyGeometryPartition] =
        topologyLayerMetadata.map(readTopologyGeometry(hereMapContent))

      // gather all the topology segment lengths (in meters)
      val roadLengths: RDD[Double] =
        topologyPartitions.map(_.segment.map(_.length).sum)

      // sum up all the lengths (in meters) and divide by sampleFraction
      // to extrapolate from the sampled partitions to the whole layer
      val totalMeters: Double =
        roadLengths.reduce(_ + _) / sampleFraction

      // Note that the default pipeline logging level is "warn". If
      // you are running this on a pipeline,
      // be sure to set the logging level accordingly
      // in order to see this message in the Splunk logs.
      // For more details, please see
      // For users using platform.here.com:
      // https://here-tech.skawa.fun/documentation/pipeline/topics/pipeline-logging.html
      // https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/log-commands.html
      // For users using platform.hereolp.cn:
      // https://here-tech.skawa.fun/cn/documentation/pipeline/topics/pipeline-logging.html
      // https://here-tech.skawa.fun/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/log-commands.html

      logger.info(f"The estimated length of all roads in the map is: $totalMeters%.0fm")

      // In a production pipeline the output will be written to the output catalog
      logger.info(s"The configured output catalog is: $outputCatalog")

    } finally {
      val shutdown = CoordinatedShutdown(actorSystem).run(UnknownReason)
      Await.result(shutdown, Duration.Inf)
    }
  }

  // Loads the list of available partitions from a catalog layer of a given version as an RDD
  private def queryMetadata(catalog: HRN,
                            catalogVersion: Long,
                            layerName: String,
                            sparkContext: SparkContext,
                            actorSystem: ActorSystem): RDD[Partition] = {
    val query = DataClient(actorSystem).queryApi(catalog)
    val partitions = query.getPartitionsAsIterator(catalogVersion, layerName)
    sparkContext.parallelize(partitions.awaitResult.toStream)
  }

  // Download and decode the data for one partition
  private def readTopologyGeometry(catalog: HRN)(partition: Partition) =
    TopologyGeometryPartition.parseFrom(read(catalog)(partition))

  // Download the raw data for one partition
  private def read(catalog: HRN)(partition: Partition) =
    DataEngine(DataClientSparkContextUtils.context.actorSystem)
      .readEngine(catalog)
      .getDataAsBytes(partition)
      .awaitResult()
}

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import akka.actor.ActorSystem;
import akka.actor.CoordinatedShutdown;
import com.google.protobuf.InvalidProtocolBufferException;
import com.here.hrn.HRN;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.pipeline.PipelineContext;
import com.here.schema.rib.v2.TopologyGeometry;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass.TopologyGeometryPartition;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkPipelineJava {

  private static final double sampleFraction = 1.0 / 1000.0;

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(SparkPipelineJava.class);
    JavaSparkContext sparkContext =
        new JavaSparkContext(new SparkConf().setAppName("SparkPipeline"));

    PipelineContext pipelineContext = new PipelineContext();
    HRN hereMapContent = pipelineContext.getConfig().getInputCatalogs().get("hereMapContent");
    Long hereMapContentVersion =
        pipelineContext.getJob().get().getInputCatalogs().get("hereMapContent").version();
    HRN outputCatalog = pipelineContext.getConfig().getOutputCatalog();

    Config config = ConfigFactory.empty();
    config =
        config.withValue(
            "here.platform.data-client.endpoint-locator.discovery-service-env",
            ConfigValueFactory.fromAnyRef("custom"));
    config =
        config.withValue(
            "here.platform.data-client.endpoint-locator.discovery-service-url",
            // This custom URL points to the discovery service endpoint used to look up the
            // various Data Service APIs, such as publish, metadata, and query.
            // For the China environment, use https://api-lookup.data.api.platform.hereolp.cn instead.
            ConfigValueFactory.fromAnyRef("https://api-lookup.data.api.platform.here.com"));

    ActorSystem sparkActorSystem = ActorSystem.create("SparkPipelineExampleJava", config);

    try {
      JavaRDD<Partition> topologyLayerMetadata =
          queryMetadata(
                  hereMapContent,
                  hereMapContentVersion,
                  "topology-geometry",
                  sparkContext,
                  sparkActorSystem)
              .sample(true, sampleFraction);

      TopologyGeometryReader readTopologyGeometry = new TopologyGeometryReader(hereMapContent);
      JavaRDD<TopologyGeometryPartition> topologyGeometryPartition =
          topologyLayerMetadata.map(readTopologyGeometry::read);

      // gather all the topology segment lengths (in meters)
      JavaRDD<Double> roadLengths =
          topologyGeometryPartition.map(
              partition ->
                  partition
                      .getSegmentList()
                      .stream()
                      .map(TopologyGeometry.Segment::getLength)
                      .mapToDouble(Double::doubleValue)
                      .sum());

      // sum up all the lengths (in meters) and divide by sampleFraction
      // to extrapolate from the sampled partitions to the whole layer
      Double totalMeters = roadLengths.reduce(Double::sum) / sampleFraction;

      // Note that the default pipeline logging level is "warn". If
      // you are running this on a pipeline,
      // be sure to set the logging level accordingly
      // in order to see this message in the Splunk logs.
      // For more details, please see
      // For users using platform.here.com:
      // https://here-tech.skawa.fun/documentation/pipeline/topics/pipeline-logging.html
      // https://here-tech.skawa.fun/documentation/open-location-platform-cli/user_guide/topics/pipeline/log-commands.html
      // For users using platform.hereolp.cn:
      // https://here-tech.skawa.fun/cn/documentation/pipeline/topics/pipeline-logging.html
      // https://here-tech.skawa.fun/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/log-commands.html
      logger.info(
          String.format("The estimated length of all roads in the map is: %.0fm", totalMeters));

      // In a production pipeline the output will be written to your output catalog
      logger.info(String.format("The configured output catalog is: %s", outputCatalog));

    } finally {
      CoordinatedShutdown.get(sparkActorSystem)
          .runAll(CoordinatedShutdown.unknownReason())
          .toCompletableFuture()
          .join();
    }
  }

  // Loads the list of available partitions from a catalog layer of a given version as an RDD
  private static JavaRDD<Partition> queryMetadata(
      HRN catalog,
      Long catalogVersion,
      String layerName,
      JavaSparkContext sparkContext,
      ActorSystem sparkActorSystem) {
    QueryApi query = DataClient.get(sparkActorSystem).queryApi(catalog);
    ArrayList<Partition> partitions = new ArrayList<>();
    query
        .getPartitionsAsIterator(catalogVersion, layerName, Collections.emptySet())
        .toCompletableFuture()
        .join()
        .forEachRemaining(partitions::add);
    return sparkContext.parallelize(partitions);
  }
}

// Download and decode the data for one partition
class TopologyGeometryReader implements Serializable {
  private HRN catalog;

  TopologyGeometryReader(HRN catalog) {
    this.catalog = catalog;
  }

  TopologyGeometryPartition read(Partition partition) throws InvalidProtocolBufferException {
    return TopologyGeometryPartition.parseFrom(readRaw(partition));
  }

  private byte[] readRaw(Partition partition) {
    return DataEngine.get(DataClientSparkContextUtils.context().actorSystem())
        .readEngine(catalog)
        .getDataAsBytes(partition)
        .toCompletableFuture()
        .join();
  }
}

Configure the pipeline

This pipeline-config.conf file declares the HRN for HERE Map Content as the hereMapContent input catalog for the pipeline, as well as an HRN for the output catalog. This pipeline does not write to an output catalog, so the output HRN is just a dummy value.

In a production pipeline, the output HRN would point to an existing catalog to which the app and/or sharing group has write permissions. For more on managing these permissions, see this document.

pipeline.config {

  //Please use "hrn:here-cn:data::olp-cn-here:dummy" on China environment
  output-catalog {hrn = "hrn:here:data::olp-here:dummy"}

  input-catalogs {
    //Please, use hrn:here-cn:data::olp-cn-here:here-map-content-china-2 on China Environment
    hereMapContent {hrn = "hrn:here:data::olp-here:rib-2"}
  }
}

This tutorial uses a public catalog, the HERE Map Content catalog. Before you can use the catalog within your project, you must link it to the project. To do this, replace {{YOUR_PROJECT_HRN}} with the HRN of the project you created in the Organize your work in projects tutorial and execute the following command. The command below uses the China environment catalog HRN; in the standard environment, link hrn:here:data::olp-here:rib-2 instead.


olp project resource link {{YOUR_PROJECT_HRN}} hrn:here-cn:data::olp-cn-here:here-map-content-china-2

If your command is successful, the CLI returns the following message:


Project resource hrn:here-cn:data::olp-cn-here:here-map-content-china-2 has been linked.

This pipeline-job.conf file declares the versions of the hereMapContent input and the dummy output.

pipeline.job.catalog-versions {

  output-catalog {base-version = -1}

  input-catalogs {
    hereMapContent {
      processing-type = "reprocess"
      // Please, use "version = 0" on China Environment
      version = 4
    }
  }
}

Run the application

To run the application locally, execute the following command:

Run Scala
Run Java

mvn compile exec:java \
    -Dexec.mainClass=SparkPipelineScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=SparkPipelineJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}

Note the -Dhere.platform.data-client.request-signer.credentials.here-account.* parameters. They pass data from the credentials.properties file and {{YOUR_PROJECT_HRN}} to the Data Client Library. For more details about Data Client Library initialization, see Set Your Credentials via Java System Properties.

Further information

From here, you can try running this job in a pipeline.

See Pipeline commands in the OLP CLI Developer Guide.

Remove the call to sample on the metadata and the division by sampleFraction in the totalMeters computation. This turns the program from an estimator into a parallel job that sums up all the values in the catalog in a few minutes; a sketch of the change follows below.
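
As a rough Scala sketch of that change, reusing the variables and the queryMetadata and readTopologyGeometry helpers from the example above, the exact total is computed without sampling or rescaling:

// Exact total: no sampling, no division by sampleFraction.
// Assumes hereMapContent, hereMapContentVersion, sparkContext, actorSystem and the
// helpers queryMetadata / readTopologyGeometry from the Scala example above.
val allPartitions: RDD[Partition] =
  queryMetadata(hereMapContent, hereMapContentVersion, "topology-geometry", sparkContext, actorSystem)

val exactTotalMeters: Double =
  allPartitions
    .map(readTopologyGeometry(hereMapContent))
    .map(_.segment.map(_.length).sum)
    .reduce(_ + _)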

You can also try publishing the total number of kilometers to a text/plain layer with generic partitioning and a single partition containing the catalog information. See the Organize your work in projects tutorial and the Command Line Interface Developer Guide for more information on creating catalogs and configuring layers.
