import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
// Output schema of the exploded messages: the payload's data handle plus the
// number of recognized signs reported by that message.
val schema: StructType = new StructType()
  .add("mt_dataHandle", DataTypes.StringType, nullable = false)
  .add("signRecognitionCount", DataTypes.IntegerType, nullable = false)
// Read the raw SDII payloads (with the connector's metadata columns) from the
// layer, restricted to the bounding box and to SignRecognition events.
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .format("raw")
  .query(
    "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
  .option("olp.connector.metadata-columns", true)
  // Fan the metadata query out over 100 partitions.
  .option("olp.connector.query-parallelism", 100)

// BUG FIX: the original `if (compressed) reader.option(...)` discarded the
// reader returned by `option`, silently relying on the reader being mutated in
// place. Capture the configured reader explicitly so the decompression timeout
// is guaranteed to be applied before `load()` when `compressed` is set.
val configuredReader =
  if (compressed)
    reader.option("olp.connector.data-decompression-timeout", 1200000)
  else
    reader

val df: DataFrame = configuredReader.load()
// Explode each downloaded payload ("data" column) into one row per SDII
// message, carrying the originating data handle and that message's
// sign-recognition count.
val dfSignRecognitionCount: DataFrame = df.flatMap { row: Row =>
  val payload = row.getAs[Array[Byte]]("data")
  val messages = SdiiMessageList.MessageList.parseFrom(payload).getMessageList.asScala
  for (message <- messages)
    yield RowFactory.create(
      row.getAs[Object]("mt_dataHandle"),
      message.getPathEvents.getSignRecognitionCount.asInstanceOf[Object])
}(RowEncoder(schema))
// Keep only the two columns we need, restricted to messages that actually
// recognized at least one sign.
val messagesWithAtLeastOneSignRecognition = dfSignRecognitionCount
  .filter("signRecognitionCount > 0")
  .select("mt_dataHandle", "signRecognitionCount")
// Distinct data handles of every payload containing at least one recognition.
val dataHandles = messagesWithAtLeastOneSignRecognition
  .map((row: Row) => row.getAs[String]("mt_dataHandle"))(Encoders.STRING)
  .distinct()
  .collectAsList()

// Total number of messages with at least one sign recognition.
val count = messagesWithAtLeastOneSignRecognition.count()
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Output schema of the exploded messages: the payload's data handle plus the
// number of recognized signs reported by that message.
StructType schema =
    new StructType()
        .add("mt_dataHandle", DataTypes.StringType, false)
        .add("signRecognitionCount", DataTypes.IntegerType, false);
// Read the raw SDII payloads (with the connector's metadata columns) from the
// layer, restricted to the bounding box and to SignRecognition events.
Dataset<Row> df =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .format("raw")
        .query(
            "tileId=INBOUNDINGBOX=(23.648524, 22.689013, 62.284241, 60.218811) and eventType==SignRecognition")
        // Fan the metadata query out over 100 partitions.
        .option("olp.connector.query-parallelism", 100)
        .option("olp.connector.metadata-columns", true)
        .load();
// Explode each downloaded payload ("data" column) into one Row per SDII
// message, carrying the originating data handle and that message's
// sign-recognition count.
FlatMapFunction<Row, Row> explodeMessages =
    row -> {
      byte[] payload = row.<byte[]>getAs("data");
      return SdiiMessageList.MessageList.parseFrom(payload).getMessageList().stream()
          .map(
              message ->
                  RowFactory.create(
                      row.getAs("mt_dataHandle"),
                      message.getPathEvents().getSignRecognitionCount()))
          .iterator();
    };
Dataset<Row> dfSignRecognitionCount = df.flatMap(explodeMessages, RowEncoder.apply(schema));
// Keep only the two columns we need, restricted to messages that actually
// recognized at least one sign.
Dataset<Row> messagesWithAtLeastOneSignRecognition =
    dfSignRecognitionCount
        .where("signRecognitionCount > 0")
        .select("mt_dataHandle", "signRecognitionCount");
// Distinct data handles of every payload containing at least one recognition.
List<String> dataHandles =
    messagesWithAtLeastOneSignRecognition
        .map((MapFunction<Row, String>) row -> row.<String>getAs("mt_dataHandle"), Encoders.STRING())
        .distinct()
        .collectAsList();

// Total number of messages with at least one sign recognition.
long count = messagesWithAtLeastOneSignRecognition.count();