バージョン付レイヤーデータを書き込みます

このspark-supportモジュールは、 バージョン管理されたレイヤーに DataFrame を書き込むためのカスタム Spark DataFrameWriter であるクラスLayerDataFrameWriterを提供します。

プロジェクトの依存関係

HERE platform Spark コネクターを使用してバージョン付レイヤーにデータを書き込むアプリケーションを作成する場合 は、「 Spark コネクターの依存関係」の章の説明に従って、必要な依存関係をプロジェクトに追加してください。

形式

Spark コネクタは、次の形式の書き込み機能を提供します。

  • Protobuf
  • Avro
  • 寄木細工
  • 未加工
  • JSON
  • テキスト
  • csv

注 : 使用状況の詳細

Raw形式については、データコンバータをDataConverter特性 / インターフェイスから実装し、withDataConverterメソッドを使用して設定する必要があります。 format メソッドは必須ではありません。

書き込みプロセス

バージョン管理されたレイヤーの DataFrame 場合 mt_partition 、行は列でグループ化され、パーティション データが作成されます。 データは書き込みエンジンを使用してバージョン付レイヤーにアップロードされ、後で公開 API を使用して公開されます。 DataFrame また、追加のメタデータを含めることもできます。

同じパーティションに複数の行を含めることは Avro 、および Parquet の形式にのみ許可されています。 その他の形式の場合、エラーがスローされます。

メタデータフィールド

バージョン付レイヤーに用意されているすべてのメタデータ列 :

列名 データ型 意味 必須
mt_partition 文字列 HERE platform のパーティションの ID はい
mt_timestamp 長い 作成のタイムスタンプ いいえ
mt_checksum 文字列 ペイロードのチェックサム いいえ
mt_crc 文字列 ペイロードの CRC いいえ
mt_dataSize 長い ペイロードのサイズ いいえ
mt_compressedDataSize 長い ペイロードの圧縮サイズ いいえ
mt_dataHandle 文字列 データのハンドラ いいえ

注 : メタデータの制約事項

メタデータ列の種類は、上の表のとおりにする必要 IllegalArgumentException があります。そうしないと、例外がスローされます。 で開始 mt_ されたが提供されていない追加のフィールドは、書き込み時に無視されます。

Protobuf 形式でデータを書き込みます

次のスニペットは、Protobuf でエンコードされたデータファイルとしてDataFrame[Row](Dataset<Row>) を操作および書き込みする方法を示しています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, first, lit}
import org.apache.spark.sql.types.IntegerType
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-protobuf").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer containing protobuf schema
// with the required fields 'text' in String Type and 'count' in Integer Type)
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// +-------+--------------+
// |   text|partition_name|
// +-------+--------------+
// |value-1|   partition-1|
// |value-2|   partition-2|
// |value-1|   partition-1|
// +-------+--------------+

// Some computations
val computationDataFrame = inputDataFrame
  .groupBy("text")
  .agg(count("text").as("count"), first("partition_name").as("mt_partition"))

// Casting column type and adding extra mt_timestamp column
val outDataFrame = computationDataFrame
  .withColumn("count", col("count").cast(IntegerType))
  .withColumn("mt_timestamp", lit(System.currentTimeMillis))
outDataFrame.show()
//+-------+-----+------------+-------------+
//|   text|count|mt_partition| mt_timestamp|
//+-------+-----+------------+-------------+
//|value-2|    1| partition-1|1569598607978|
//|value-1|    2| partition-2|1569598607978|
//+-------+-----+------------+-------------+

outDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("olp.connector.metadata-columns", true)
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.first;
import static org.apache.spark.sql.functions.lit;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-protobuf").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer containing protobuf schema
// with the required fields 'text' in String Type and 'count' in Integer Type)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-protobuf-layer";
Dataset<Row> inputDataset = loadDataFrame(sparkSession);
inputDataset.show();
// ------------------------
// |   text|partition_name|
// ------------------------
// |value-1|   partition-1|
// |value-2|   partition-2|
// |value-1|   partition-1|
// ------------------------

// Some computations
Dataset<Row> computationDataset =
    inputDataset
        .groupBy("text")
        .agg(count("text").as("count"), first("partition_name").as("mt_partition"));

// Casting column type and adding extra mt_timestamp column
Dataset<Row> outDataset =
    computationDataset
        .withColumn("count", col("count").cast(IntegerType))
        .withColumn("mt_timestamp", lit(System.currentTimeMillis()));
outDataset.show();
// ------------------------------------------
// |   text|count|mt_partition| mt_timestamp|
// ------------------------------------------
// |value-2|    1| partition-1|1569598607978|
// |value-1|    2| partition-2|1569598607978|
// ------------------------------------------

JavaLayerDataFrameWriter.create(outDataset)
    .writeLayer(catalogHrn, layerId)
    .option("olp.connector.metadata-columns", true)
    .save();

sparkSession.stop();

注 : 制限事項

メタデータ mt_partition 列には一意の値が必要 IllegalArgumentException です。一意でない場合、例外がスローされます。 レイヤーに application/x-protobuf はコンテンツタイプが必要です。

