ボラタイル レイヤーデータを書き込みます

このspark-supportモジュールは、 揮発性レイヤーにデータフレームを書き込むためのカスタム Spark DataFrameWriter であるクラスLayerDataFrameWriterを提供します。

プロジェクトの依存関係

HERE platform Spark コネクターを使用してボラタイル レイヤーにデータを書き込むアプリケーションを作成する場合 は、「 Spark コネクターの依存関係」の章の説明に従って、必要な依存関係をプロジェクトに追加してください。

形式

Spark コネクタは、次の形式の書き込み機能を提供します。

  • Protobuf
  • Avro
  • 寄木細工
  • 未加工
  • JSON
  • テキスト
  • csv

注 : 使用状況の詳細

Raw形式については、データコンバータをDataConverter特性 / インターフェイスから実装し、withDataConverterメソッドを使用して設定する必要があります。 format メソッドは必須ではありません。

書き込みプロセス

揮発性レイヤーの DataFrame 場合 mt_partition 、行は列によってグループ化され、パーティション データが作成されます。 データは書き込みエンジンを使用してボラタイル レイヤーにアップロードされ、後で公開 API を使用して公開されます。 DataFrame また、追加のメタデータを含めることもできます。

同じパーティションに複数の行を含めることは Avro 、および Parquet の形式にのみ許可されています。 その他の形式の場合、エラーがスローされます。

メタデータフィールド

ボラタイル レイヤーに用意されているすべてのメタデータ列 :

列名 データ型 意味 必須
mt_partition 文字列 HERE platform のパーティションの ID はい
mt_timestamp 長い 作成のタイムスタンプ いいえ
mt_checksum 文字列 ペイロードのチェックサム いいえ
mt_crc 文字列 ペイロードの CRC いいえ
mt_dataSize 長い ペイロードのサイズ いいえ
mt_compressedDataSize 長い ペイロードの圧縮サイズ いいえ

注 : メタデータの制約事項

メタデータの列の種類は、上の表のとおりにする必要 IllegalArgumentException があります。そうしないと、例外がスローされます。 mt_ プレフィックスで始まるが提供されていない追加のフィールドは、書き込み操作中に無視されます。

Protobuf 形式でデータを書き込みます

次のスニペットは、Protobuf でエンコードされたデータファイルとしてDataFrame[Row](Dataset<Row>) を操作および書き込みする方法を示しています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, first, lit}
import org.apache.spark.sql.types.IntegerType
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-protobuf").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing protobuf schema
// with the required fields 'text' in String Type and 'count' in Integer Type)
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// ------------------------
// |   text|partition_name|
// ------------------------
// |value-1|   partition-1|
// |value-2|   partition-2|
// |value-1|   partition-1|
// ------------------------

// Some computations
val computationDataFrame = inputDataFrame
  .groupBy("text")
  .agg(count("text").as("count"), first("partition_name").as("mt_partition"))

// Casting column type and adding extra mt_timestamp column
val outDataFrame = computationDataFrame
  .withColumn("count", col("count").cast(IntegerType))
  .withColumn("mt_timestamp", lit(System.currentTimeMillis))
outDataFrame.show()
// ------------------------------------------
// |   text|count|mt_partition| mt_timestamp|
// ------------------------------------------
// |value-2|    1| partition-1|1569598607978|
// |value-1|    2| partition-2|1569598607978|
// ------------------------------------------

outDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-protobuf").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing protobuf schema
// with the required fields 'text' in String Type and 'count' in Integer Type)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-protobuf-layer";
Dataset<Row> inputDataset = loadDataFrame(sparkSession);
inputDataset.show();
// ------------------------
// |   text|partition_name|
// ------------------------
// |value-1|   partition-1|
// |value-2|   partition-2|
// |value-1|   partition-1|
// ------------------------

// Some computations
Dataset<Row> computationDataset =
    inputDataset
        .groupBy("text")
        .agg(count("text").as("count"), first("partition_name").as("mt_partition"));

// Casting column type and adding extra mt_timestamp column
Dataset<Row> outDataset =
    computationDataset
        .withColumn("count", col("count").cast(IntegerType))
        .withColumn("mt_timestamp", lit(System.currentTimeMillis()));
outDataset.show();
// ------------------------------------------
// |   text|count|mt_partition| mt_timestamp|
// ------------------------------------------
// |value-2|    1| partition-1|1569598607978|
// |value-1|    2| partition-2|1569598607978|
// ------------------------------------------

JavaLayerDataFrameWriter.create(outDataset).writeLayer(catalogHrn, layerId).save();

sparkSession.stop();

注 : 制限事項

メタデータ mt_partition 列には一意の値が必要 IllegalArgumentException です。一意でない場合、例外がスローされます。 レイヤー application/x-protobuf にはコンテンツタイプが必要です。

Avro 形式および Parquet 形式でデータを書き込みます

