増分データの検証

この章では、データ プロセッシング ライブラリの新機能について説明します。また、 API は今後のバージョンで変更される可能性があります。 検証スイートは現在、 Scala 言語でのみ利用できます。

データ検証ワークフロー

カタログ内のデータセットを検証すると、データの変更によってライブラリまたはサービスが予期せぬ動作を示したり、動作を停止したりする状況を防ぐことができます。 さらに、地図データなどの大規模なデータセットでは、体系的かつ完全なカバレッジテストが必要です。

データ プロセッシング ライブラリでは、検証ワークフローは次の要素で構成されています。

  1. 機能の抽出—抽出ロジックは入力カタログのデータを読み取り、並列で検証できる自己完結型のデータのパーティションにグループ化します。
  2. 検証—検証ロジックは、一連の受け入れ条件に基づいて各テストデータパーティションを検証します。 テストレポートを出力し、一連のテストメトリックを抽出します。
  3. 評価—評価ロジックは、テストメトリクスを検査して、入力データの品質について最終的な決定を行います。 結果が公開され、元のデータリリース候補の入力カタログのライブ展開をさらにゲートまたはトリガーするために使用できます。

データ プロセッシング ライブラリは com.here.platform.data.processing.validation 、パッケージ内でこれらのフェーズを実装するための特定のクラスおよび変換を提供します。

フィーチャーの抽出

検証モジュールは、入力データからテスト機能を抽出する特定の API を提供していません。 ただし、は抽出ロジックを使用してDeltaSet[K, TestData]を提供します。Kは、地理的に分割されたデータの場合Partition.HereTile、または地理的に分割されていないデータの場合Partition.Genericに使用されます。TestDataは、テスト対象のデータを表すユーザー定義のタイプです。 各 TestData 値は、独立してテストできる入力データの完全に指定された自己完結型のサブセットで構成されます ( たとえば、タイルの内容と、そこから参照されているタイル ) 。

DeltaSetPartitionNamePartitioner とパーティション分割する必要 があります。これにより、テストレポートをパブリッシュしてメトリックを集約するすべての変換が確実にシャッフルされなくなります。 によって提供されるデフォルトのパーティション作成者 DeltaContext はで PartitionNamePartitioner あり、安全に使用できます。

chapter about DeltaSets 使用可能なすべての変換の説明については、を参照してください。

検証スイート

テストシナリオは、TestDataの単一のインスタンスで動作し、TestContextを介してモジュールと対話して、各テストケースの結果およびログメトリック値を登録します。 テストシナリオでは、基本クラスSuiteを拡張し、runメソッドにテストロジックを実装します。

カスタムデータは、各テスト結果に添付できます。また、カスタム GeoJSON ジオメトリを使用すると、プラットフォームポータルでテストレポートレイヤーを検査するときにレンダリングできます。

Suite クラスはサブクラス化でき TestContext 、インターフェイスを直接使用してテストシナリオを実装できます。 ただし、意図さ Suite れている使用方法は、既存のテストフレームワークと統合されたインターフェイスの組み込みの拡張機能を使用することです。 このモジュールは現在、Scalatestに基づいてこのような拡張機能を提供しています。

次のスニペットは、 Suite および TestContext インターフェイスを直接使用する方法を示しています。

import com.here.olp.util.quad.HereQuad
import com.here.platform.data.processing.validation.Suite

case class TestData(tileId: Long, nodes: Seq[Node])

class SimpleSuite extends Suite[TestData] {

  private def test(title: String)(body: => Unit)(implicit context: TestContext): Unit = {
    try {
      body
      context.registerSucceeded(title)
    } catch {
      case e: Exception => context.registerFailed(title, payload = Some(e.getMessage))
    }
  }

  override def run(data: TestData, context: TestContext): Unit = {
    implicit val ctx = context

    val bbox = new HereQuad(data.tileId).getBoundingBox

    // Every node should be inside its host tile
    data.nodes.foreach { node =>
      test("node-inside-tile") {
        require(bbox.contains(node.geometry), s"Node $node is outside the tile")
      }
    }

    // Nodes should have distinct IDs
    test("distinct-ids") {
      require(data.nodes.map(_.id).distinct.size == data.nodes.size)
    }
  }
}

