Spark Connector を使用して csv データを Protobuf に移行します

目的: Spark Connector を使用して、ディレクトリ内の csv ファイルから Protobuf レイヤーにデータをアップロードする方法について理解します。

複雑さ: 初心者向け

所要時間: 30 分

前提条件: Spark Connector を使用してデータの読み取りと書き込みを行います

ソースコード: ダウンロード

このチュートリアルの例では 、データ クライアント ライブラリが提供する Spark コネクタを使用し て、ディレクトリに保存されている csv データを Protobuf 形式を使用するレイヤーにアップロードする方法を示します。 データ処理アプリケーションを使用している場合に、より適切な帯域幅を使用することをお勧めします。

このチュートリアルの主な部分では、次の用途について説明します。

  • ディレクトリから 1 つのデータフレームに csv データを読み取ります。
  • レイヤースキーマと互換性があるようにデータフレームを再構築しています ;
  • 結果のデータフレームをレイヤーにアップロードしています。

準備のステップとして resources/csv 、フォルダに CSV ファイルを作成し、 csv スキーマに対応する出力レイヤーを作成する必要があります。 resources/csv フォルダーにある csv データの形式が次のようになっているとします。

tileId, tag, latitute, longitude
123456789,806553341,33.5230503,-113.0826719
123456789,806553341,33.5231036,-113.08299199999999
123456789,806553341,33.523157,-113.0833119
123456789,806553341,33.5232106,-113.0836317
123456789,806553341,33.5232646,-113.0839515
123456789,806553341,33.5233187,-113.08427119999999
123456789,806553341,33.5233725,-113.08459099999999
....

データクライアントの Spark コネクタでは、データフレーム行でパーティション ID を複数回使用することはできません。 つまり、 Protobuf レイヤーにアップロードするには、パーティション ID でデータをグループ化する必要があります。このスキーマを使用する代わりに、次のことを行います。

(partition, tag, lat, lon)

次の条件を満たすには、 IT を再構築する必要があります。

(partition, Array[(tag, lat, lon)])

対応するレイヤースキーマを作成します

まず、スキーマプロジェクトを生成し、次の proto ファイルを使用して展開します。

syntax = "proto3";

package com.here.platform.data.csv.v1;

// Declare any dependent resources the main POM file and add the import statements here:
//import "com/company/dependentGroupId/filename.proto";

// MainProtobufMessage is a placeholder, this value must match the package/messagename in the mainMessage tag of the layer-manifest-plugin in the schema_ds module.
message MainProtobufMessage {
    repeated TaggedPoint shape_points = 1;
}

message TaggedPoint {
    string tag = 1;
    double lat = 2;
    double lon = 3;
}

スキーマ の作成と展開については 、『スキーマ の作成と展開』を参照してください。

Maven プロジェクトを設定します

プロジェクトの次のフォルダー構造を作成します。

csv-to-protobuf
└── src
    └── main
        ├── java
        └── resources
        └── scala

この操作は、次の bash 1 つのコマンドで実行できます。

mkdir -p csv-to-protobuf/src/main/{java,resources,scala}

出力カタログを作成します

pipeline-config.conf という名前のファイルを作成し、次の内容を入力します。 {{YOUR_OUTPUT_CATALOG_HRN}} は、 Protobuf レイヤーを含む出力カタログの HERE リソースネームで置き換えられます。

pipeline.config {

  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }

  input-catalogs { }
}

この例の POM は、 Spark チュートリアルのパスマッチングの POM と同じです。

親 POM :

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

依存関係 :

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.hrn</groupId>
        <artifactId>hrn_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>

これらの手順を実行するに は、チュートリアル「 OLP コマンド ライン インターフェース ( CLI ) を使用してプロジェクトで作業内容を整理する」を参照してください。

たとえば、カタログには一意の識別子名を使用する必要 {{YOUR_USERNAME}}-csv-to-protobuf-outputがあります。