Avro 形式および Parquet 形式でデータを書き込みます

次のスニペットで DataFrame[Row]は、 () をDataset<Row>Avro または寄木細工でエンコードされたデータファイルとして記述する方法を示します。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
//  +--------------------+------+-----+
//  |        mt_partition|tileId|index|
//  +--------------------+------+-----+
//  |6c6a6eb5-e631-467...|     5|    1|
//  |6c6a6eb5-e631-467...|     6|    2|
//  |4ce8fb63-3cc2-420...|     7|   10|
//  |4ce8fb63-3cc2-420...|     8|   20|
//  +--------------------+------+-----+

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.first;
import static org.apache.spark.sql.functions.lit;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-parquet-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

注 : 使用状況の詳細

Avro と Parquet は列に並んだ収納形式です。 そのため、DataFramemt_partition列に重複を含めることができます。 行はグループ化さ mt_partition れ、各グループが 1 つのプラットフォームパーティションに保存されます。 レイヤーは、タイプに応じて必須application/x-parquetまたはapplication/x-avro-binaryコンテンツタイプになります。

生データ形式でデータを書き込みます

次のスニペットは、を DataFrame 任意の形式のファイルとして書き込む方法を示しています。 この例では、入力 DataFramedata message が文字列である列が含まれています。

注 : 制限事項

メタデータ mt_partition 列には一意の値が必要 IllegalArgumentException です。一意でない場合、例外がスローされます。 レイヤーに application/octet-stream はコンテンツタイプが必要です。

Scala
Java
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  VersionedDataConverter,
  VersionedRowMetadata
}
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-raw").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame = loadDataFrame(sparkSession)
// -------------------
// |mt_partition| raw|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new VersionedDataConverter {
    override def serializeGroup(
        rowMetadata: VersionedRowMetadata,
        rows: Iterator[Row]
    ): GroupedData[VersionedRowMetadata] = {
      // One row per partition Id so we have just to process first element of the iterator
      val bytes = "serializeGroup=>".getBytes ++ rows
        .next()
        .getAs[Array[Byte]]("data")
      GroupedData(rowMetadata, bytes)
    }
  })
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.first;
import static org.apache.spark.sql.functions.lit;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-raw").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-raw-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
// -------------------
// |mt_partition| raw|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new VersionedDataConverter() {
          @Override
          public GroupedData<VersionedRowMetadata> serializeGroup(
              VersionedRowMetadata rowMetadata, Iterator<Row> rows) {
            byte[] bytes =
                ArrayUtils.addAll("serializeGroup=>".getBytes(), rows.next().getAs("data"));
            return new GroupedData<>(rowMetadata, bytes);
          }
        })
    .save();

sparkSession.stop();

JSON 形式でデータを書き込みます

次のスニペット は DataFrame[Row]、 ()Dataset<Row>を JSON 行として記述する方法を示しています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
//  +--------------------+------+-----+
//  |        mt_partition|tileId|index|
//  +--------------------+------+-----+
//  |6c6a6eb5-e631-467...|     5|    1|
//  |6c6a6eb5-e631-467...|     6|    2|
//  |4ce8fb63-3cc2-420...|     7|   10|
//  |4ce8fb63-3cc2-420...|     8|   20|
//  +--------------------+------+-----+

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.first;
import static org.apache.spark.sql.functions.lit;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

テキスト形式でデータを書き込みます

次のスニペットは、を DataFrame 任意の形式のファイルとして書き込む方法を示しています。 この例では、入力 DataFrametext message が文字列である列が含まれています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
//  +--------------------+-----+
//  |        mt_partition| text|
//  +--------------------+-----+
//  |6c6a6eb5-e631-467...|  5_1|
//  |6c6a6eb5-e631-467...|  6_2|
//  |4ce8fb63-3cc2-420...| 7_10|
//  |4ce8fb63-3cc2-420...| 8_20|
//  +--------------------+-----+

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.first;
import static org.apache.spark.sql.functions.lit;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-text-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

csv 形式でデータを書き込みます

次のスニペット は DataFrame[Row]、 ()Dataset<Row>を csv 行として記述する方法を示しています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
//  +--------------------+------+-----+
//  |        mt_partition|tileId|index|
//  +--------------------+------+-----+
//  |6c6a6eb5-e631-467...|     5|    1|
//  |6c6a6eb5-e631-467...|     6|    2|
//  |4ce8fb63-3cc2-420...|     7|   10|
//  |4ce8fb63-3cc2-420...|     8|   20|
//  +--------------------+------+-----+

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()

sparkSession.stop()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.first;
import static org.apache.spark.sql.functions.lit;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a versioned layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-csv-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();

注 : 使用状況の詳細

コネクタは列 mt_partition の値で行をグループ化します。 この場合、 DataConverter は trait から継承された DataConverter を実装する必要があります。 また、バージョン付レイヤーに application/octet-stream はコンテンツタイプが必要です。

」に一致する結果は 件です

    」に一致する結果はありません