上記のスニペットは、パーティションレベルのテストケース、パーティション全体のグローバルプロパティの検証、およびパーティションから抽出されたサブフィーチャーのプロパティを検証するサブパーティションレベルのテストケースの違いを示しています ( 単一のトポロジの道路やノードなど ) 。 したがって、SuiteTestDataパーティションで実行されている場合でも、モジュールはテストケースごとの統計情報を追跡し、評価フェーズで最適な粒度を提供します。

テスト Suite 自体を実装 DeltaSet する場合、 Spark 、変換、パーティション分割の概念についての知識は必要ありません。 TestData タイプの定義と受け入れ条件のセットを考慮すると、データ プロセッシング ライブラリについての知識がない開発者は、ただちにテストシナリオの作成を開始できます。 Scalatest などの一般的なテスト DSL を統合することで、この作業がさらに簡素化されます。

Scalatest 統合

com.here.platform.data.processing.validation.scalatestパッケージには、 TestDataインスタンスおよびTestContextへのアクセスを提供する、org.scalatest.Suite内で混合する一連の特性が含まれています。 利用可能な Scalatest ドメイン固有の言語を使用してテストを記述でき、結果が自動的に登録されます。 特に Bindings 、この特性は、現在のテストコンテキストおよびテスト対象のデータへのアクセスを提供します。 PayloadAndGeometry この特性は、各テスト結果に自動的に添付されるカスタムデータおよびジオメトリを登録するメソッドを提供します。 ネスト org.scalatest.Suiteされたを実装し TestData て、パーティションから抽出されたサブフィーチャーをテストできます。 以下のスニペットは、前述の例と同じ例を示しています。今回は Scalatest で実装されました。

import com.here.olp.util.quad.HereQuad
import com.here.platform.data.processing.validation.scalatest.ScalatestSuite
import com.here.platform.data.processing.validation.scalatest.Bindings
import com.here.platform.data.processing.validation.scalatest.PayloadAndGeometry
import org.scalatest.Suite
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import com.here.platform.data.processing.validation.SuiteCompiler
import scala.collection.immutable

case class TestData(tileId: Long, nodes: Seq[Node])

class SuiteWithScalatest extends AnyFunSuite with Matchers with Bindings[TestData] {
  private val bbox = new HereQuad(data.tileId).getBoundingBox

  // partition level tests
  test("distinct-ids") {
    data.nodes.iterator.map(_.id).toSet.size shouldBe data.nodes.size
  }

  // feature level tests
  override val nestedSuites: immutable.IndexedSeq[Suite] =
    data.nodes.iterator.map(new NodeSpec(_)).toIndexedSeq

  class NodeSpec(node: Node) extends AnyFunSuite with Matchers with PayloadAndGeometry {
    override val onFailPayload: Option[Any] = Some(node)

    test("node-inside-tile") {
      bbox.contains(node.geometry) shouldBe true
    }
  }
}

ネスト org.scalatest.Suiteされたを使用すると、 Scalatest の「似 たように動作する」ことが推奨されます。なぜなら、ネストされたが多くのテストケースが作成される可能性があるからです。この場合、ネストされたスイートの数は多くなるのではなく、 Scalatest によって効率的に実行されません。

次のスニペットに示すように、ScalatestSuiteクラスを使用してorg.scalatest.SuiteクラスをSuiteに適応できます。

val suite = new ScalatestSuite(classOf[SuiteWithScalatest])

メトリクスとアキュムレータ

各スイート、および各テストケースについて、検証モジュールは失敗数と成功数を追跡します。 この情報は、後で Metrics オブジェクトに保存され、汎用 Accumulatorに保存されたカスタムの累積メトリック値とともに集約されます

ライブラリには、LongDouble値を蓄積して追跡するための一連のビルトインAccumulator実装が用意されています。 を使用して TestContext.withAccumulator 、既存のアキュムレータを作成または更新できます。

import com.here.platform.data.processing.validation._

class MySuite extends Suite[TestData] {

  override def run(data: TestData, context: TestContext): Unit = {
    context.withAccumulator[LongAccumulator]("some-long-accumulator")(_ + 42)
    context.withAccumulator[DoubleAccumulator]("some-double-accumulator")(_ + 3.1415)
    context.withAccumulator[AggregatedLongAccumulator](
      "some-long-accumulator-with-aggregated-stats")(_ + 1)
  }
}