csv-to-protobuf-output.json 以下の内容のファイルを作成 {{DEPLOYED_SCHEMA_HRN}} し、配置されているスキーマ HERE リソースネーム に置き換えます。

以前に展開したスキーマを含む出力カタログを、 Protobuf として設定されたレイヤー形式で作成する必要があります。

すべてのタイムスタンプは、エポック( 1970 年 1 月 1 日 00:00 UTC )からの UTC ミリ秒数です。 別のタイムゾーンでアプリケーションを実行する場合は、データのクエリーまたはアップロードを行う前に、タイムスタンプが UTC に変換されていることを確認してください。 Java または Scala では、次の関数呼び出しを使用して変換を実行できます。 Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()

{
  "id": "csv-to-protobuf-output",
  "name": "Uploaded CSV data (From tutorial)",
  "summary": "Catalog for Tutorial CSV to Protobuf",
  "description": "Archive of simulated road topology data.",
  "tags": ["Tutorial", "Converted"],
  "layers": [
    {
      "id": "versioned-layer-protobuf-data",
      "name": "versioned-layer-protobuf-data",
      "summary": "CSV converted data to protobuf.",
      "description": "CSV converted data to protobuf.",
      "contentType": "application/x-protobuf",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      },
      "schema": {
        "hrn": "{{DEPLOYED_SCHEMA_HRN}}"
      }
    }
  ]
}

csv アップロードアプリケーションを実装します

このアプリケーションは、前のステージで作成したデータソースを使用して、ディレクトリから csv ファイルを読み取り、結果のデータの一部の変換を実行して Protobuf レイヤースキーマに適応させ、前に作成したカタログの Protobuf レイヤーに書き込みます。

Scala
Java

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.slf4j.LoggerFactory
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.functions._
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt

object SparkSqlUploadCsvScala extends App {
  private val logger = LoggerFactory.getLogger(SparkSqlUploadCsvScala.getClass)

  private val pipelineContext = new PipelineContext()
  private val outputCatalog = pipelineContext.config.outputCatalog
  private val outputLayerId = "versioned-layer-protobuf-data"

  val spark = SparkSession
    .builder()
    .appName("spark-sql-csv-upload-to-protobuf")
    .getOrCreate()

  val csvPath = getClass.getResource("/csv").getPath
  val csvFiles = csvPath + "/*.csv"
  val csvSchema = new StructType()
    .add(StructField("tileId", DataTypes.StringType))
    .add(StructField("tag", DataTypes.StringType))
    .add(StructField("lat", DataTypes.DoubleType))
    .add(StructField("lon", DataTypes.DoubleType))

  val csvDf = spark.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
    .schema(csvSchema)
    .load(csvFiles)

  csvDf.show()

  /*
        The CSV dataframe is:
            +--------+---------+------------------+-------------------+
            |  tileId|  tag    |               lat|                lon|
            +--------+---------+------------------+-------------------+
            |19300322|806553341|        33.5231036|-113.08299199999999|
            |19300322|806553341|         33.523157|       -113.0833119|
            |19300322|806553341|        33.5232106|       -113.0836317|
            |19300322|806553341|        33.5232646|       -113.0839515|
            |19300322|806553341|        33.5233187|-113.08427119999999|
            |19300322|806553341|        33.5233725|-113.08459099999999|
            |19300322|806553341|33.523426199999996|       -113.0849126|
            |19300322|806553341|33.523480899999996|       -113.0852401|
        ...
   */

  import spark.implicits._

  val df = csvDf
    .select(
      struct($"tag", $"lat", $"lon").as("shape_point"),
      $"tileId".as("mt_partition")
    )
    .groupBy("mt_partition")
    .agg(collect_list($"shape_point").as("shape_points"))

  df.printSchema()
  /*
        The resulting dataframe schema that will be written to the layer is:
        root
        |-- mt_partition: string (nullable = true)
        |-- shape_points: array (nullable = true)
        |    |-- element: struct (containsNull = true)
        |    |    |-- tag: string (nullable = true)
        |    |    |-- lat: double (nullable = true)
        |    |    |-- lon: double (nullable = true)
   */

