import com.here.platform.data.client.spark.scaladsl.{IndexDataConverter, IndexRowMetadata}
import com.here.examples.platform.data.client.spark.ExampleUtil._
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.GroupedData
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SparkSession}
// Write `inputDF` to the index layer, serializing each group of rows into a
// single partition payload.
//
// FIX: the previous converter decoded each byte array with `_.toChar` (which
// sign-extends bytes >= 0x80 into U+FF80..U+FFFF garbage) and re-encoded with
// the platform-default charset via `getBytes()` — corrupting any non-ASCII
// payload and producing executor-dependent results. Because the strings were
// joined with no separator, the intended result is simply the concatenation of
// the raw bytes, which we now produce directly and losslessly.
inputDF
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new IndexDataConverter {
    // Concatenate the raw "data" payload of every row in the group, preserving
    // bytes exactly (no charset round-trip).
    override def serializeGroup(
        rowMetadata: IndexRowMetadata,
        rows: Iterator[Row]
    ): GroupedData[IndexRowMetadata] = {
      val joinedPayload: Array[Byte] = rows
        .flatMap(_.getAs[Array[Byte]]("data"))
        .toArray
      GroupedData(rowMetadata, joinedPayload)
    }
  })
  .save()
import com.here.platform.data.client.spark.scaladsl.DataConverter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.SerializableField;
import java.io.ByteArrayOutputStream;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Write `inputDF` to the index layer, serializing each group of rows into a
// single partition payload.
//
// FIX: the previous converter round-tripped every payload through
// `new String(byte[])` / `String.getBytes()`, both of which use the
// platform-default charset — lossy for byte sequences invalid in that charset
// and non-deterministic across executor JVMs. Since the strings were appended
// with no separator, the intended result is the concatenation of the raw
// bytes, which we now produce directly and losslessly.
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new IndexDataConverter() {
          // Concatenate the raw "data" payload of every row in the group,
          // preserving bytes exactly (no charset round-trip).
          @Override
          public GroupedData<IndexRowMetadata> serializeGroup(
              IndexRowMetadata rowMetadata, Iterator<Row> rows) {
            ByteArrayOutputStream joinedPayload = new ByteArrayOutputStream();
            rows.forEachRemaining(
                row -> {
                  byte[] chunk = row.<byte[]>getAs("data");
                  // write(byte[], int, int) declares no checked exception.
                  joinedPayload.write(chunk, 0, chunk.length);
                });
            return new GroupedData<>(rowMetadata, joinedPayload.toByteArray());
          }
        })
    .save();