Accumulatorインターフェイスをサブクラス化することで、カスタムAccumulatorクラスを実装できます。 デフォルト の JSON シリアライザを使用する場合Metrics カスタム Accumulator クラスの追加のタイプヒントを使用してシリアライザ / デシリアライザを補強する必要があります。

import com.here.platform.data.processing.validation.Serialization.{Deserializer, Serializer}
import com.here.platform.data.processing.validation._
import org.json4s.{Formats, ShortTypeHints}

case class SetAccumulator(set: Set[Long]) extends Accumulator[SetAccumulator] {
  override def merge(other: SetAccumulator): SetAccumulator = SetAccumulator(set ++ other.set)

  def +(value: Long): SetAccumulator = SetAccumulator(set + value)
}

object MyMetricsSerializers {
  // augment default metrics formats with type hint for the custom accumulator
  val formats: Formats = DefaultJsonSerializers.metricsFormats + ShortTypeHints(
    List(classOf[SetAccumulator]))

  // Metrics serializer and deserializer that know how to serialize/deserialize the custom
  // accumulator
  implicit val metricsSerializer: Serializer[Metrics] = new JsonSerializer[Metrics](formats)
  implicit val metricsDeserializer: Deserializer[Metrics] =
    new JsonDeserializer[Metrics](formats)
}

case class TestData(poiCategories: Seq[Int])

class MySuite extends Suite[TestData] {

  override def run(data: TestData, context: TestContext): Unit = {
    data.poiCategories.foreach { category =>
      context.withAccumulator[SetAccumulator]("all-poi-categories")(_ + category)
    }
  }
}

実行中、公開中、およびメトリクスの集約

DeltaSet[K, TestData] 分散 TestData パーティションが含まれているが、 Suite[TestData] テストシナリオを実装している場合 ReportMetricsPayload は、テストスイートをテストの削除セットのすべての値にマップし、返されたテストを取得して、それらをにシリアライズし、ペイロードを正しい出力レイヤーにマップする必要があります。

A SuiteCompiler がこれをすべて処理します。 Suite または スイートとTestData そのインスタンスのコレクションが、 Map[Layer.Id, Payload] エンコードされたテストレポートとメトリクスがレポートレイヤー(またはレイヤー)およびメトリクスレイヤーにマップされたを返します。

次のスニペットに示すように、SuiteCompilerは通常、DeltaSetmapValues変換を使用してテストデータにマップされます。

// implicit to enable the validation implicit transformations
implicit val deltaContext: DeltaContext = ???
val testData: DeltaSet[Partition.HereTile, TestData] = ???

import deltaContext.transformations._

import com.here.platform.data.processing.validation.DefaultJsonSerializers._

val compiler = new SuiteCompiler(new MySuite)

val reportAndMetricsPayloads: DeltaSet[Partition.HereTile, Map[Layer.Id, Payload]] =
  testData.mapValues(compiler.compile)

または、 TestData 現在 のパーティションによって参照されている追加の入力データを実際には解決せずに取得するように API を提供できます。 mapValuesWithResolver は、次 のコード スニペットに示すように、ResolverインスタンスをTestDataコンストラクタに渡して、この処理を行うために使用できます。

class TestData(retriever: Retriever, resolver: Resolver, val partition: DecodedData) {

  def getReference(key: Partition.Key): Option[DecodedData] =
    resolver
      .resolve(key)
      .map(meta => DecodedData.parseFrom(retriever.getPayload(key, meta).content))
}

// implicit to enable the validation implicit transformations
implicit val deltaContext: DeltaContext = ???
val inputData: DeltaSet[Partition.Key, Partition.Meta] = ???
val catalogId: Catalog.Id = ???
val layerId: Layer.Id = ???
val retriever: Retriever = deltaContext.inRetriever(catalogId)

import deltaContext.transformations._
import com.here.platform.data.processing.validation.DefaultJsonSerializers._

val compiler = new SuiteCompiler(new MySuite)

val reportAndMetricsPayloads: DeltaSet[Partition.HereTile, Map[Layer.Id, Payload]] = inputData
  .mapValuesWithResolver(
    {
      case (resolver, key, meta) =>
        compiler.compile(
          new TestData(retriever,
                       resolver,
                       DecodedData.parseFrom(retriever.getPayload(key, meta).content)))
    },
    List(
      // resolution strategies
      DirectQuery(catalogId, Set(layerId))
    )
  )
  .mapKeys(OneToOne.toHereTile(catalogId, layerId), PreservesPartitioning)

