インタラクティブなマップ レイヤーデータを書き込みます

このspark-supportモジュールには、 インタラクティブなマップレイヤーに DataFrame を書き込むためのカスタム Spark DataFrameWriter であるクラスLayerDataFrameWriterが用意されています。

プロジェクトの依存関係

HERE platform Spark コネクターを使用してインタラクティブマップ レイヤーにデータを書き込むアプリケーションを作成する場合 は、「 Spark コネクターの依存関係」の章で説明されているように、必要な依存関係をプロジェクトに追加してください。

形式

Spark コネクタは、インタラクティブなマップ レイヤー固有の形式の書き込み機能を提供します。 インタラクティブなマップ レイヤー形式 / スキーマは GeoJSON をベースにしているため、本質的に柔軟性があります。

書き込みプロセス

インタラクティブなマップレイヤーの場合 DataFrame 、行は Publish API を使用してインタラクティブなマップ レイヤーに直接公開されます。オブジェクトがレイヤーに「アップサート」されます。つまり、オブジェクトがすでに存在し、存在しない場合は更新されます。

データフィールド

DataFrameインタラクティブマップ レイヤーに書き込むには、次の構造に従う必要があります。

列名 データ型 意味
mt_id 文字列 オブジェクトの OID
geometry オブジェクトのジオメトリ。最初のフィールドにはタイプが含まれ、 2 番目のフィールドには座標が含まれます
properties 地図 縮小された形式のオブジェクトのプロパティ
custom_members 地図 非標準のトップレベルフィールドが縮小された形式で表示されます
mt_tags 配列 オブジェクトのタグ

追加の列は存在できますが、無視されることに注意してください。

注意

注 : String プロパティおよびカスタムメンバーマップの値に対する制限

プロパティマップまたは custom_members マップのいずれかに String 値を記述する場合は 、値の一部として文字列値の先頭と末尾に二重引用符が使用されていることを確認してください。 申し訳ありませんが、 JSON のシリアル化が書き込み中に正しく動作するようにするには、この処理が必要になります。

インタラクティブなマップ レイヤーにデータを書き込みます

次のスニペットでは、インタラクティブなマップ レイヤーでDataFrame[Row](Dataset<Row>) を作成して書き込む方法を示します。

Scala
Java
import com.here.platform.data.client.model.geojson.{Feature, Geometry, Point}
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import org.apache.spark.sql.{Row, SparkSession}
import com.here.platform.data.client.spark.internal.InteractiveMapPartitionHelper
import com.here.platform.pipeline.PipelineContext
import scala.collection.JavaConverters._
import scala.collection.mutable
val numObjectsInX = 360
val numObjectsInY = 18

val newRows = new mutable.MutableList[Row]

val schema = InteractiveMapPartitionHelper.writeSchema

log.info("Generating test data, " + (numObjectsInX * numObjectsInY) + " objects")
for (x <- 0 until numObjectsInX)
  for (y <- 0 until numObjectsInY) {
    val oid = "X" + x + "_Y" + y
    val coords = List[Double](
      ((360.0 / numObjectsInX.toDouble) * x) - 180.0,
      ((180.0 / numObjectsInY.toDouble) * y) - 90.0 // Stay in Mercator's bounds.
    )
    val geo: Geometry = new Point.Builder().withCoordinates(coords.map(Double.box).asJava).build

    // NOTE: You MUST ensure that a properties Map is set when creating Features this way!
    // It may be empty, but it absolutely MUST be there.
    val properties = mutable.Map[String, Any]()
    properties += ("info" -> ("Testobject #" + (y * numObjectsInY + x).toString))
    properties += ("row" -> y)
    properties += ("col" -> x)

    val feature = new Feature.Builder()
      .withId(oid)
      .withGeometry(geo)
      .withProperties(properties.asJava)
      .withCustomMember("foo", "bar")
      .build

    newRows += InteractiveMapPartitionHelper.toRow(feature, schema)
  }

log.info("Creating dataframe from test data.")
val writeDF = sparkSession.createDataFrame(
  sparkSession.sparkContext.parallelize(newRows),
  schema
)

log.info("Writing test data to layer " + layerId)
writeDF
  .writeLayer(catalogHrn, layerId)
  .option("olp.connector.write-batch-size", 1000)
  .save()
import com.here.platform.data.client.model.geojson.Feature;
import com.here.platform.data.client.model.geojson.Geometry;
import com.here.platform.data.client.model.geojson.Point;
import com.here.platform.data.client.spark.internal.InteractiveMapPartitionHelper;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

final int numObjectsInX = 360;
final int numObjectsInY = 18;

List<Row> newRows = new ArrayList<>(numObjectsInX * numObjectsInY);

// Create data.
// It's easier to create the Features and make Rows out of them than to create the rows
// directly.
log.info("Generating test data, " + (numObjectsInX * numObjectsInY) + " objects");
StructType schema = InteractiveMapPartitionHelper.writeSchema();
for (int x = 0; x < numObjectsInX; x++) {
  for (int y = 0; y < numObjectsInY; y++) {
    String oid = "X" + x + "_Y" + y;

    List<Double> coordinates = new ArrayList<>(2);
    coordinates.add(((360.0 / (double) numObjectsInX) * x) - 180.0);
    coordinates.add(((180.0 / (double) numObjectsInY) * y) - 90.0);
    Geometry geo = new Point.Builder().withCoordinates(coordinates).build();

    // NOTE: You MUST ensure that a properties Map is set when creating Features this way!
    // It may be empty, but it absolutely MUST be there.
    Map<String, Object> properties = new HashMap<>();
    properties.put("info", new StringBuilder("Testobject #").append(y * numObjectsInY + x));
    properties.put("row", y);
    properties.put("col", x);

    Feature f =
        new Feature.Builder().withGeometry(geo).withId(oid).withProperties(properties).build();

    Row row = InteractiveMapPartitionHelper.toRow(f, schema);
    newRows.add(row);
  }
}

// Create a dataframe from the created rows.
log.info("Creating dataframe from test data.");
Dataset<Row> writeDF =
    sparkSession
        .createDataFrame(
            JavaSparkContext.fromSparkContext(sparkSession.sparkContext()).parallelize(newRows),
            schema)
        .drop("mt_datahub");

// Now do the actual writing.
log.info("Writing test data to layer " + layerId);
JavaLayerDataFrameWriter.create(writeDF)
    .writeLayer(catalogHrn, layerId)
    .option("olp.connector.write-batch-size", 1000)
    .save();

long res = writeDF.count();

」に一致する結果は 件です

    」に一致する結果はありません