import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, first, lit}
import org.apache.spark.sql.types.IntegerType
// Count occurrences of each distinct "text" value in the input and publish the
// result to a versioned layer, carrying mt_partition / mt_timestamp columns
// (presumably consumed as partition metadata via the
// "olp.connector.metadata-columns" option — confirm against connector docs).
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-protobuf").getOrCreate()

// NOTE(review): `loadDataFrame`, `catalogHrn` and `layerId` are assumed to be
// defined elsewhere in this example (the parallel Java snippet defines the
// latter two) — confirm they are in scope.
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()

// One row per distinct "text": its occurrence count plus a representative
// partition name exposed under the metadata column name "mt_partition".
val computationDataFrame = inputDataFrame
  .groupBy("text")
  .agg(count("text").as("count"), first("partition_name").as("mt_partition"))

// Narrow the Long count to Int and stamp the publication time.
// `System.currentTimeMillis()` is impure (reads the clock), so call it with
// parens per Scala convention — also keeps this consistent with the Java snippet.
val outDataFrame = computationDataFrame
  .withColumn("count", col("count").cast(IntegerType))
  .withColumn("mt_timestamp", lit(System.currentTimeMillis()))
outDataFrame.show()

// Publish to the layer; enabling metadata columns lets the writer pick up the
// mt_* columns added above.
outDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("olp.connector.metadata-columns", true)
  .save()
sparkSession.stop()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.first;
import static org.apache.spark.sql.functions.lit;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.IntegerType;
// Count occurrences of each distinct "text" value in the input and publish the
// result to a versioned layer, carrying mt_partition / mt_timestamp columns
// (presumably consumed as partition metadata via the
// "olp.connector.metadata-columns" option — confirm against connector docs).
SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-protobuf").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "versioned-protobuf-layer";

// NOTE(review): `loadDataFrame` is assumed to be defined elsewhere in this example.
Dataset<Row> inputDataset = loadDataFrame(sparkSession);
inputDataset.show();

// One row per distinct "text": its occurrence count plus a representative
// partition name exposed under the metadata column name "mt_partition".
Dataset<Row> computationDataset =
    inputDataset
        .groupBy("text")
        .agg(count("text").as("count"), first("partition_name").as("mt_partition"));

// Fix: `IntegerType` is a Scala case object; from Java its singleton instance
// must be referenced as `DataTypes.IntegerType` — the bare class name imported
// at the top of the file is a type, not a value, and does not compile here.
Dataset<Row> outDataset =
    computationDataset
        .withColumn("count", col("count").cast(DataTypes.IntegerType))
        .withColumn("mt_timestamp", lit(System.currentTimeMillis()));
outDataset.show();

// Publish to the layer; enabling metadata columns lets the writer pick up the
// mt_* columns added above.
JavaLayerDataFrameWriter.create(outDataset)
    .writeLayer(catalogHrn, layerId)
    .option("olp.connector.metadata-columns", true)
    .save();
sparkSession.stop();