同じTestDataでパラメータ化された複数のSuiteインスタンス を スイートのコレクションにグループ化し て、同じSuiteCompilerから使用 できます。

class NodeSuite extends Suite[TestData] {
  override def run(data: TestData, context: TestContext): Unit = ???
}

class SegmentSuite extends Suite[TestData] {
  override def run(data: TestData, context: TestContext): Unit = ???
}

val suiteCompiler = new SuiteCompiler(Suites(new NodeSuite, new SegmentSuite))

デフォルト SuiteCompiler では、は「レポート」レイヤーにテストレポートを公開し、「メトリクス」レイヤーにテストメトリクスを公開しますが、これらのデフォルトは変更できます。 を使用 SuiteCompiler して複数のスイートを実行する場合は、スイートごとに異なるレポートレイヤーを指定できます。

A SuiteCompilerReport は、クラスおよび Metrics クラスに暗黙的なシリアライザ / デシリアライザが必要です。 上記のスニペットは JSON を使用してテストレポートおよびメトリクスをエンコードする既定のシリアライザを使用しています。

DeltaSet[K, TestData]SuiteCompilerを適用するとDeltaSet[K, Map[Layer.Id, Payload]]Kは、入力データまたはフィーチャー抽出ロジックの元のパーティション分割に基づいて、Partition.HereTileまたはPartition.Genericのいずれかになります。 で DeltaSet[K, Map[Layer.Id, Payload]] は、暗黙的な変換を選択して、をパブリッシュ Transformationsできます。

これらのすべての変換で、対応する出力レイヤーのテストレポートとメトリクスが公開され、公開されたメトリクスパーティションが再帰的に集計されて、完全に集約された単一のメトリクスパーティションが構築されます。 集約の実現方法は、のパーティション分割 TestData (のタイプ K)によって異なります。 HERE Tile の分割されたメトリクスは、段階的に高いズームレベルで集計されます。 以下のスニペットは、このようなシナリオを示しています。

// implicit to enable the validation implicit transformations
implicit val deltaContext: DeltaContext = ???
val testData: DeltaSet[Partition.HereTile, TestData] = ???

import deltaContext.transformations._

import com.here.platform.data.processing.validation.DefaultJsonSerializers._
import com.here.platform.data.processing.validation.Transformations._

val compiler = new SuiteCompiler(new MySuite)

val (reportAndMetricsPublishedSet, aggregatedMetrics) = testData
  .mapValues(compiler.compile)
  .publishAndAggregateByLevel(compiler.outLayers, compiler.metricsLayer)

デフォルト publishAndAggregateByLevel では、出力メトリクスレイヤーで設定されているすべてのズームレベルをズーム レベル 0 まで走査します。 レベル 0 (マップ全体をカバーするルート HERE Tile )は、有効なタイルレベルのセットに含める必要があります。

一般的に分割されたメトリクス ( 管理者階層、電話番号、その他の非地理的データなど ) は、固定数のステップで集計されます。このステップでは、各ステップで集計されたパーティションの数を指定できます。

// implicit to enable the validation implicit transformations
implicit val deltaContext: DeltaContext = ???
val testData: DeltaSet[Partition.Generic, TestData] = ???

import deltaContext.transformations._

import com.here.platform.data.processing.validation.DefaultJsonSerializers._
import com.here.platform.data.processing.validation.Transformations._

val compiler = new SuiteCompiler(new MySuite)

val (reportAndMetricsPublishedSet, aggregatedMetrics) = testData
  .mapValues(compiler.compile)
  .publishAndAggregateByHash(compiler.outLayers, compiler.metricsLayer, Seq(1000, 100, 10, 1))

これらのメソッドはいずれも、後で評価するために、テストレポートおよびメトリックのPublishedSetを返し、 1 つの完全に集約されたMetricsパーティションを含むDeltaSet[Partition.Key, Metrics]を返します。

コンパイルプロセスの一部としての検証

必要に SuiteCompiler応じて、を使用せずにスイートを手動で実行できます。 たとえば、リリース候補カタログをコンパイルしている同じパイプラインからテストシナリオを実行し、出力データが厳しい受け入れ条件に準拠していない場合にただちにバッチジョブを中止するか、または品質マーカーを追加することができます。

