Use Spark Connector to read and write data

Objectives: Understand how to use the Spark Connector to read and write data from different layers and data formats in a catalog.

Complexity: Beginner

Time to complete: 30 min

Prerequisites: Organize your work in projects

Source code: Download

The example in this tutorial demonstrates how to use the Spark Connector provided by the Data Client Library. The connector integrates with Spark for batch processing workloads, allowing you to use all standard Spark APIs and functions to read, write, and delete data. For stream processing workloads, use the provided Flink Connector instead.

In the main part of the tutorial, the following usages are covered:

  • Reading, printing, and collecting data from the index layer in the avro format (note that the Spark Connector infers the format, so it does not need to be specified)
  • Reading data from the versioned layer in protobuf format, then transforming it and writing it to the volatile layer in parquet format
  • Reading, filtering, augmenting, and writing data from a versioned layer in a custom format
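Before walking through the full applications, the following minimal sketch previews the read/write pattern that all three tasks share. It is a sketch only: the layer IDs and partition names are placeholders, and the complete, runnable applications appear later in this tutorial.

import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.SparkSession

object SparkConnectorPreview extends App {
  // Catalog HRNs are resolved from pipeline-config.conf through the PipelineContext
  private val pipelineContext = new PipelineContext
  private val inputCatalogHrn = pipelineContext.config.inputCatalogs("roadTopologyInput")
  private val outputCatalogHrn = pipelineContext.config.outputCatalog

  val sparkSession: SparkSession =
    SparkSession.builder().appName("SparkConnectorPreview").getOrCreate()

  // readLayer returns a reader builder; the connector infers the layer's data format
  // from the layer configuration, so only the catalog, layer, and query are specified
  val data = sparkSession
    .readLayer(inputCatalogHrn, "some-input-layer-id") // placeholder layer ID
    .query("mt_partition=in=(partition-1, partition-2)") // placeholder partitions
    .load()

  // writeLayer writes the DataFrame to a layer of the output catalog
  data
    .writeLayer(outputCatalogHrn, "some-output-layer-id") // placeholder layer ID
    .save()

  sparkSession.stop()
}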

As a preparation step, you must create your data sources, including a catalog with the appropriate layers, so that they are in place for the main part of this tutorial. The dataset used is sourced from the HERE Map Content catalog and contains information on road network topology and geometry.

Set up the Maven project

Create the following folder structure for the project:

spark-connector
└── src
    └── main
        ├── java
        └── resources
        └── scala

You can do this with a single bash command:

mkdir -p spark-connector/src/main/{java,resources,scala}

Create the input and output catalogs

Create a file named pipeline-config.conf and populate it with the following contents, replacing {{YOUR_CATALOG_HRN}} and {{YOUR_OUTPUT_CATALOG_HRN}} with the HRNs of the catalogs you created in Organize your work in projects.

pipeline.config {

  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }

  input-catalogs {
      // On the China environment, use hrn:here-cn:data::olp-cn-here:here-map-content-china-2
      roadTopologySource { hrn = "hrn:here:data::olp-here:rib-2" }
      roadTopologyInput { hrn = "{{YOUR_CATALOG_HRN}}" }
  }
}
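The keys declared under input-catalogs and output-catalog are what the applications later resolve through the PipelineContext. As a short illustration, using the same keys as above:

import com.here.platform.pipeline.PipelineContext

val pipelineContext = new PipelineContext

// "roadTopologySource" and "roadTopologyInput" are the keys declared under input-catalogs
val roadTopologySourceHrn = pipelineContext.config.inputCatalogs("roadTopologySource")
val roadTopologyInputHrn = pipelineContext.config.inputCatalogs("roadTopologyInput")

// The output-catalog entry becomes the HRN of the catalog the results are written to
val outputCatalogHrn = pipelineContext.config.outputCatalog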

In this tutorial, a public catalog is used - the HERE Map Content catalog. The catalog must first be linked to your project before it can be used within the project. To do this, replace {{YOUR_PROJECT_HRN}} with the HRN of the project you created in the Organize your work in projects tutorial and execute the following command (on the China environment, link hrn:here-cn:data::olp-cn-here:here-map-content-china-2 instead):

olp project resource link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:rib-2

If your command is successful, the CLI returns the following message:

Project resource hrn:here:data::olp-here:rib-2 has been linked.

The POM for this example is identical to that in the Path Matching in Spark Tutorial.

Parent POM:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

Dependencies:

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.hrn</groupId>
        <artifactId>hrn_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>

You must create both the input and output catalogs. You can do this by following the steps outlined in Organize your work in projects, using the OLP Command Line Interface (CLI).

You should use a unique identifier for the catalog, for example {{YOUR_USERNAME}}-spark-connector-input.

Create a file called spark-connector-input.json with the contents below. The catalog identifier itself is supplied later as the first argument to the olp catalog create command.

Note

All timestamps are in UTC milliseconds since the epoch (Jan 1, 1970 00:00:00 UTC). If you run your application in another timezone, make sure that the timestamp is converted into UTC before you query or upload data. In Java or Scala, you can do the conversion with this call: Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()

{
  "id": "spark-connector-input",
  "name": "Simulated road topology data archive (From tutorial) spark-connector-input",
  "summary": "Archive of simulated road topology data",
  "description": "Archive of simulated road topology data.",
  "tags": ["Tutorial", "Simulated"],
  "layers": [
    {
      "id": "index-layer-avro-data",
      "name": "index-layer-avro-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data.\n\nThis data is meant to be used for demonstration purposes and \"playing\" with data.",
      "tags": ["Tutorial", "Simulated"],
      "contentType": "application/x-avro-binary",
      "layerType": "index",
      "indexProperties": {
        "indexDefinitions": [
          {
            "name": "ingestion_timestamp",
            "duration": 600000,
            "type": "timewindow"
          },
          {
            "name": "tile_id",
            "type": "heretile",
            "zoomLevel": 12
          }
        ],
        "ttl": "unlimited"
      },
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    },
    {
      "id": "versioned-layer-protobuf-data",
      "name": "versioned-layer-protobuf-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data for versioned-layer-protobuf-data",
      "contentType": "application/x-protobuf",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      },
      "schema": {
        "hrn": "hrn:here-cn:schema::olp-cn-here:com.here.schema.rib:topology-geometry_v2:2.32.0"
      }
    },
    {
      "id": "versioned-layer-custom-data",
      "name": "versioned-layer-custom-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data for versioned-layer-custom-data",
      "contentType": "application/octet-stream",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    }
  ]
}
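As mentioned in the note above, values for the ingestion_timestamp timewindow index must be expressed in UTC milliseconds since the epoch. A minimal Scala sketch of the conversion given in the note:

import java.util.{Calendar, TimeZone}

// Current time as UTC milliseconds since the epoch, suitable as an ingestion_timestamp value
val utcMillis: Long = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis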

For the output catalog, you can name the file spark-connector-output.json, as shown in the following:

{
  "id": "spark-connector-output",
  "name": "Simulated data archive (From tutorial) test-spark-connector-output",
  "summary": "Archive of simulated road topology data",
  "description": "Archive of simulated road topology data.",
  "tags": ["Tutorial", "Simulated"],
  "layers": [
    {
      "id": "volatile-layer-parquet-data",
      "name": "volatile-layer-parquet-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data.",
      "contentType": "application/x-parquet",
      "layerType": "volatile",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    },
    {
      "id": "versioned-layer-custom-data",
      "name": "versioned-layer-custom-data",
      "summary": "Simulated data.",
      "description": "Simulated road topology data for versioned-layer-custom-data",
      "contentType": "application/octet-stream",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    }
  ]
}

In the following command, replace {{YOUR_CATALOG_ID}} with your own identifier and {{YOUR_PROJECT_HRN}} with the HRN of your project from Organize your work in projects, then run the command:

olp catalog create {{YOUR_CATALOG_ID}} \
    "Simulated road topology data input from tutorial ({{YOUR_USERNAME}})" \
    --config spark-connector-input.json \
    --scope {{YOUR_PROJECT_HRN}}

Note

If a billing tag is required in your realm, update the config file by adding the "billingTags": ["YOUR_BILLING_TAG"] property to the layer section.
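The command above creates the input catalog. Create the output catalog in the same way, using spark-connector-output.json; {{YOUR_OUTPUT_CATALOG_ID}} below is a placeholder for the identifier you choose for it, for example {{YOUR_USERNAME}}-spark-connector-output:

olp catalog create {{YOUR_OUTPUT_CATALOG_ID}} \
    "Simulated road topology data output from tutorial ({{YOUR_USERNAME}})" \
    --config spark-connector-output.json \
    --scope {{YOUR_PROJECT_HRN}}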

Set up the data sources

This step involves implementing an application that sets up the data sources used in the main part of this tutorial. The Spark Connector is used here to read data from a source containing road topology and geometry data and to write it to a catalog that is used as input in the next stage.

For this purpose, you read the data from a dataset containing road topology and geometry data and save it to three data sources, each corresponding to one of the layer types and data formats defined above when creating the input catalog.

Scala
Java

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Row, SparkSession}
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  VersionedDataConverter,
  VersionedRowMetadata
}
import org.slf4j.LoggerFactory

object DataSourcesSetupScala extends App {

  private val logger = LoggerFactory.getLogger(DataSourcesSetupScala.getClass)

  private val pipelineContext = new PipelineContext

  // Source for the archived road topology data
  private val roadTopologyDataSourceCatalogHrn =
    pipelineContext.config.inputCatalogs("roadTopologySource")
  private val roadTopologyDataSourceLayerId = "topology-geometry"

  // Data sources that will be the input for the SparkConnector application, and will be written to in the setup step
  private val inputRoadTopologyCatalogHrn =
    pipelineContext.config.inputCatalogs("roadTopologyInput")
  private val inputIndexAvroLayerId = "index-layer-avro-data"
  private val inputVersionedProtobufLayerId = "versioned-layer-protobuf-data"
  private val inputversionedLayerCustomId = "versioned-layer-custom-data"

  val sparkSession: SparkSession =
    SparkSession.builder().appName("DataSourcesSetupScalaPipeline").getOrCreate()

  import sparkSession.implicits._

  // Partitions that will be retrieved from the road topology dataset, corresponding to the cities of Berlin and Munich
  // For China, please use partitions 23543287 and 23551652
  private val berlinPartition = "23618361"
  private val munichPartition = "23611420"

  // Reads the data from the roadTopologySource catalog
  val sourceRoadTopologyData = sparkSession
    .readLayer(roadTopologyDataSourceCatalogHrn, roadTopologyDataSourceLayerId)
    .query(s"mt_partition=in=($berlinPartition, $munichPartition)")
    .load()

  // Performs some transformations on the dataset
  val transformedData = sourceRoadTopologyData
    .select(col("partition_name"), explode(col("segment")) as "road")
    .select(col("partition_name"),
            col("road.identifier") as "identifier",
            explode(col("road.geometry.point")) as "points",
            col("road.length") as "length")
    .select("partition_name", "identifier", "points.*", "length")

  // For the index layer, all index attributes (in this case tile_id and ingestion_timestamp) have to be present in the
  // result as DF row columns, and prefixed with "idx_"
  val indexLayerData = transformedData
    .withColumn("idx_tile_id", col("partition_name"))
    .withColumn("idx_ingestion_timestamp", unix_timestamp())

  /*
      The resulting schema to be written is the following:

      root
       |-- partition_name: string (nullable = true)
       |-- identifier: string (nullable = true)
       |-- latitude: double (nullable = true)
       |-- longitude: double (nullable = true)
       |-- z_level: integer (nullable = true)
       |-- elevation: double (nullable = true)
       |-- length: double (nullable = true)
       |-- idx_tile_id: string (nullable = true)
       |-- idx_ingestion_timestamp: long (nullable = true)
   */

  indexLayerData
    .writeLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
    .save()

  // For the versioned layer, both catalog version and partitionId attributes have to be present in the result as DF
  // row columns, and there needs to be a metadata column (mt_partition) containing the partition name
  // The data saved in protobuf format needs to have its schema specified in the respective layer definition.
  val versionedLayerData = sourceRoadTopologyData
    .withColumn("mt_partition", col("partition_name"))

  /*
    The resulting schema to be written is the following:

      root
       |-- partition_name: string (nullable = true)
       |-- node: array (nullable = true)
       |    |-- element: struct (containsNull = true)
       |    |    |-- identifier: string (nullable = true)
       |    |    |-- segment_ref: array (nullable = true)
       |    |    |    |-- element: struct (containsNull = true)
       |    |    |    |    |-- partition_name: string (nullable = true)
       |    |    |    |    |-- identifier: string (nullable = true)
       |    |    |-- geometry: struct (nullable = true)
       |    |    |    |-- latitude: double (nullable = true)
       |    |    |    |-- longitude: double (nullable = true)
       |    |    |    |-- z_level: integer (nullable = true)
       |    |    |    |-- elevation: double (nullable = true)
       |-- segment: array (nullable = true)
       |    |-- element: struct (containsNull = true)
       |    |    |-- identifier: string (nullable = true)
       |    |    |-- start_node_ref: struct (nullable = true)
       |    |    |    |-- partition_name: string (nullable = true)
       |    |    |    |-- identifier: string (nullable = true)
       |    |    |-- end_node_ref: struct (nullable = true)
       |    |    |    |-- partition_name: string (nullable = true)
       |    |    |    |-- identifier: string (nullable = true)
       |    |    |-- geometry: struct (nullable = true)
       |    |    |    |-- point: array (nullable = true)
       |    |    |    |    |-- element: struct (containsNull = true)
       |    |    |    |    |    |-- latitude: double (nullable = true)
       |    |    |    |    |    |-- longitude: double (nullable = true)
       |    |    |    |    |    |-- z_level: integer (nullable = true)
       |    |    |    |    |    |-- elevation: double (nullable = true)
       |    |    |-- length: double (nullable = true)
       |-- mt_partition: string (nullable = true)
   */

  versionedLayerData
    .writeLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
    .save()

  // For the custom layer, there needs to be a metadata column (mt_partition) containing the partition name, and another
  // column (data) containing the serialized data
  val customRawLayerId = transformedData
    .select(col("length"))
    .map(v => s"${lit("road-partition-")}:${lit(v.mkString).toString}".getBytes)
    .select(col("value") as "data")
    // A new column is added containing one partition id for each line in the input data, in the format "partition-{{number}}"
    .withColumn("mt_partition", concat(lit("partition-"), monotonically_increasing_id))

  /*
    The resulting schema to be written is the following:

    root
       |-- data: binary (nullable = true)
       |-- mt_partition: string (nullable = false)
   */

  customRawLayerId
    .writeLayer(inputRoadTopologyCatalogHrn, inputversionedLayerCustomId)
    .withDataConverter(new VersionedDataConverter {
      override def serializeGroup(rowMetadata: VersionedRowMetadata,
                                  rows: Iterator[Row]): GroupedData[VersionedRowMetadata] = {
        val bytes = rows.next().getAs[Array[Byte]]("data")
        GroupedData(rowMetadata, bytes)
      }
    })
    .save()

  logger.info(s"Finished setting up data sources in catalog: $inputRoadTopologyCatalogHrn")

  sparkSession.stop()
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import java.util.Iterator;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSourcesSetup {

  private static PipelineContext pipelineContext = new PipelineContext();

  // Source for the archived road topology data
  private static HRN roadTopologyDataSourceCatalogHrn =
      pipelineContext.getConfig().getInputCatalogs().get("roadTopologySource");
  private static String roadTopologyDataSourceLayerId = "topology-geometry";

  // Data sources that will be the input for the SparkConnector application, and will be written to
  // in the setup step
  private static HRN inputRoadTopologyCatalogHrn =
      pipelineContext.getConfig().getInputCatalogs().get("roadTopologyInput");
  private static String inputIndexAvroLayerId = "index-layer-avro-data";
  private static String inputVersionedProtobufLayerId = "versioned-layer-protobuf-data";
  private static String inputversionedLayerCustomId = "versioned-layer-custom-data";

  private static SparkSession sparkSession =
      SparkSession.builder().appName("DataSourcesSetupPipeline").getOrCreate();

  public static void main(String[] args) {

    Logger logger = LoggerFactory.getLogger(DataSourcesSetup.class);

    // Partitions that will be retrieved from the road topology dataset, corresponding to the
    // cities of Berlin and Munich
    // For China, please use partitions 23543287 and 23551652
    String berlinPartition = "23618361";
    String munichPartition = "23611420";

    // Reads the data from the roadTopologySource catalog
    Dataset<Row> sourceRoadTopologyData =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(roadTopologyDataSourceCatalogHrn, roadTopologyDataSourceLayerId)
            .query(String.format("mt_partition=in=(%s, %s)", berlinPartition, munichPartition))
            .load();

    // Performs some transformations on the dataset
    Dataset<Row> transformedData =
        sourceRoadTopologyData
            .select(col("partition_name"), explode(col("segment")).as("road"))
            .select(
                col("partition_name"),
                col("road.identifier").as("identifier"),
                explode(col("road.geometry.point")),
                col("road.length").as("length"))
            .select("partition_name", "identifier", "col.*", "length");

    // For the index layer, all index attributes (in this case tile_id and ingestion_timestamp) have
    // to be present in the result as DF row columns, and prefixed with "idx_"
    Dataset<Row> indexLayerData =
        transformedData
            .withColumn("idx_tile_id", col("partition_name"))
            .withColumn("idx_ingestion_timestamp", unix_timestamp());

    JavaLayerDataFrameWriter.create(indexLayerData)
        .writeLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
        .save();

    /*
       The resulting schema to be written is the following:

       root
        |-- partition_name: string (nullable = true)
        |-- identifier: string (nullable = true)
        |-- latitude: double (nullable = true)
        |-- longitude: double (nullable = true)
        |-- z_level: integer (nullable = true)
        |-- elevation: double (nullable = true)
        |-- length: double (nullable = true)
        |-- idx_tile_id: string (nullable = true)
        |-- idx_ingestion_timestamp: long (nullable = true)
    */

    // For the versioned layer, both catalog version and partitionId attributes have to be present
    // in the result as DF row columns, and there needs to be a metadata column (mt_partition)
    // containing the partition name.
    // The data saved in protobuf format needs to have its schema specified in the respective layer
    // definition.
    Dataset<Row> versionedLayerData =
        sourceRoadTopologyData.withColumn("mt_partition", col("partition_name"));

    JavaLayerDataFrameWriter.create(versionedLayerData)
        .writeLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
        .save();

    /*
     The resulting schema to be written is the following:

       root
        |-- partition_name: string (nullable = true)
        |-- node: array (nullable = true)
        |    |-- element: struct (containsNull = true)
        |    |    |-- identifier: string (nullable = true)
        |    |    |-- segment_ref: array (nullable = true)
        |    |    |    |-- element: struct (containsNull = true)
        |    |    |    |    |-- partition_name: string (nullable = true)
        |    |    |    |    |-- identifier: string (nullable = true)
        |    |    |-- geometry: struct (nullable = true)
        |    |    |    |-- latitude: double (nullable = true)
        |    |    |    |-- longitude: double (nullable = true)
        |    |    |    |-- z_level: integer (nullable = true)
        |    |    |    |-- elevation: double (nullable = true)
        |-- segment: array (nullable = true)
        |    |-- element: struct (containsNull = true)
        |    |    |-- identifier: string (nullable = true)
        |    |    |-- start_node_ref: struct (nullable = true)
        |    |    |    |-- partition_name: string (nullable = true)
        |    |    |    |-- identifier: string (nullable = true)
        |    |    |-- end_node_ref: struct (nullable = true)
        |    |    |    |-- partition_name: string (nullable = true)
        |    |    |    |-- identifier: string (nullable = true)
        |    |    |-- geometry: struct (nullable = true)
        |    |    |    |-- point: array (nullable = true)
        |    |    |    |    |-- element: struct (containsNull = true)
        |    |    |    |    |    |-- latitude: double (nullable = true)
        |    |    |    |    |    |-- longitude: double (nullable = true)
        |    |    |    |    |    |-- z_level: integer (nullable = true)
        |    |    |    |    |    |-- elevation: double (nullable = true)
        |    |    |-- length: double (nullable = true)
        |-- mt_partition: string (nullable = true)
    */

    // For the custom layer, there needs to be a metadata column (mt_partition) containing the
    // partition name, and another column (data) containing the serialized data
    Dataset<Row> customRawLayerId =
        transformedData
            .select(col("length"))
            .map((MapFunction<Row, byte[]>) row -> row.toString().getBytes(), Encoders.BINARY())
            .select(col("value").as("data"))
            // A new column is added containing one partition id for each line in the input data, in
            // the format "partition-{{number}}"
            .withColumn("mt_partition", concat(lit("partition-"), monotonically_increasing_id()));

    JavaLayerDataFrameWriter.create(customRawLayerId)
        .writeLayer(inputRoadTopologyCatalogHrn, inputversionedLayerCustomId)
        .withDataConverter(
            new VersionedDataConverter() {
              @Override
              public GroupedData<VersionedRowMetadata> serializeGroup(
                  VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
                byte[] bytes = rows.next().getAs("data");
                return new GroupedData<>(rowMetadata, bytes);
              }
            })
        .save();

    /*
     The resulting schema to be written is the following:

     root
        |-- data: binary (nullable = true)
        |-- mt_partition: string (nullable = false)
    */

    logger.info(
        String.format(
            "Finished setting up data sources in catalog: %s", inputRoadTopologyCatalogHrn));

    sparkSession.stop();
  }
}

Implement the Spark connector application

This application uses the data sources created in the previous stage to read from different layers and data formats. It performs some transformations on the resulting data and writes the results to the output layers of the previously created output catalog.

Scala
Java

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  VersionedDataConverter,
  VersionedRowMetadata
}
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession}
import org.slf4j.LoggerFactory

object SparkConnectorScala extends App {

  private val logger = LoggerFactory.getLogger(SparkConnectorScala.getClass)

  private val pipelineContext = new PipelineContext

  // Defines the input / output catalogs to be read / written to
  private val inputRoadTopologyCatalogHrn =
    pipelineContext.config.inputCatalogs("roadTopologyInput")
  private val outputRoadTopologyCatalogHrn = pipelineContext.config.outputCatalog

  // Defines the layers used to read / write data
  private val inputIndexAvroLayerId = "index-layer-avro-data"
  private val inputVersionedProtobufLayerId = "versioned-layer-protobuf-data"
  private val versionedLayerCustomId = "versioned-layer-custom-data"
  private val outputVolatileParquetLayerId = "volatile-layer-parquet-data"

  val sparkSession: SparkSession =
    SparkSession.builder().appName("SparkConnectorScalaPipeline").getOrCreate()

  // This is needed for implicit conversions
  import sparkSession.implicits._

  // Values for partitions and tile id to be used in queries
  // For China, please use partitions 23543287 and 23551652
  private val berlinPartition = "23618361"
  private val munichPartition = "23611420"

  // Task 1: Reading data from index layer in the avro format, and printing schema and data
  // This step uses LayerDataFrameReader to read from the index layer. In this example, two index attributes
  // (tile_id and ingestion_timestamp) are present, and their respective columns are prefixed with "idx_"
  val avroData: DataFrame = sparkSession
    .readLayer(inputRoadTopologyCatalogHrn, inputIndexAvroLayerId)
    // A query needs to be specified, either selecting at least one tile_id or an ingestion_timestamp.
    // In order to retrieve the full dataset, you can use "ingestion_timestamp > 0"
    .query(s"tile_id==$munichPartition")
    .load()

  // Displaying the schema and the content of the DataFrame
  avroData.printSchema()
  avroData.show()

  // Collecting the results into Array[Row]
  avroData.collect()

  /*
      The resulting schema is the following:

        root
         |-- partition_name: string (nullable = true)
         |-- identifier: string (nullable = true)
         |-- latitude: double (nullable = true)
         |-- longitude: double (nullable = true)
         |-- z_level: integer (nullable = true)
         |-- elevation: double (nullable = true)
         |-- length: double (nullable = true)
         |-- mt_dataHandle: string (nullable = true)
         |-- mt_metadata: map (nullable = true)
         |    |-- key: string
         |    |-- value: string (valueContainsNull = true)
         |-- mt_timestamp: long (nullable = true)
         |-- mt_checksum: string (nullable = true)
         |-- mt_crc: string (nullable = true)
         |-- mt_dataSize: long (nullable = true)
         |-- idx_tile_id: long (nullable = true)
         |-- idx_ingestion_timestamp: long (nullable = true)
   */

  // Task 2: Read data from versioned/protobuf layer, filter it, transform it (select, filter, groupBy, agg, etc.)
  // and write it to volatile/parquet layer.
  // In order to retrieve data from the versioned layer, we need to specify the partition(s) in the query (mt_partition).
  // Since this is protobuf data, the layer definition needs to include a field with the used schema
  val protobufData: DataFrame = sparkSession
    .readLayer(inputRoadTopologyCatalogHrn, inputVersionedProtobufLayerId)
    .query(s"mt_partition=in=($berlinPartition, $munichPartition)")
    .load()

  // We can create a case class that will be used to encode the data in a type-safe way
  case class RoadSegmentTopology(identifier: String,
                                 partition_name: String,
                                 points: Array[SegmentPoint],
                                 length: Double)

  case class SegmentPoint(latitude: String, longitude: String, z_level: Integer, elevation: Double)

  // In this step, we read the data as RoadTopology data
  val transformedData = protobufData
    .select(col("partition_name"), explode(col("segment")) as "road")
    .select(col("road.identifier") as "identifier",
            col("partition_name"),
            col("road.geometry.point") as "points",
            col("road.length") as "length")
    .as[RoadSegmentTopology]

  // We will calculate the average segment length per partition, as well as standard deviation
  // and total number of segments
  val statsByPartition: DataFrame = transformedData
    .groupBy("partition_name")
    .agg(round(avg("length")) as "average_length",
         round(stddev("length")) as "std_length",
         count("points") as "number_of_points")
    // In order to write to a volatile layer, we need the column mt_partition that specifies the partition
    .withColumn("mt_partition", col("partition_name"))

  /*
      The resulting schema is the following:

      root
       |-- partition_name: string (nullable = true)
       |-- average_length: double (nullable = true)
       |-- std_length: double (nullable = true)
       |-- number_of_points: long (nullable = false)
       |-- mt_partition: string (nullable = true)
   */

  statsByPartition
    .writeLayer(outputRoadTopologyCatalogHrn, outputVolatileParquetLayerId)
    .save()

  // Task 3: Read data from versioned/custom layer (with custom format), filter it, augment it, and write it to
  // another versioned/custom layer.
  val customReadData: Dataset[String] = sparkSession
    .readLayer(inputRoadTopologyCatalogHrn, versionedLayerCustomId)
    .query(s"mt_partition=in=(partition-1, partition-2, partition-3)")
    .load()
    .map { row =>
      row.getAs[Array[Byte]]("data").map(_.toChar).mkString
    }(Encoders.STRING)

  val customData: DataFrame = customReadData
    .map(_.getBytes)
    .toDF
    .withColumn("data", col("value"))
    .select(col("value") as "data")
    // A new column is added containing one partition id for each line in the input data, in the format "id-{{number}}"
    .withColumn("mt_partition", concat(lit("id-"), monotonically_increasing_id))

  /*
      The resulting schema is the following:

      root
          |-- data: binary (nullable = true)
          |-- mt_partition: string (nullable = false)
   */

  customData
    .writeLayer(outputRoadTopologyCatalogHrn, versionedLayerCustomId)
    // A DataConverter needs to be provided for the write operation to succeed
    .withDataConverter(new VersionedDataConverter {
      override def serializeGroup(rowMetadata: VersionedRowMetadata,
                                  rows: Iterator[Row]): GroupedData[VersionedRowMetadata] = {
        val bytes = rows.next().getAs[Array[Byte]]("data")
        GroupedData(rowMetadata, bytes)
      }
    })
    .save()

  logger.info(s"Finished setting up data sources in catalog: $outputRoadTopologyCatalogHrn")

  sparkSession.stop()

}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.javadsl.VersionedDataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.VersionedRowMetadata;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkConnector {

  private static final PipelineContext pipelineContext = new PipelineContext();

  // Defines the input / output catalogs to be read / written to
  private static final HRN inputRoadTopologyCatalogHrn =
      pipelineContext.getConfig().getInputCatalogs().get("roadTopologyInput");
  private static final HRN outputRoadTopologyCatalogHrn =
      pipelineContext.getConfig().getOutputCatalog();

  // Defines the layers used to read / write data
  private static final String INPUT_INDEX_AVRO_LAYER_ID = "index-layer-avro-data";
  private static final String INPUT_VERSIONED_PROTOBUF_LAYER_ID = "versioned-layer-protobuf-data";
  private static final String VERSIONED_LAYER_CUSTOM_ID = "versioned-layer-custom-data";
  private static final String OUTPUT_VOLATILE_PARQUET_LAYER_ID = "volatile-layer-parquet-data";

  public static void main(String[] args) {
    SparkSession sparkSession =
        SparkSession.builder().appName("SparkConnectorPipeline").getOrCreate();

    Logger logger = LoggerFactory.getLogger(SparkConnector.class);

    // Values for partitions and tile id to be used in queries
    // For China, please use partitions 23543287 and 23551652
    String berlinPartition = "23618361";
    String munichPartition = "23611420";

    // Task 1: Reading data from index layer in the avro format, and printing schema and data
    // This step uses JavaLayerDataFrameReader to read from the index layer. In this example, two
    // index attributes (tile_id and ingestion_timestamp) are present, and their respective columns
    // are prefixed with "idx_"
    Dataset<Row> avroData =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(inputRoadTopologyCatalogHrn, INPUT_INDEX_AVRO_LAYER_ID)
            // A query needs to be specified, either selecting at least one tile_id or an
            // ingestion_timestamp.
            // In order to retrieve the full dataset, you can use "ingestion_timestamp > 0"
            .query(String.format("tile_id==%s", munichPartition))
            .load();

    // Displaying the schema and the content of the DataFrame
    avroData.printSchema();
    avroData.show();

    // Collecting the results into Array[Row]
    avroData.collect();

    /*
      The resulting schema is the following:

        root
         |-- partition_name: string (nullable = true)
         |-- identifier: string (nullable = true)
         |-- latitude: double (nullable = true)
         |-- longitude: double (nullable = true)
         |-- z_level: integer (nullable = true)
         |-- elevation: double (nullable = true)
         |-- length: double (nullable = true)
         |-- mt_dataHandle: string (nullable = true)
         |-- mt_metadata: map (nullable = true)
         |    |-- key: string
         |    |-- value: string (valueContainsNull = true)
         |-- mt_timestamp: long (nullable = true)
         |-- mt_checksum: string (nullable = true)
         |-- mt_crc: string (nullable = true)
         |-- mt_dataSize: long (nullable = true)
         |-- idx_tile_id: long (nullable = true)
         |-- idx_ingestion_timestamp: long (nullable = true)
    */

    // Task 2: Read data from versioned/protobuf layer, filter it, transform it (select, filter,
    // groupBy, agg, etc.)
    // and write it to volatile/parquet layer.
    // In order to retrieve data from the versioned layer, we need to specify the partition(s) in
    // the query (mt_partition).
    // Since this is protobuf data, the layer definition needs to include a field with the used
    // schema
    Dataset<Row> protobufData =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(inputRoadTopologyCatalogHrn, INPUT_VERSIONED_PROTOBUF_LAYER_ID)
            .query(String.format("mt_partition=in=(%s, %s)", berlinPartition, munichPartition))
            .load();

    // We can create a class that will be used to encode the data in a type-safe way
    Encoder<RoadSegmentTopology> roadSegmentTopologyEncoder =
        Encoders.bean(RoadSegmentTopology.class);

    // In this step, we read the data as RoadTopology data
    Dataset<RoadSegmentTopology> transformedData =
        protobufData
            .select(col("partition_name"), explode(col("segment")).as("road"))
            .select(
                col("road.identifier").as("identifier"),
                col("partition_name"),
                col("road.geometry.point").as("points"),
                col("road.length").as("length"))
            .as(roadSegmentTopologyEncoder);

    // We will calculate the average segment length per partition, as well as standard deviation
    // and total number of segments
    Dataset<Row> statsByPartition =
        transformedData
            .groupBy("partition_name")
            .agg(
                round(avg("length")).as("average_length"),
                round(stddev("length")).as("std_length"),
                count("points").as("number_of_points"))
            // In order to write to a volatile layer, we need the column mt_partition that
            // specifies the partition
            .withColumn("mt_partition", col("partition_name"));

    /*
       The resulting schema is the following:

       root
        |-- partition_name: string (nullable = true)
        |-- average_length: double (nullable = true)
        |-- std_length: double (nullable = true)
        |-- number_of_points: long (nullable = false)
        |-- mt_partition: string (nullable = true)
    */

    JavaLayerDataFrameWriter.create(statsByPartition)
        .writeLayer(outputRoadTopologyCatalogHrn, OUTPUT_VOLATILE_PARQUET_LAYER_ID)
        .save();

    // Task 3: Read data from versioned/custom layer (with custom format), filter it, augment it,
    // and write it to another versioned/custom layer.
    Dataset<Row> customData =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(inputRoadTopologyCatalogHrn, VERSIONED_LAYER_CUSTOM_ID)
            .query("mt_partition=in=(partition-1, partition-2, partition-3)")
            .load();

    // A new column is added containing one partition id for each line in the input data, in the
    // format "partition-{{number}}"
    Dataset<Row> partitionedData =
        customData.withColumn(
            "mt_partition", concat(lit("partition-"), monotonically_increasing_id()));

    /*
    The resulting schema is the following:

    root
        |-- data: binary (nullable = true)
        |-- mt_partition: string (nullable = false)
    */

    JavaLayerDataFrameWriter.create(partitionedData)
        .writeLayer(outputRoadTopologyCatalogHrn, VERSIONED_LAYER_CUSTOM_ID)
        // A DataConverter needs to be provided for the write operation to succeed
        .withDataConverter(
            new VersionedDataConverter() {
              @Override
              public GroupedData<VersionedRowMetadata> serializeGroup(
                  VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
                byte[] bytes = rows.next().getAs("data");
                return new GroupedData<>(rowMetadata, bytes);
              }
            })
        .save();

    logger.info(
        String.format(
            "Finished setting up data sources in catalog: %s", outputRoadTopologyCatalogHrn));

    sparkSession.stop();
  }

  public static class RoadSegmentTopology implements Serializable {
    private String identifier;
    private String partition_name;
    private SegmentPoint[] points;
    private Double length;
  }

  public static class SegmentPoint implements Serializable {
    private String latitude;
    private String longitude;
    private Integer z_level;
    private Double elevation;
  }
}

Compile and run locally

To run the data sources setup application locally, execute the following command:

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=DataSourcesSetupScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=DataSourcesSetup \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}

For the main part of the tutorial, once the data sources are in place, run:

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=SparkConnectorScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=SparkConnector \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
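Optionally, once both applications have run, you can read the results back with the same connector API to check what was written. The following is a minimal sketch only, assuming that volatile layers can be queried by mt_partition in the same way as the versioned layers above and that it is run with the same pipeline-config.conf and project scope:

import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.SparkSession

object OutputCheckScala extends App {
  private val pipelineContext = new PipelineContext
  private val outputCatalogHrn = pipelineContext.config.outputCatalog

  val sparkSession: SparkSession =
    SparkSession.builder().appName("OutputCheckPipeline").getOrCreate()

  // Read back the per-partition statistics written to the volatile parquet layer
  // (23618361 and 23611420 are the Berlin and Munich partitions used throughout the tutorial)
  sparkSession
    .readLayer(outputCatalogHrn, "volatile-layer-parquet-data")
    .query("mt_partition=in=(23618361, 23611420)")
    .load()
    .show()

  sparkSession.stop()
}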

Further information

For additional details on the topics covered in this tutorial, refer to the Data Client Library documentation for the Spark Connector.
