インデックス レイヤーデータを書き込みます

データ クライアント ライブラリには、 インデックスレイヤーにデータフレームを書き込むためのカスタム Spark DataFrameWriter であるクラスLayerDataFrameWriterが用意されています。

インデックスレイヤーの場合、データはレイヤーに定義されているインデックス属性の値でグループ化されます。 Protobuf でエンコードされたレイヤーに書き込む場合 Row 、インデックス属性のセットごとに 1 つだけ記述する必要があります。

プロジェクトの依存関係

HERE platform Spark コネクターを使用してインデックス レイヤーにデータを書き込むアプリケーションを作成する場合 は、「 Spark コネクターの依存関係」の章の説明に従って、必要な依存関係をプロジェクトに追加してください。

書き込みプロセス

インデックスレイヤーで DataFrame は、インデックスデータを作成するために、行がインデックス属性でグループ化されます。 データは書き込みエンジンを使用してインデックス レイヤーにアップロードされ、後で公開 API を使用して公開されます。 DataFrame また、追加のメタデータを含めることもできます。

メタデータフィールド

インデックス レイヤーに用意されているすべてのメタデータ列 :

列名 データ型 意味 必須
mt_metadata 地図 [ 文字列 , 文字列 ] パーティションのメタデータ いいえ
mt_timestamp 長い 作成のタイムスタンプ (UTC) いいえ
mt_checksum 文字列 ペイロードのチェックサム いいえ
mt_crc 文字列 ペイロードの CRC いいえ
mt_dataSize 長い ペイロードのサイズ いいえ
mt_compressedDataSize 長い ペイロードの圧縮サイズ いいえ

Avro 、 Parquet 、または Protobuf でエンコードされたファイルとしてデータを書き込みます

次のスニペット は、を DataFrame Avro 、 Parquet 、または Protobuf でエンコードされたデータファイルとしてカタログのインデックス レイヤー に書き込む方法を示しています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, SparkSession}
// val inputDF: DataFrame (Input data stored as a DataFrame)
// val outputCatalogHrn: HRN (HRN of the output catalog that contains the layer $outputLayerId)
// val outputLayerId: String (ID of the output index layer. If protobuf, the schema should match the non-indexed columns of $inputDF)
inputDF
  .writeLayer(outputCatalogHrn, outputLayerId)
  .save()
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Dataset<Row> inputDF (Input data stored as a DataFrame)
// HRN outputCatalogHrn (HRN of a catalog that contains the layer $outputLayerId)
// String outputLayerId (ID of the output index layer. If protobuf, the schema should match the
// non-indexed columns of $inputDF)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(outputCatalogHrn, outputLayerId)
    .option("olp.connector.metadata-columns", true)
    .save();

csv 形式でデータを書き込みます

次のスニペットは、を DataFrame 任意の形式のファイルとして書き込む方法を示しています。 この例では、入力 DataFrame に整数として列 field1 が、文字列として field2 のが含まれています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-csv").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
import com.here.platform.data.client.spark.scaladsl.DataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.SerializableField;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-csv").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-csv-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();

テキスト形式でデータを書き込みます

次のスニペットは、を DataFrame 任意の形式のファイルとして書き込む方法を示しています。 この例では、入力 DataFramedata message が文字列である列が含まれています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-text").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
import com.here.platform.data.client.spark.scaladsl.DataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.SerializableField;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-text").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();

JSON 形式でデータを書き込みます

次のスニペットは、を DataFrame 任意の形式のファイルとして書き込む方法を示しています。 この例では、入力 DataFrame に整数として列 intVal が、文字列として strVal のが含まれています。

Scala
Java
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter._
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-json").getOrCreate()

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of versioned layer)
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
import com.here.platform.data.client.spark.scaladsl.DataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.SerializableField;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-json").getOrCreate();

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();

他の形式でデータを書き込みます

次のスニペットでは DataFrame 、を任意の形式のファイルとしてカタログのインデックス レイヤーに書き込む方法を示します。 この例では、入力 DataFramedata メッセージが文字列として含まれている列が含まれており、複数の行のデータが単純に連結されています。

Scala
Java
import com.here.platform.data.client.spark.scaladsl.{IndexDataConverter, IndexRowMetadata}
import com.here.examples.platform.data.client.spark.ExampleUtil._
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.GroupedData
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SparkSession}
// val inputDF: DataFrame (Input data stored as a DataFrame)
// val catalogHrn: HRN (HRN of the output catalog that contains the layer $layerId)
// val layerId: String (ID of the output index layer)
inputDF
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new IndexDataConverter {
    override def serializeGroup(
        rowMetadata: IndexRowMetadata,
        rows: Iterator[Row]
    ): GroupedData[IndexRowMetadata] = {
      val joinedText = rows
        .map(_.getAs[Array[Byte]]("data").map(_.toChar).mkString)
        .mkString
      GroupedData(rowMetadata, joinedText.getBytes())
    }
  })
  .save()
import com.here.platform.data.client.spark.scaladsl.DataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.SerializableField;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Dataset<Row> inputDF (Input data stored as a DataSet<Row>)
// HRN catalogHrn (HRN of the output catalog that contains the layer $layerId)
// String layerId: String (ID of the output index layer)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new IndexDataConverter() {
          @Override
          public GroupedData<IndexRowMetadata> serializeGroup(
              IndexRowMetadata rowMetadata, Iterator<Row> rows) {
            StringBuilder builder = new StringBuilder();
            rows.forEachRemaining(row -> builder.append(new String(row.<byte[]>getAs("data"))));
            String joinedText = builder.toString();
            return new GroupedData<>(rowMetadata, joinedText.getBytes());
          }
        })
    .save();
注 :

RSQL の詳細 については、 RSQL を参照してください。

」に一致する結果は 件です

    」に一致する結果はありません