val deltaContext: DeltaContext = ???

import deltaContext.transformations._

val outputLayer: Layer.Id = ???

val candidateOutputData: DeltaSet[Partition.Key, OutputData] = ???

val suite = new MySuite

// fail the pipeline if at least one test-case has failed
val validatedOutputPayloads = candidateOutputData.mapValues { data =>
  val (report, metrics) = suite.run(data)
  require(metrics.stats.failed == 0, "Validation failed")
  Payload(data.toByteArray)
}

val outputPublishedSet = validatedOutputPayloads.publish(Set(outputLayer))

評価

assess 変換をに適用 DeltaSet[Partition.Key, Metrics] して、最終的な品質保証評価を含むカスタム評価タイプをコンパイルできます。 通常は、検証が成功したかどうかを示すブール型値が含まれますが、ユースケースごとのカスタム評価を含めることもできます。

case class MyAssessment(isSucceeded: Boolean, failureRate: Double)

val (reportAndMetricsPublishedSet, aggregatedMetrics) = testData
  .mapValues(compiler.compile)
  .publishAndAggregateByLevel(compiler.outLayers, compiler.metricsLayer)

implicit val assessmentSerializer = new JsonSerializer[MyAssessment](DefaultFormats)

val assessmentPublishedSet = aggregatedMetrics.assess[MyAssessment]() { metrics =>
  val failureRate = metrics.stats.failed.toDouble / metrics.stats.total.toDouble
  MyAssessment(failureRate <= 0.05, failureRate)
}

複数のSuiteCompilerが異なるTestDataタイプまたは異なるパーティションスキームにマップされている場合、1 つのSuiteCompilerごとに 1 つずつのシーケンスDeltaSet[Partition.Key, Metrics]が作成されます。 assess この場合も、一連の拡張ファイルで変換を使用できます。この場合 Metrics 、別の SuiteCompilerによって生成されたパーティションがさらに集約されます。

case class Assessment(isSucceeded: Boolean)
implicit val deltaContext: DeltaContext = ???
val roadTestData: DeltaSet[Partition.HereTile, RoadTestData] = ???
val adminTestData: DeltaSet[Partition.Generic, AdminTestData] = ???

import deltaContext.transformations._

import com.here.platform.data.processing.validation.DefaultJsonSerializers._
import com.here.platform.data.processing.validation.Transformations._

val roadTestCompiler = new SuiteCompiler(new RoadSuite)
val adminTestCompiler = new SuiteCompiler(new AdminSuite)

val (roadPublishedSet, roadMetrics) = roadTestData
  .mapValues(roadTestCompiler.compile)
  .publishAndAggregateByLevel(roadTestCompiler.outLayers, roadTestCompiler.metricsLayer)

val (adminPublishedSet, adminMetrics) = adminTestData
  .mapValues(adminTestCompiler.compile)
  .publishAndAggregateByHash(roadTestCompiler.outLayers,
                             roadTestCompiler.metricsLayer,
                             Seq(1000, 100, 10, 1))

implicit val assessmentSerializer = new JsonSerializer[Assessment](DefaultFormats)

val assessmentPublishedSet =
  Seq(roadMetrics, adminMetrics).assess[Assessment]()(metrics =>
    Assessment(metrics.stats.failed == 0))

2 つの異なる SuiteCompilerは、同じレポートおよびメトリックスレイヤーに公開できません。

テストレポートとメトリクスをレンダリングしています

デフォルトの JSON シリアライザを使用する場合は、 HERE タイルレポートおよびメトリックスのレイヤーを次のスキーマ HRNS で設定できます。

  • hrn:here:schema:::com.here.platform.data.processing.validation.schema:report_v2:1.0.0
  • hrn:here:schema:::com.here.platform.data.processing.validation.schema:metrics_v2:1.0.0

これらのスキーマには、テストレポートに保存されているジオメトリを描画するレンダリングプラグインや、ヒートマップとしてメトリクスをレンダリングするプラグインが含まれています。

テストレポートをレンダリングしました
テストレポートをレンダリングしました
テストメトリクスをレンダリングしました
テストメトリクスをレンダリングしました

」に一致する結果は 件です

    」に一致する結果はありません