Spark Connector を使用して csv データを Protobuf に移行します
目的: Spark Connector を使用して、ディレクトリ内の csv ファイルから Protobuf レイヤーにデータをアップロードする方法について理解します。
複雑さ: 初心者向け
所要時間: 30 分
前提条件: Spark Connector を使用してデータの読み取りと書き込みを行います
ソースコード: ダウンロード
このチュートリアルの例では 、データ クライアント ライブラリが提供する Spark コネクタを使用し て、ディレクトリに保存されている csv データを Protobuf 形式を使用するレイヤーにアップロードする方法を示します。 データ処理アプリケーションを使用している場合に、より適切な帯域幅を使用することをお勧めします。
このチュートリアルの主な部分では、次の用途について説明します。
- ディレクトリから 1 つのデータフレームに csv データを読み取ります。
- レイヤースキーマと互換性があるようにデータフレームを再構築しています ;
- 結果のデータフレームをレイヤーにアップロードしています。
準備のステップとして resources/csv
、フォルダに CSV ファイルを作成し、 csv スキーマに対応する出力レイヤーを作成する必要があります。 resources/csv
フォルダーにある csv データの形式が次のようになっているとします。
tileId, tag, latitute, longitude
123456789,806553341,33.5230503,-113.0826719
123456789,806553341,33.5231036,-113.08299199999999
123456789,806553341,33.523157,-113.0833119
123456789,806553341,33.5232106,-113.0836317
123456789,806553341,33.5232646,-113.0839515
123456789,806553341,33.5233187,-113.08427119999999
123456789,806553341,33.5233725,-113.08459099999999
....
データクライアントの Spark コネクタでは、データフレーム行でパーティション ID を複数回使用することはできません。 つまり、 Protobuf レイヤーにアップロードするには、パーティション ID でデータをグループ化する必要があります。このスキーマを使用する代わりに、次のことを行います。
(partition, tag, lat, lon)
次の条件を満たすには、 IT を再構築する必要があります。
(partition, Array[(tag, lat, lon)])
対応するレイヤースキーマを作成します
まず、スキーマプロジェクトを生成し、次の proto ファイルを使用して展開します。
syntax = "proto3";
package com.here.platform.data.csv.v1;
// Declare any dependent resources the main POM file and add the import statements here:
//import "com/company/dependentGroupId/filename.proto";
// MainProtobufMessage is a placeholder, this value must match the package/messagename in the mainMessage tag of the layer-manifest-plugin in the schema_ds module.
message MainProtobufMessage {
repeated TaggedPoint shape_points = 1;
}
message TaggedPoint {
string tag = 1;
double lat = 2;
double lon = 3;
}
スキーマ の作成と展開については 、『スキーマ の作成と展開』を参照してください。
Maven プロジェクトを設定します
プロジェクトの次のフォルダー構造を作成します。
csv-to-protobuf
└── src
└── main
├── java
└── resources
└── scala
この操作は、次の bash
1 つのコマンドで実行できます。
mkdir -p csv-to-protobuf/src/main/{java,resources,scala}
出力カタログを作成します
pipeline-config.conf
という名前のファイルを作成し、次の内容を入力します。 {{YOUR_OUTPUT_CATALOG_HRN}}
は、 Protobuf レイヤーを含む出力カタログの HERE リソースネームで置き換えられます。
pipeline.config {
output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }
input-catalogs { }
}
この例の POM は、 Spark チュートリアルのパスマッチングの POM と同じです。
親 POM :
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.54.3</version>
<relativePath/>
</parent>
依存関係 :
<dependencies>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>spark-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.hrn</groupId>
<artifactId>hrn_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
これらの手順を実行するに は、チュートリアル「 OLP コマンド ライン インターフェース ( CLI ) を使用してプロジェクトで作業内容を整理する」を参照してください。
たとえば、カタログには一意の識別子名を使用する必要 {{YOUR_USERNAME}}-csv-to-protobuf-output
があります。
csv-to-protobuf-output.json
以下の内容のファイルを作成 {{DEPLOYED_SCHEMA_HRN}}
し、配置されているスキーマ HERE リソースネーム に置き換えます。
以前に展開したスキーマを含む出力カタログを、 Protobuf として設定されたレイヤー形式で作成する必要があります。
注
すべてのタイムスタンプは、エポック( 1970 年 1 月 1 日 00:00 UTC )からの UTC ミリ秒数です。 別のタイムゾーンでアプリケーションを実行する場合は、データのクエリーまたはアップロードを行う前に、タイムスタンプが UTC に変換されていることを確認してください。 Java または Scala では、次の関数呼び出しを使用して変換を実行できます。 Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
{
"id": "csv-to-protobuf-output",
"name": "Uploaded CSV data (From tutorial)",
"summary": "Catalog for Tutorial CSV to Protobuf",
"description": "Archive of simulated road topology data.",
"tags": ["Tutorial", "Converted"],
"layers": [
{
"id": "versioned-layer-protobuf-data",
"name": "versioned-layer-protobuf-data",
"summary": "CSV converted data to protobuf.",
"description": "CSV converted data to protobuf.",
"contentType": "application/x-protobuf",
"layerType": "versioned",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
},
"schema": {
"hrn": "{{DEPLOYED_SCHEMA_HRN}}"
}
}
]
}
csv アップロードアプリケーションを実装します
このアプリケーションは、前のステージで作成したデータソースを使用して、ディレクトリから csv ファイルを読み取り、結果のデータの一部の変換を実行して Protobuf レイヤースキーマに適応させ、前に作成したカタログの Protobuf レイヤーに書き込みます。
import org.slf4j.LoggerFactory
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.functions._
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
object SparkSqlUploadCsvScala extends App {
private val logger = LoggerFactory.getLogger(SparkSqlUploadCsvScala.getClass)
private val pipelineContext = new PipelineContext()
private val outputCatalog = pipelineContext.config.outputCatalog
private val outputLayerId = "versioned-layer-protobuf-data"
val spark = SparkSession
.builder()
.appName("spark-sql-csv-upload-to-protobuf")
.getOrCreate()
val csvPath = getClass.getResource("/csv").getPath
val csvFiles = csvPath + "/*.csv"
val csvSchema = new StructType()
.add(StructField("tileId", DataTypes.StringType))
.add(StructField("tag", DataTypes.StringType))
.add(StructField("lat", DataTypes.DoubleType))
.add(StructField("lon", DataTypes.DoubleType))
val csvDf = spark.read
.format("com.databricks.spark.csv")
.option("header", "false")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.schema(csvSchema)
.load(csvFiles)
csvDf.show()
import spark.implicits._
val df = csvDf
.select(
struct($"tag", $"lat", $"lon").as("shape_point"),
$"tileId".as("mt_partition")
)
.groupBy("mt_partition")
.agg(collect_list($"shape_point").as("shape_points"))
df.printSchema()
df.show()
df.writeLayer(outputCatalog, outputLayerId).save()
logger.info(s"Finished uplaoding CSV data to Protobuf to $outputCatalog")
spark.stop()
}
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.pipeline.PipelineContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SparkSqlUploadCsvJava {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(SparkSqlUploadCsvJava.class);
PipelineContext pipelineContext = new PipelineContext();
HRN outputCatalog = pipelineContext.getConfig().getOutputCatalog();
String outputLayerId = "versioned-layer-protobuf-data";
SparkSession spark =
SparkSession.builder().appName("spark-sql-csv-upload-to-protobuf").getOrCreate();
String csvPath = SparkSqlUploadCsvJava.class.getResource("/csv").getPath();
String csvFiles = csvPath + "/*.csv";
StructType csvSchema =
new StructType()
.add(new StructField("tileId", DataTypes.StringType, false, Metadata.empty()))
.add(new StructField("tag", DataTypes.StringType, false, Metadata.empty()))
.add(new StructField("lat", DataTypes.DoubleType, false, Metadata.empty()))
.add(new StructField("lon", DataTypes.DoubleType, false, Metadata.empty()));
Dataset<Row> csvDf =
spark
.read()
.format("com.databricks.spark.csv")
.option("header", "false")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.schema(csvSchema)
.load(csvFiles);
csvDf.show();
Dataset<Row> df =
csvDf
.select(
struct(col("tag"), col("lat"), col("lon")).as("shape_point"),
col("tileId").as("mt_partition"))
.groupBy("mt_partition")
.agg(collect_list(col("shape_point")).as("shape_points"));
df.printSchema();
df.show();
JavaLayerDataFrameWriter.create(df).writeLayer(outputCatalog, outputLayerId).save();
logger.info(String.format("Finished uplaoding CSV data to Protobuf to %s", outputCatalog));
spark.stop();
}
}
ローカルでコンパイルおよび実行します
アプリケーションをローカルで実行するには、次のコマンドを実行します。
mvn compile exec:java \
-Dexec.mainClass=SparkSqlUploadCsvScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=SparkSqlUploadCsvJava \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。