  df.show()
  /*
        The resulting dataframe data that will be written to the layer is:
        +------------+--------------------+
        |mt_partition|        shape_points|
        +------------+--------------------+
        |    19300322|[[806553341, 33.5...|
        |    19300324|[[806553341, 33.5...|
        +------------+--------------------+
   */

  df.writeLayer(outputCatalog, outputLayerId).save()

  logger.info(s"Finished uplaoding CSV data to Protobuf to $outputCatalog")

  spark.stop()
}



/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.pipeline.PipelineContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkSqlUploadCsvJava {
  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(SparkSqlUploadCsvJava.class);
    PipelineContext pipelineContext = new PipelineContext();
    HRN outputCatalog = pipelineContext.getConfig().getOutputCatalog();
    String outputLayerId = "versioned-layer-protobuf-data";
    SparkSession spark =
        SparkSession.builder().appName("spark-sql-csv-upload-to-protobuf").getOrCreate();

    String csvPath = SparkSqlUploadCsvJava.class.getResource("/csv").getPath();
    String csvFiles = csvPath + "/*.csv";
    StructType csvSchema =
        new StructType()
            .add(new StructField("tileId", DataTypes.StringType, false, Metadata.empty()))
            .add(new StructField("tag", DataTypes.StringType, false, Metadata.empty()))
            .add(new StructField("lat", DataTypes.DoubleType, false, Metadata.empty()))
            .add(new StructField("lon", DataTypes.DoubleType, false, Metadata.empty()));
    Dataset<Row> csvDf =
        spark
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "false")
            .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
            .schema(csvSchema)
            .load(csvFiles);
    csvDf.show();
    /*
            The CSV dataframe is:
            +--------+---------+------------------+-------------------+
            |  tileId|  tag    |               lat|                lon|
            +--------+---------+------------------+-------------------+
            |19300322|806553341|        33.5231036|-113.08299199999999|
            |19300322|806553341|         33.523157|       -113.0833119|
            |19300322|806553341|        33.5232106|       -113.0836317|
            |19300322|806553341|        33.5232646|       -113.0839515|
            |19300322|806553341|        33.5233187|-113.08427119999999|
            |19300322|806553341|        33.5233725|-113.08459099999999|
            |19300322|806553341|33.523426199999996|       -113.0849126|
            |19300322|806553341|33.523480899999996|       -113.0852401|
            ...
    */

    Dataset<Row> df =
        csvDf
            .select(
                struct(col("tag"), col("lat"), col("lon")).as("shape_point"),
                col("tileId").as("mt_partition"))
            .groupBy("mt_partition")
            .agg(collect_list(col("shape_point")).as("shape_points"));
    df.printSchema();
    /*
        The resulting dataframe schema that will be written to the layer is:
        root
        |-- mt_partition: string (nullable = true)
        |-- shape_points: array (nullable = true)
        |    |-- element: struct (containsNull = true)
        |    |    |-- tag: string (nullable = true)
        |    |    |-- lat: double (nullable = true)
        |    |    |-- lon: double (nullable = true)
    */

    df.show();
    /*
        The resulting dataframe data that will be written to the layer is:
        +------------+--------------------+
        |mt_partition|        shape_points|
        +------------+--------------------+
        |    19300322|[[806553341, 33.5...|
        |    19300324|[[806553341, 33.5...|
        +------------+--------------------+
    */

    JavaLayerDataFrameWriter.create(df).writeLayer(outputCatalog, outputLayerId).save();

    logger.info(String.format("Finished uplaoding CSV data to Protobuf to %s", outputCatalog));

    spark.stop();
  }
}

ローカルでコンパイルおよび実行します

アプリケーションをローカルで実行するには、次のコマンドを実行します。

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=SparkSqlUploadCsvScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=SparkSqlUploadCsvJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}

詳細情報

このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。

」に一致する結果は 件です

    」に一致する結果はありません