Use Spark Connector to migrate CSV data to Protobuf

Objectives: Understand how to use the Spark Connector to upload your data from CSV files within a directory to a Protobuf layer.

Complexity: Beginner

Time to complete: 30 min

Prerequisites: Use Spark connector to read and write data

Source code: Download

The example in this tutorial demonstrates how to use the Spark Connector provided by the Data Client Library to upload CSV data stored in a directory into a layer that uses the Protobuf format. One scenario where you might want to do this is a data processing application for which you want to reduce bandwidth consumption.

In the main part of the tutorial, we will cover the following usages:

  • Reading the CSV data from the directory into a single dataframe;
  • Restructuring the dataframe to be compatible with the layer schema;
  • Uploading the resulting dataframe to the layer.

As a preparation step, you will need to create CSV files in the resources/csv folder and create the output layer corresponding to your CSV schema. Let's assume the CSV data in that folder has the following format:

tileId, tag, latitude, longitude
123456789,806553341,33.5230503,-113.0826719
123456789,806553341,33.5231036,-113.08299199999999
123456789,806553341,33.523157,-113.0833119
123456789,806553341,33.5232106,-113.0836317
123456789,806553341,33.5232646,-113.0839515
123456789,806553341,33.5233187,-113.08427119999999
123456789,806553341,33.5233725,-113.08459099999999
....

The Data Client Library's Spark connector does not allow a partition ID to appear in more than one dataframe row. This means that, in order to upload the data to a Protobuf layer, you need to group it by partition ID. So instead of having this schema:

(partition, tag, lat, lon)

it needs to be restructured to become:

(partition, Array[(tag, lat, lon)])
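
The Scala sketch below illustrates this regrouping with standard Spark SQL functions. It is the same select/groupBy/collect_list pattern used in the full application later in this tutorial; flatDf stands for the dataframe read from the CSV files, and the column names follow the layer schema defined in the next step:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, struct}

// Collapse the flat (partition, tag, lat, lon) rows into one row per partition,
// each holding an array of (tag, lat, lon) structs.
def groupByPartition(flatDf: DataFrame): DataFrame =
  flatDf
    .select(struct(col("tag"), col("lat"), col("lon")).as("shape_point"),
            col("tileId").as("mt_partition"))
    .groupBy("mt_partition")
    .agg(collect_list(col("shape_point")).as("shape_points"))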

Create corresponding layer schema

First generate the schema project, update its proto file with the following definition, and then deploy it:

syntax = "proto3";

package com.here.platform.data.csv.v1;

// Declare any dependent resources in the main POM file and add the import statements here:
//import "com/company/dependentGroupId/filename.proto";

// MainProtobufMessage is a placeholder; this value must match the package/message name in the mainMessage tag of the layer-manifest-plugin in the schema_ds module.
message MainProtobufMessage {
    repeated TaggedPoint shape_points = 1;
}

message TaggedPoint {
    string tag = 1;
    double lat = 2;
    double lon = 3;
}

For details on how to create and deploy a schema, refer to Creating and deploying a schema.

Set up the Maven project

Create the following folder structure for the project:

csv-to-protobuf
└── src
    └── main
        ├── java
        ├── resources
        └── scala

You can do this with a single bash command:

mkdir -p csv-to-protobuf/src/main/{java,resources,scala}
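
If you do not yet have real input data, you can place a small sample file under src/main/resources/csv. The values below are illustrative and mirror the format shown earlier, and the file name sample.csv is arbitrary; note that the application in this tutorial reads the files with the header option set to false, so the sample contains data rows only:

cat > csv-to-protobuf/src/main/resources/csv/sample.csv <<'EOF'
123456789,806553341,33.5230503,-113.0826719
123456789,806553341,33.5231036,-113.0829920
123456789,806553341,33.5231570,-113.0833119
EOF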

Create the output catalog

Create a file named pipeline-config.conf, and populate it with the following contents, replacing {{YOUR_OUTPUT_CATALOG_HRN}} with the HRN of the output catalog containing the Protobuf layer.

pipeline.config {

  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }

  input-catalogs { }
}
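
At runtime, the application resolves this file through the PipelineContext class (the file is passed via the -Dpipeline-config.file system property, as shown in the run commands at the end of this tutorial). The following lines from the full Scala listing show how the output catalog HRN is obtained:

import com.here.platform.pipeline.PipelineContext

// PipelineContext parses pipeline-config.conf and exposes the configured catalogs.
val pipelineContext = new PipelineContext()
val outputCatalog = pipelineContext.config.outputCatalog // HRN from output-catalog.hrn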

The POM for this example is identical to that in the Path Matching in Spark Tutorial.

Parent POM:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

Dependencies:

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.hrn</groupId>
        <artifactId>hrn_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>

You can perform these steps by following the tutorial Organize your work in projects, using the OLP Command Line Interface (CLI).

You should use a unique identifier name for the catalog, for example {{YOUR_USERNAME}}-csv-to-protobuf-output.

Create a file called csv-to-protobuf-output.json with the contents below, replacing {{DEPLOYED_SCHEMA_HRN}} with the deployed schema HRN.

The output catalog must be created with a layer whose content format is configured as Protobuf and whose schema points to the previously deployed schema HRN.

Note

All timestamps are in UTC milliseconds since epoch (Jan 1, 1970 00:00:00 UTC). If you run your application in another timezone, verify that the timestamp is converted into UTC before you query or upload data. In Java or Scala you can do the conversion by using this function call: Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
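
In Scala, the same conversion looks as follows:

import java.util.{Calendar, TimeZone}

// Current time as UTC milliseconds since epoch, as described in the note above.
val utcMillis: Long = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis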

{
  "id": "csv-to-protobuf-output",
  "name": "Uploaded CSV data (From tutorial)",
  "summary": "Archive of simulated road topology data",
  "description": "Archive of simulated road topology data.",
  "tags": ["Tutorial", "Converted"],
  "layers": [
    {
      "id": "versioned-layer-protobuf-data",
      "name": "versioned-layer-protobuf-data",
      "summary": "CSV converted data to protobuf.",
      "description": "CSV converted data to protobuf.",
      "contentType": "application/x-protobuf",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      },
      "schema": {
        "hrn": "{{CN_DEPLOYED_SCHEMA_HRN}}"
      }
    }
  ]
}
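
With this configuration file in place, you can create the catalog inside your project with the OLP CLI. The command below is a sketch; the catalog ID and name are placeholders, and you should verify the exact options against the CLI documentation referenced in Organize your work in projects:

olp catalog create {{YOUR_USERNAME}}-csv-to-protobuf-output \
    "{{YOUR_USERNAME}}-csv-to-protobuf-output" \
    --config csv-to-protobuf-output.json \
    --scope {{YOUR_PROJECT_HRN}}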

Implement the CSV upload application

This application reads the CSV files from the resources directory prepared earlier, transforms the resulting data to match the Protobuf layer schema, and writes the result to the Protobuf layer in the previously created catalog.

Scala
Java

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.slf4j.LoggerFactory
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.functions._
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt

object SparkSqlUploadCsvScala extends App {
  private val logger = LoggerFactory.getLogger(SparkSqlUploadCsvScala.getClass)

  private val pipelineContext = new PipelineContext()
  private val outputCatalog = pipelineContext.config.outputCatalog
  private val outputLayerId = "versioned-layer-protobuf-data"

  val spark = SparkSession
    .builder()
    .appName("spark-sql-csv-upload-to-protobuf")
    .getOrCreate()

  val csvPath = getClass.getResource("/csv").getPath
  val csvFiles = csvPath + "/*.csv"
  val csvSchema = new StructType()
    .add(StructField("tileId", DataTypes.StringType))
    .add(StructField("tag", DataTypes.StringType))
    .add(StructField("lat", DataTypes.DoubleType))
    .add(StructField("lon", DataTypes.DoubleType))

  val csvDf = spark.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
    .schema(csvSchema)
    .load(csvFiles)

  csvDf.show()

  /*
        The CSV dataframe is:
            +--------+---------+------------------+-------------------+
            |  tileId|  tag    |               lat|                lon|
            +--------+---------+------------------+-------------------+
            |19300322|806553341|        33.5231036|-113.08299199999999|
            |19300322|806553341|         33.523157|       -113.0833119|
            |19300322|806553341|        33.5232106|       -113.0836317|
            |19300322|806553341|        33.5232646|       -113.0839515|
            |19300322|806553341|        33.5233187|-113.08427119999999|
            |19300322|806553341|        33.5233725|-113.08459099999999|
            |19300322|806553341|33.523426199999996|       -113.0849126|
            |19300322|806553341|33.523480899999996|       -113.0852401|
        ...
   */

  import spark.implicits._

  val df = csvDf
    .select(
      struct($"tag", $"lat", $"lon").as("shape_point"),
      $"tileId".as("mt_partition")
    )
    .groupBy("mt_partition")
    .agg(collect_list($"shape_point").as("shape_points"))

  df.printSchema()
  /*
        The resulting dataframe schema that will be written to the layer is:
        root
        |-- mt_partition: string (nullable = true)
        |-- shape_points: array (nullable = true)
        |    |-- element: struct (containsNull = true)
        |    |    |-- tag: string (nullable = true)
        |    |    |-- lat: double (nullable = true)
        |    |    |-- lon: double (nullable = true)
   */

  df.show()
  /*
        The resulting dataframe data that will be written to the layer is:
        +------------+--------------------+
        |mt_partition|        shape_points|
        +------------+--------------------+
        |    19300322|[[806553341, 33.5...|
        |    19300324|[[806553341, 33.5...|
        +------------+--------------------+
   */

  df.writeLayer(outputCatalog, outputLayerId).save()

  logger.info(s"Finished uplaoding CSV data to Protobuf to $outputCatalog")

  spark.stop()
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.pipeline.PipelineContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkSqlUploadCsvJava {
  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(SparkSqlUploadCsvJava.class);
    PipelineContext pipelineContext = new PipelineContext();
    HRN outputCatalog = pipelineContext.getConfig().getOutputCatalog();
    String outputLayerId = "versioned-layer-protobuf-data";
    SparkSession spark =
        SparkSession.builder().appName("spark-sql-csv-upload-to-protobuf").getOrCreate();

    String csvPath = SparkSqlUploadCsvJava.class.getResource("/csv").getPath();
    String csvFiles = csvPath + "/*.csv";
    StructType csvSchema =
        new StructType()
            .add(new StructField("tileId", DataTypes.StringType, false, Metadata.empty()))
            .add(new StructField("tag", DataTypes.StringType, false, Metadata.empty()))
            .add(new StructField("lat", DataTypes.DoubleType, false, Metadata.empty()))
            .add(new StructField("lon", DataTypes.DoubleType, false, Metadata.empty()));
    Dataset<Row> csvDf =
        spark
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "false")
            .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
            .schema(csvSchema)
            .load(csvFiles);
    csvDf.show();
    /*
            The CSV dataframe is:
            +--------+---------+------------------+-------------------+
            |  tileId|  tag    |               lat|                lon|
            +--------+---------+------------------+-------------------+
            |19300322|806553341|        33.5231036|-113.08299199999999|
            |19300322|806553341|         33.523157|       -113.0833119|
            |19300322|806553341|        33.5232106|       -113.0836317|
            |19300322|806553341|        33.5232646|       -113.0839515|
            |19300322|806553341|        33.5233187|-113.08427119999999|
            |19300322|806553341|        33.5233725|-113.08459099999999|
            |19300322|806553341|33.523426199999996|       -113.0849126|
            |19300322|806553341|33.523480899999996|       -113.0852401|
            ...
    */

    Dataset<Row> df =
        csvDf
            .select(
                struct(col("tag"), col("lat"), col("lon")).as("shape_point"),
                col("tileId").as("mt_partition"))
            .groupBy("mt_partition")
            .agg(collect_list(col("shape_point")).as("shape_points"));
    df.printSchema();
    /*
        The resulting dataframe schema that will be written to the layer is:
        root
        |-- mt_partition: string (nullable = true)
        |-- shape_points: array (nullable = true)
        |    |-- element: struct (containsNull = true)
        |    |    |-- tag: string (nullable = true)
        |    |    |-- lat: double (nullable = true)
        |    |    |-- lon: double (nullable = true)
    */

    df.show();
    /*
        The resulting dataframe data that will be written to the layer is:
        +------------+--------------------+
        |mt_partition|        shape_points|
        +------------+--------------------+
        |    19300322|[[806553341, 33.5...|
        |    19300324|[[806553341, 33.5...|
        +------------+--------------------+
    */

    JavaLayerDataFrameWriter.create(df).writeLayer(outputCatalog, outputLayerId).save();

    logger.info(String.format("Finished uploading CSV data to Protobuf to %s", outputCatalog));

    spark.stop();
  }
}
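
After the upload finishes, you can optionally read the layer back with the Spark connector to verify the result. The Scala sketch below reuses the spark, outputCatalog, and outputLayerId values from the Scala listing above and assumes the readLayer API covered in the prerequisite tutorial Use Spark connector to read and write data; adjust the partition IDs in the query to match your data:

import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt

// Read the uploaded partitions back from the versioned Protobuf layer and inspect them.
val readBack = spark
  .readLayer(outputCatalog, outputLayerId)
  .query("mt_partition=in=(19300322,19300324)")
  .load()

readBack.printSchema()
readBack.show()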

Compile and run locally

To run the application locally, execute the following command:

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=SparkSqlUploadCsvScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=SparkSqlUploadCsvJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}

Further information

For additional details on the topics covered in this tutorial, refer to the Data Client Library documentation for the Spark connector and to the prerequisite tutorial Use Spark connector to read and write data.