次のスニペットで DataFrame[Row]は、 () をDataset<Row>Avro または寄木細工でエンコードされたデータファイルとして記述する方法を示します。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of volatile layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// ---------------------------
// |tileId|index|mt_partition|
// ---------------------------
// |     5|    1| partition-1|
// |     6|    2| partition-1|
// |     7|   10| partition-2|
// |     8|   20| partition-2|
// ---------------------------

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-parquet-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

注 : 使用状況の詳細

Avro と Parquet は列に並んだ収納形式です。 そのため、DataFramemt_partition列に重複を含めることができます。 行はグループ化さ mt_partition れ、各グループが 1 つのプラットフォームパーティションに保存されます。 レイヤーには、タイプに応じてapplication/x-parquetまたはapplication/x-avro-binaryのコンテンツタイプが必要です。

生データ形式でデータを書き込みます

次のスニペットは、を DataFrame 任意の形式のファイルとして書き込む方法を示しています。 この例では、入力 DataFramedata message が文字列である列が含まれています。

注 : 制限事項

メタデータ mt_partition 列には一意の値を指定する必要 IllegalArgumentException があります。そうしないと、例外がスローされます。 レイヤー application/octet-stream にはコンテンツタイプが必要です。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-raw").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame = loadDataFrame(sparkSession)
// -------------------
// |mt_partition| raw|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new VolatileDataConverter {
    override def serializeGroup(rowMetadata: VolatileRowMetadata,
                                rows: Iterator[Row]): GroupedData[VolatileRowMetadata] = {
      // One row per partition Id so we have just to process first element of the iterator
      val bytes = "serializeGroup=>".getBytes ++ rows.next().getAs[Array[Byte]]("data")
      GroupedData(rowMetadata, bytes)
    }
  })
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-raw").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-raw-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
// -------------------
// |mt_partition| raw|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new VolatileDataConverter() {
          @Override
          public GroupedData<VolatileRowMetadata> serializeGroup(
              VolatileRowMetadata rowMetadata, Iterator<Row> rows) {
            byte[] bytes =
                ArrayUtils.addAll("serializeGroup=>".getBytes(), rows.next().getAs("data"));
            return new GroupedData<>(rowMetadata, bytes);
          }
        })
    .save();

sparkSession.stop();

JSON 形式でデータを書き込みます

次のスニペットは、を DataFrame 任意の形式のファイルとして書き込む方法を示しています。 この例では、入力 DataFrame に整数として列 intVal が、文字列として strVal のが含まれています。

注 : 制限事項

メタデータ mt_partition 列には一意の値を指定する必要 IllegalArgumentException があります。そうしないと、例外がスローされます。 レイヤー application/json にはコンテンツタイプが必要です。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-json").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame = loadDataFrame(sparkSession)
// ------------------------------
// |mt_partition| intVal| strVal|
// ------------------------------
// | partition-1|[1]|str1|
// | partition-2|[2]|str2|
// | partition-3|[3]|str3|
// ------------------------------

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-json").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

テキスト形式でデータを書き込みます

次のスニペットは、を DataFrame 任意の形式のファイルとして書き込む方法を示しています。 この例では、入力 DataFramedata message が文字列である列が含まれています。

注 : 制限事項

メタデータ mt_partition 列には一意の値を指定する必要 IllegalArgumentException があります。そうしないと、例外がスローされます。 レイヤー text/plain にはコンテンツタイプが必要です。 データの書き込み中、テキストデータソースは 1 行につき 1 つの列のみをサポートします。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-text").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame = loadDataFrame(sparkSession)
// -------------------
// |mt_partition| data|
// -------------------
// | partition-1|[1]|
// | partition-2|[2]|
// | partition-3|[3]|
// -------------------

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()
sparkSession.stop()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-text").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-text-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

csv 形式でデータを書き込みます

次のスニペットは、を DataFrame 任意の形式のファイルとして書き込む方法を示しています。 この例では、入力 DataFrame に整数として列 field1 が、文字列として field2 のが含まれています。

注 : 制限事項

メタデータ mt_partition 列には一意の値を指定する必要 IllegalArgumentException があります。そうしないと、例外がスローされます。 レイヤー text/csv にはコンテンツタイプが必要です。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-csv").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame = loadDataFrame(sparkSession)
// ------------------------------
// |mt_partition| field1| field2|
// ------------------------------
// | partition-1|[1]|str1|
// | partition-2|[2]|str2|
// | partition-3|[3]|str3|
// ------------------------------

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-csv").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-csv-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();

注 : 使用状況の詳細

コネクタは列 mt_partition の値で行をグループ化します。 この場合、 DataConverter 特性 / インターフェイスからデータコンバータを実装し withDataConverter 、メソッドを使用して設定する必要があります。 レイヤーに application/octet-stream はコンテンツタイプが必要です。

Raw形式については、データコンバータをDataConverter特性 / インターフェイスから実装し、withDataConverterメソッドを使用して設定する必要があります。 format メソッドは必須ではありません。

」に一致する結果は 件です

    」に一致する結果はありません