増分データの検証
注 この章では、データ プロセッシング ライブラリの新機能について説明します。また、 API は今後のバージョンで変更される可能性があります。 検証スイートは現在、 Scala 言語でのみ利用できます。
データ検証ワークフロー カタログ内のデータセットを検証すると、データの変更によってライブラリまたはサービスが予期せぬ動作を示したり、動作を停止したりする状況を防ぐことができます。 さらに、地図データなどの大規模なデータセットでは、体系的かつ完全なカバレッジテストが必要です。
データ プロセッシング ライブラリでは、検証ワークフローは次の要素で構成されています。
機能の抽出—抽出ロジックは入力カタログのデータを読み取り、並列で検証できる自己完結型のデータのパーティションにグループ化します。 検証—検証ロジックは、一連の受け入れ条件に基づいて各テストデータパーティションを検証します。 テストレポートを出力し、一連のテストメトリックを抽出します。 評価—評価ロジックは、テストメトリクスを検査して、入力データの品質について最終的な決定を行います。 結果が公開され、元のデータリリース候補の入力カタログのライブ展開をさらにゲートまたはトリガーするために使用できます。 データ プロセッシング ライブラリは com.here.platform.data.processing.validation
、パッケージ内でこれらのフェーズを実装するための特定のクラスおよび変換を提供します。
検証モジュールは、入力データからテスト機能を抽出する特定の API を提供していません。 ただし、は抽出ロジックを使用してDeltaSet[K, TestData]
を提供します。K
は、地理的に分割されたデータの場合Partition.HereTile
、または地理的に分割されていないデータの場合Partition.Generic
に使用されます。TestData
は、テスト対象のデータを表すユーザー定義のタイプです。 各 TestData
値は、独立してテストできる入力データの完全に指定された自己完結型のサブセットで構成されます ( たとえば、タイルの内容と、そこから参照されているタイル ) 。
DeltaSet
は PartitionNamePartitioner
とパーティション分割する必要 があります。これにより、テストレポートをパブリッシュしてメトリックを集約するすべての変換 が確実にシャッフルされなくなります。 によって提供されるデフォルトのパーティション作成者 DeltaContext
はで PartitionNamePartitioner
あり、安全に使用できます。
chapter about DeltaSets
使用可能なすべての変換の説明については、を参照してください。
検証スイート テストシナリオは、TestData
の単一のインスタンスで動作し、TestContext
を介してモジュールと対話して、各テストケースの結果およびログメトリック値を登録します。 テストシナリオでは、基本クラスSuite
を拡張し、run
メソッドにテストロジックを実装します。
カスタムデータは、各テスト結果に添付できます。また、カスタム GeoJSON ジオメトリを使用すると、プラットフォームポータルでテストレポートレイヤーを検査するときにレンダリング できます。
Suite
クラスはサブクラス化でき TestContext
、インターフェイスを直接使用してテストシナリオを実装できます。 ただし、意図さ Suite
れている使用方法は、既存のテストフレームワークと統合されたインターフェイスの組み込みの拡張機能を使用することです。 このモジュールは現在、Scalatest
に基づいてこのような拡張機能 を提供しています。
次のスニペットは、 Suite
および TestContext
インターフェイスを直接使用する方法を示しています。
import com. here. olp. util. quad . HereQuad
import com. here. platform. data. processing. validation . Suite
case class TestData( tileId: Long , nodes: Seq[ Node] )
class SimpleSuite extends Suite[ TestData] {
private def test( title: String ) ( body: => Unit ) ( implicit context: TestContext) : Unit = {
try {
body
context. registerSucceeded( title)
} catch {
case e: Exception => context. registerFailed( title, payload = Some( e. getMessage) )
}
}
override def run( data: TestData, context: TestContext) : Unit = {
implicit val ctx = context
val bbox = new HereQuad( data. tileId) . getBoundingBox
data. nodes. foreach { node =>
test( "node-inside-tile" ) {
require( bbox. contains( node. geometry) , s"Node $node is outside the tile" )
}
}
test( "distinct-ids" ) {
require( data. nodes. map( _. id) . distinct. size == data. nodes. size)
}
}
}
上記のスニペットは、パーティションレベルのテストケース、パーティション全体のグローバルプロパティの検証、およびパーティションから抽出されたサブフィーチャーのプロパティを検証するサブパーティションレベルのテストケースの違いを示しています ( 単一のトポロジの道路やノードなど ) 。 したがって、Suite
がTestData
パーティションで実行されている場合でも、モジュールはテストケースごとの統計情報を追跡し、評価フェーズで最適な粒度を提供します。
注 テスト Suite
自体を実装 DeltaSet
する場合、 Spark 、変換、パーティション分割の概念についての知識は必要ありません。 TestData
タイプの定義と受け入れ条件のセットを考慮すると、データ プロセッシング ライブラリについての知識がない開発者は、ただちにテストシナリオの作成を開始できます。 Scalatest などの一般的なテスト DSL を統合 することで、この作業がさらに簡素化されます。
Scalatest 統合 com.here.platform.data.processing.validation.scalatest
パッケージには、 TestData
インスタンスおよびTestContext
へのアクセスを提供する、org.scalatest.Suite
内で混合する一連の特性が含まれています。 利用可能な Scalatest ドメイン固有の言語を使用してテストを記述でき、結果が自動的に登録されます。 特に Bindings
、この特性は、現在のテストコンテキストおよびテスト対象のデータへのアクセスを提供します。 PayloadAndGeometry
この特性は、各テスト結果に自動的に添付されるカスタムデータおよびジオメトリを登録するメソッドを提供します。 ネスト org.scalatest.Suite
されたを実装し TestData
て、パーティションから抽出されたサブフィーチャーをテストできます。 以下のスニペットは、前述の例と同じ例を示しています。今回は Scalatest で実装されました。
import com. here. olp. util. quad . HereQuad
import com. here. platform. data. processing. validation. scalatest . ScalatestSuite
import com. here. platform. data. processing. validation. scalatest . Bindings
import com. here. platform. data. processing. validation. scalatest . PayloadAndGeometry
import org. scalatest . Suite
import org. scalatest. funsuite . AnyFunSuite
import org. scalatest. matchers. should . Matchers
import com. here. platform. data. processing. validation . SuiteCompiler
import scala. collection. immutable
case class TestData( tileId: Long , nodes: Seq[ Node] )
class SuiteWithScalatest extends AnyFunSuite with Matchers with Bindings[ TestData] {
private val bbox = new HereQuad( data. tileId) . getBoundingBox
test( "distinct-ids" ) {
data. nodes. iterator. map( _. id) . toSet. size shouldBe data. nodes. size
}
override val nestedSuites: immutable. IndexedSeq[ Suite] =
data. nodes. iterator. map( new NodeSpec( _) ) . toIndexedSeq
class NodeSpec( node: Node) extends AnyFunSuite with Matchers with PayloadAndGeometry {
override val onFailPayload: Option[ Any ] = Some( node)
test( "node-inside-tile" ) {
bbox. contains( node. geometry) shouldBe true
}
}
}
注 ネスト org.scalatest.Suite
されたを使用すると、 Scalatest の「似 たように動作する」ことが推奨 されます。なぜなら、ネストされたが多くのテストケースが作成される可能性があるからです。この場合、ネストされたスイートの数は多くなるのではなく、 Scalatest によって効率的に実行されません。
次のスニペットに示すように、ScalatestSuite
クラスを使用してorg.scalatest.Suite
クラスをSuite
に適応できます。
val suite = new ScalatestSuite( classOf[ SuiteWithScalatest] )
メトリクスとアキュムレータ 各スイート、および各テストケースについて、検証モジュールは失敗数と成功数を追跡します。 この情報は、後で Metrics
オブジェクトに保存され、汎用 Accumulator
に保存されたカスタムの累積メトリック値とともに集約されます
ライブラリには、Long
Double
値を蓄積して追跡するための一連のビルトインAccumulator
実装が用意されています。 を使用して TestContext.withAccumulator
、既存のアキュムレータを作成または更新できます。
import com. here. platform. data. processing. validation . _
class MySuite extends Suite[ TestData] {
override def run( data: TestData, context: TestContext) : Unit = {
context. withAccumulator[ LongAccumulator] ( "some-long-accumulator" ) ( _ + 42 )
context. withAccumulator[ DoubleAccumulator] ( "some-double-accumulator" ) ( _ + 3.1415 )
context. withAccumulator[ AggregatedLongAccumulator] (
"some-long-accumulator-with-aggregated-stats" ) ( _ + 1 )
}
}
Accumulator
インターフェイスをサブクラス化することで、カスタムAccumulator
クラスを実装できます。 デフォルト の JSON シリアライザを使用する場合 、 Metrics
カスタム Accumulator
クラスの追加のタイプヒントを使用してシリアライザ / デシリアライザを補強する必要があります。
import com. here. platform. data. processing. validation . Serialization. { Deserializer, Serializer}
import com. here. platform. data. processing. validation . _
import org. json4s . { Formats, ShortTypeHints}
case class SetAccumulator( set: Set[ Long ] ) extends Accumulator[ SetAccumulator] {
override def merge( other: SetAccumulator) : SetAccumulator = SetAccumulator( set ++ other. set)
def + ( value: Long ) : SetAccumulator = SetAccumulator( set + value)
}
object MyMetricsSerializers {
val formats: Formats = DefaultJsonSerializers. metricsFormats + ShortTypeHints(
List( classOf[ SetAccumulator] ) )
implicit val metricsSerializer: Serializer[ Metrics] = new JsonSerializer[ Metrics] ( formats)
implicit val metricsDeserializer: Deserializer[ Metrics] =
new JsonDeserializer[ Metrics] ( formats)
}
case class TestData( poiCategories: Seq[ Int ] )
class MySuite extends Suite[ TestData] {
override def run( data: TestData, context: TestContext) : Unit = {
data. poiCategories. foreach { category =>
context. withAccumulator[ SetAccumulator] ( "all-poi-categories" ) ( _ + category)
}
}
}
実行中、公開中、およびメトリクスの集約 DeltaSet[K, TestData]
分散 TestData
パーティションが含まれているが、 Suite[TestData]
テストシナリオを実装している場合 Report
Metrics
Payload
は、テストスイートをテストの削除セットのすべての値にマップし、返されたテストを取得して、それらをにシリアライズし、ペイロードを正しい出力レイヤーにマップする必要があります。
A SuiteCompiler
がこれをすべて処理します。 Suite
または スイートと TestData
そのインスタンスのコレクションが、 Map[Layer.Id, Payload]
エンコードされたテストレポートとメトリクスがレポートレイヤー(またはレイヤー)およびメトリクスレイヤーにマップされたを返します。
次のスニペットに示すように、SuiteCompiler
は通常、DeltaSet
mapValues
変換を使用してテストデータにマップされます。
implicit val deltaContext: DeltaContext = ? ? ?
val testData: DeltaSet[ Partition. HereTile, TestData] = ? ? ?
import deltaContext. transformations . _
import com. here. platform. data. processing. validation . DefaultJsonSerializers. _
val compiler = new SuiteCompiler( new MySuite)
val reportAndMetricsPayloads: DeltaSet[ Partition. HereTile, Map[ Layer. Id, Payload] ] =
testData. mapValues( compiler. compile)
または、 TestData
現在 のパーティションによって参照されている追加の入力データを実際には解決せずに取得するように API を提供できます。 mapValuesWithResolver
は、次 のコード スニペットに示すように、Resolver
インスタンスをTestData
コンストラクタに渡して、この処理を行うために使用できます。
class TestData( retriever: Retriever, resolver: Resolver, val partition: DecodedData) {
def getReference( key: Partition. Key) : Option[ DecodedData] =
resolver
. resolve( key)
. map( meta => DecodedData. parseFrom( retriever. getPayload( key, meta) . content) )
}
implicit val deltaContext: DeltaContext = ? ? ?
val inputData: DeltaSet[ Partition. Key, Partition. Meta] = ? ? ?
val catalogId: Catalog. Id = ? ? ?
val layerId: Layer. Id = ? ? ?
val retriever: Retriever = deltaContext. inRetriever( catalogId)
import deltaContext. transformations . _
import com. here. platform. data. processing. validation . DefaultJsonSerializers. _
val compiler = new SuiteCompiler( new MySuite)
val reportAndMetricsPayloads: DeltaSet[ Partition. HereTile, Map[ Layer. Id, Payload] ] = inputData
. mapValuesWithResolver(
{
case ( resolver, key, meta) =>
compiler. compile(
new TestData( retriever,
resolver,
DecodedData. parseFrom( retriever. getPayload( key, meta) . content) ) )
} ,
List(
DirectQuery( catalogId, Set( layerId) )
)
)
. mapKeys( OneToOne. toHereTile( catalogId, layerId) , PreservesPartitioning)
同じTestData
でパラメータ化された複数のSuite
インスタンス を スイートのコレクション にグループ化し て、同じSuiteCompiler
から使用 できます。
class NodeSuite extends Suite[ TestData] {
override def run( data: TestData, context: TestContext) : Unit = ? ? ?
}
class SegmentSuite extends Suite[ TestData] {
override def run( data: TestData, context: TestContext) : Unit = ? ? ?
}
val suiteCompiler = new SuiteCompiler( Suites( new NodeSuite, new SegmentSuite) )
デフォルト SuiteCompiler
では、は「レポート」レイヤーにテストレポートを公開し、「メトリクス」レイヤーにテストメトリクスを公開しますが、これらのデフォルトは変更できます。 を使用 SuiteCompiler
して複数のスイートを実行する場合は、スイートごとに異なるレポートレイヤーを指定できます。
A SuiteCompiler
で Report
は、クラスおよび Metrics
クラスに暗黙的なシリアライザ / デシリアライザが必要です。 上記のスニペットは 、 JSON を使用してテストレポートおよびメトリクスをエンコードする既定のシリアライザを使用しています。
DeltaSet[K, TestData]
にSuiteCompiler
を適用するとDeltaSet[K, Map[Layer.Id, Payload]]
、K
は、入力データまたはフィーチャー抽出ロジックの元のパーティション分割に基づいて、Partition.HereTile
またはPartition.Generic
のいずれかになります。 で DeltaSet[K, Map[Layer.Id, Payload]]
は、暗黙的な変換を選択して、をパブリッシュ Transformations
できます。
これらのすべての変換で、対応する出力レイヤーのテストレポートとメトリクスが公開され、公開されたメトリクスパーティションが再帰的に集計されて、完全に集約された単一のメトリクスパーティションが構築されます。 集約の実現方法は、のパーティション分割 TestData
(のタイプ K
)によって異なります。 HERE Tile の分割されたメトリクスは、段階的に高いズームレベルで集計されます。 以下のスニペットは、このようなシナリオを示しています。
implicit val deltaContext: DeltaContext = ? ? ?
val testData: DeltaSet[ Partition. HereTile, TestData] = ? ? ?
import deltaContext. transformations . _
import com. here. platform. data. processing. validation . DefaultJsonSerializers. _
import com. here. platform. data. processing. validation . Transformations. _
val compiler = new SuiteCompiler( new MySuite)
val ( reportAndMetricsPublishedSet, aggregatedMetrics) = testData
. mapValues( compiler. compile)
. publishAndAggregateByLevel( compiler. outLayers, compiler. metricsLayer)
注 デフォルト publishAndAggregateByLevel
では、出力メトリクスレイヤーで設定されているすべてのズームレベルをズーム レベル 0 まで走査します。 レベル 0 (マップ全体をカバーするルート HERE Tile )は、有効なタイルレベルのセットに含める必要があります。
一般的に分割されたメトリクス ( 管理者階層、電話番号、その他の非地理的データなど ) は、固定数のステップで集計されます。このステップでは、各ステップで集計されたパーティションの数を指定できます。
implicit val deltaContext: DeltaContext = ? ? ?
val testData: DeltaSet[ Partition. Generic, TestData] = ? ? ?
import deltaContext. transformations . _
import com. here. platform. data. processing. validation . DefaultJsonSerializers. _
import com. here. platform. data. processing. validation . Transformations. _
val compiler = new SuiteCompiler( new MySuite)
val ( reportAndMetricsPublishedSet, aggregatedMetrics) = testData
. mapValues( compiler. compile)
. publishAndAggregateByHash( compiler. outLayers, compiler. metricsLayer, Seq( 1000 , 100 , 10 , 1 ) )
これらのメソッドはいずれも、後で評価するために、テストレポートおよびメトリックのPublishedSet
を返し、 1 つの完全に集約されたMetrics
パーティションを含むDeltaSet[Partition.Key, Metrics]
を返します。
コンパイルプロセスの一部としての検証 必要に SuiteCompiler
応じて、を使用せずにスイートを手動で実行できます。 たとえば、リリース候補カタログをコンパイルしている同じパイプラインからテストシナリオを実行し、出力データが厳しい受け入れ条件に準拠していない場合にただちにバッチジョブを中止するか、または品質マーカーを追加することができます。
val deltaContext: DeltaContext = ? ? ?
import deltaContext. transformations . _
val outputLayer: Layer. Id = ? ? ?
val candidateOutputData: DeltaSet[ Partition. Key, OutputData] = ? ? ?
val suite = new MySuite
val validatedOutputPayloads = candidateOutputData. mapValues { data =>
val ( report, metrics) = suite. run( data)
require( metrics. stats. failed == 0 , "Validation failed" )
Payload( data. toByteArray)
}
val outputPublishedSet = validatedOutputPayloads. publish( Set( outputLayer) )
評価 assess
変換をに適用 DeltaSet[Partition.Key, Metrics]
して、最終的な品質保証評価を含むカスタム評価タイプをコンパイルできます。 通常は、検証が成功したかどうかを示すブール型値が含まれますが、ユースケースごとのカスタム評価を含めることもできます。
case class MyAssessment( isSucceeded: Boolean , failureRate: Double )
val ( reportAndMetricsPublishedSet, aggregatedMetrics) = testData
. mapValues( compiler. compile)
. publishAndAggregateByLevel( compiler. outLayers, compiler. metricsLayer)
implicit val assessmentSerializer = new JsonSerializer[ MyAssessment] ( DefaultFormats)
val assessmentPublishedSet = aggregatedMetrics. assess[ MyAssessment] ( ) { metrics =>
val failureRate = metrics. stats. failed. toDouble / metrics. stats. total. toDouble
MyAssessment( failureRate <= 0.05 , failureRate)
}
複数のSuiteCompiler
が異なるTestData
タイプまたは異なるパーティションスキームにマップされている場合、1 つのSuiteCompiler
ごとに 1 つずつのシーケンスDeltaSet[Partition.Key, Metrics]
が作成されます。 assess
この場合も、一連の拡張ファイルで変換を使用できます。この場合 Metrics
、別の SuiteCompiler
によって生成されたパーティションがさらに集約されます。
case class Assessment( isSucceeded: Boolean )
implicit val deltaContext: DeltaContext = ? ? ?
val roadTestData: DeltaSet[ Partition. HereTile, RoadTestData] = ? ? ?
val adminTestData: DeltaSet[ Partition. Generic, AdminTestData] = ? ? ?
import deltaContext. transformations . _
import com. here. platform. data. processing. validation . DefaultJsonSerializers. _
import com. here. platform. data. processing. validation . Transformations. _
val roadTestCompiler = new SuiteCompiler( new RoadSuite)
val adminTestCompiler = new SuiteCompiler( new AdminSuite)
val ( roadPublishedSet, roadMetrics) = roadTestData
. mapValues( roadTestCompiler. compile)
. publishAndAggregateByLevel( roadTestCompiler. outLayers, roadTestCompiler. metricsLayer)
val ( adminPublishedSet, adminMetrics) = adminTestData
. mapValues( adminTestCompiler. compile)
. publishAndAggregateByHash( roadTestCompiler. outLayers,
roadTestCompiler. metricsLayer,
Seq( 1000 , 100 , 10 , 1 ) )
implicit val assessmentSerializer = new JsonSerializer[ Assessment] ( DefaultFormats)
val assessmentPublishedSet =
Seq( roadMetrics, adminMetrics) . assess[ Assessment] ( ) ( metrics =>
Assessment( metrics. stats. failed == 0 ) )
注 2 つの異なる SuiteCompiler
は、同じレポートおよびメトリックスレイヤーに公開できません。
テストレポートとメトリクスをレンダリングしています デフォルトの JSON シリアライザを使用する場合は、 HERE タイルレポートおよびメトリックスのレイヤーを次のスキーマ HRNS で設定できます。
hrn:here:schema:::com.here.platform.data.processing.validation.schema:report_v2:1.0.0
hrn:here:schema:::com.here.platform.data.processing.validation.schema:metrics_v2:1.0.0
これらのスキーマには、テストレポートに保存されているジオメトリを描画するレンダリングプラグインや、ヒートマップとしてメトリクスをレンダリングするプラグインが含まれています。
テストレポートをレンダリングしました テストメトリクスをレンダリングしました