import com.here.platform.data.client.model.geojson.{Feature, Geometry, Point}
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import org.apache.spark.sql.{Row, SparkSession}
import com.here.platform.data.client.spark.internal.InteractiveMapPartitionHelper
import com.here.platform.pipeline.PipelineContext
import scala.collection.JavaConverters._
import scala.collection.mutable
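// Context assumed by this snippet: a SparkSession, a logger, the catalog HRN, and
// the layer ID. A minimal sketch of that context follows; the app name and layer ID
// are illustrative placeholders, and reading the output catalog from PipelineContext
// is one option when running inside a pipeline:
val sparkSession = SparkSession.builder().appName("write-interactive-map").getOrCreate()
val log = org.slf4j.LoggerFactory.getLogger("write-interactive-map")
val pipelineContext = new PipelineContext
val catalogHrn = pipelineContext.config.outputCatalog // HRN of the output catalog
val layerId = "test-interactive-map-layer" // hypothetical layer ID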
// Lay out a regular grid of test features: numObjectsInX columns of longitude
// by numObjectsInY rows of latitude.
val numObjectsInX = 360
val numObjectsInY = 18
val newRows = mutable.ListBuffer[Row]()
val schema = InteractiveMapPartitionHelper.writeSchema
log.info("Generating test data, " + (numObjectsInX * numObjectsInY) + " objects")
for (x <- 0 until numObjectsInX; y <- 0 until numObjectsInY) {
  val oid = "X" + x + "_Y" + y
  // Spread the points evenly across the longitude/latitude range.
  val coords = List[Double](
    ((360.0 / numObjectsInX.toDouble) * x) - 180.0,
    ((180.0 / numObjectsInY.toDouble) * y) - 90.0
  )
  val geo: Geometry = new Point.Builder().withCoordinates(coords.map(Double.box).asJava).build
  val properties = mutable.Map[String, Any]()
  // Unique sequential index of this feature within the grid.
  properties += ("info" -> ("Testobject #" + (y * numObjectsInX + x).toString))
  properties += ("row" -> y)
  properties += ("col" -> x)
  val feature = new Feature.Builder()
    .withId(oid)
    .withGeometry(geo)
    .withProperties(properties.asJava)
    .withCustomMember("foo", "bar")
    .build
  // Convert the GeoJSON feature into a Row matching the connector's write schema.
  newRows += InteractiveMapPartitionHelper.toRow(feature, schema)
}
log.info("Creating dataframe from test data.")
val writeDF = sparkSession.createDataFrame(
sparkSession.sparkContext.parallelize(newRows),
schema
)
log.info("Writing test data to layer " + layerId)
writeDF
.writeLayer(catalogHrn, layerId)
.option("olp.connector.write-batch-size", 1000)
.save()
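
// Optional verification: read the layer back through the Spark connector and count
// the features. A minimal sketch, assuming the LayerDataFrameReader extension import
// below; depending on the layer configuration a .query(...) step may also be required:
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
val readDF = sparkSession.readLayer(catalogHrn, layerId).load()
log.info("Read back " + readDF.count() + " features from layer " + layerId)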
import com.here.platform.data.client.model.geojson.Feature;
import com.here.platform.data.client.model.geojson.Geometry;
import com.here.platform.data.client.model.geojson.Point;
import com.here.platform.data.client.spark.internal.InteractiveMapPartitionHelper;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
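import com.here.hrn.HRN;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Context assumed by this snippet: a SparkSession, a logger, the catalog HRN, and
// the layer ID. A minimal sketch of that context follows; the HRN, layer ID, and
// app name are illustrative placeholders, not part of the original example:
SparkSession sparkSession =
    SparkSession.builder().appName("write-interactive-map").getOrCreate();
Logger log = LoggerFactory.getLogger("write-interactive-map");
HRN catalogHrn = HRN.fromString("hrn:here:data::realm:catalog-id"); // hypothetical HRN
String layerId = "test-interactive-map-layer"; // hypothetical layer ID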
// Lay out a regular grid of test features: numObjectsInX columns of longitude
// by numObjectsInY rows of latitude.
final int numObjectsInX = 360;
final int numObjectsInY = 18;
List<Row> newRows = new ArrayList<>(numObjectsInX * numObjectsInY);
log.info("Generating test data, " + (numObjectsInX * numObjectsInY) + " objects");
StructType schema = InteractiveMapPartitionHelper.writeSchema();
for (int x = 0; x < numObjectsInX; x++) {
  for (int y = 0; y < numObjectsInY; y++) {
    String oid = "X" + x + "_Y" + y;
    // Spread the points evenly across the longitude/latitude range.
    List<Double> coordinates = new ArrayList<>(2);
    coordinates.add(((360.0 / (double) numObjectsInX) * x) - 180.0);
    coordinates.add(((180.0 / (double) numObjectsInY) * y) - 90.0);
    Geometry geo = new Point.Builder().withCoordinates(coordinates).build();
    Map<String, Object> properties = new HashMap<>();
    // Unique sequential index of this feature within the grid.
    properties.put("info", "Testobject #" + (y * numObjectsInX + x));
    properties.put("row", y);
    properties.put("col", x);
    Feature f =
        new Feature.Builder().withGeometry(geo).withId(oid).withProperties(properties).build();
    // Convert the GeoJSON feature into a Row matching the connector's write schema.
    Row row = InteractiveMapPartitionHelper.toRow(f, schema);
    newRows.add(row);
  }
}
log.info("Creating dataframe from test data.");
Dataset<Row> writeDF =
sparkSession
.createDataFrame(
JavaSparkContext.fromSparkContext(sparkSession.sparkContext()).parallelize(newRows),
schema)
.drop("mt_datahub");
log.info("Writing test data to layer " + layerId);
JavaLayerDataFrameWriter.create(writeDF)
.writeLayer(catalogHrn, layerId)
.option("olp.connector.write-batch-size", 1000)
.save();
long res = writeDF.count();
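
// Optional verification: read the layer back through the Spark connector and count
// the features. A minimal sketch, assuming JavaLayerDataFrameReader (from
// com.here.platform.data.client.spark.javadsl) mirrors the writer API; depending on
// the layer configuration a .query(...) step may also be required:
Dataset<Row> readDF =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .load();
log.info("Read back " + readDF.count() + " features from layer " + layerId);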