DeltaSets を使用してインクリメンタル処理パイプラインを作成します

DeltaSets はデータ プロセッシング ライブラリの新機能で、 API は今後のバージョンで変更される可能 DeltaSets 性があります。 Scala でのみ使用でき、 Java プロジェクトでは使用できません。

DeltaSets は、データ プロセッシング ライブラリによって提供される新しい分散処理抽象化です。 Spark DDs と同様に、DeltaSets は、mapReduceおよびfilterByKeyなどの変換を使用してクラスタ内のデータを変換するための機能的なインターフェイスを提供します。 DDs との主な違いは、 DeltaSet 変換を必要に応じて増分計算できることです。

DeltaSets を使用 すると、カスタムコンパイルパターンを作成できます。つまり、特定のアプリケーションで必要な数のresolveFncompileInFn、、compileOutFnおよびの関数を含むコンパイラを作成できます。

デザイン

主な処理の抽象化はです DeltaSet[K, V]。ここで K 、はキーのタイプで V 、は値のタイプです。 DeltaSet は、 Spark クラスタに保存され、変換されるキーと値のペアのコレクションを表します。 キーは 1 つの値にのみ関連付けることができます。

  • Kcom.here.platform.data.processing.catalog.Partition.Keyパーティションカタログ内のプラットフォームを識別するキーであることがよくあります。 ただし、 KSerializable で 暗黙 的な順序 付けが定義されている任意のタイプを指定できます。 これらのタイプの例には、文字列、整数、およびタプルがあります。

  • V —多くの場合、プラットフォームカタログ内のデータ、または実際のデータがカタログに保存されているcom.here.platform.data.processing.blobstore.Payloadcom.here.platform.data.processing.catalog.Partition.Meta特定します。 ただし V 、整数または文字列を含めて、任意のタイプを指定できます。

たとえば、プラットフォームカタログレイヤーの内容を読み取る DeltaSet[Key, Meta]と、になります。

DeltaSet は常に不変ですが 、変換 を適用することで DeltaSet が変換されます。 たとえば、変換mapValues({x => x + 1})を使用して、すべての値が 1 ずつ増加する新しいDeltaSet[Key, Int]DeltaSet[Key, Int]を変換できます。

DeltaSet がDeltaSet[Key, Payload]に変換さ れ 、出力カタログに必要なペイロードが含まれるよう になると、 DeltaSet を公開できます。 その結果、が作成 PublishedSetされ、出力カタログの新しいバージョンとしてコミットできます。

変換 は常に遅延します。つまり、変換は出力カタログをコミットしたときにのみ実行されます。 つまり、 DeltaSet はプラットフォームカタログにコミットされるまで評価されません。

例 : レイヤーをコピーします

この例では、あるカタログから別のカタログにレイヤーをコピーするパイプラインを実装する方法を示します。

DeltaSets を使用する最も簡単な方法は、アプリケーションのMainオブジェクトでDeltaSimpleSetupを拡張することです。 パイプラインの標準設定ファイルおよびコマンド ラインオプションのサポートを追加するには、この例で はPipelineRunner特性も拡張 して ( 「ドライバの設定と実行」を参照 ) 、Mainオブジェクトに 次のスケルトンを指定します。

import com.here.platform.data.processing.catalog.{Catalog, Layer}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner

object Main extends PipelineRunner with DeltaSimpleSetup {

  val applicationVersion: String = "1.0"

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
    ???
  }
}

パイプラインの処理ロジックを定義するsetupSetsメソッドを実装するようにMainオブジェクトをDeltaSimpleSetup要求します。 DeltaSets を使用して定義された処理ロジックは、さまざまな方法で構造化できます。通常は、次の 4 つのフェーズで構成されます。

  1. KeyMeta 1 つ以上の入力カタログからとのペアを照会すると、になり DeltaSet[Key, Meta]ます。
  2. メタデータに対応するペイロードを取得すると、DeltaSet[Key, Payload]になります。
  3. ペイロードに保存されているデータを変換し、キーを書き換えてターゲットカタログとターゲットレイヤーを保存します。
  4. 変換されたペイロードを公開 PublishedSet すると、その結果が出力カタログにコミットされます。

この例では、ステップ 3 でペイロードを変換するのではなく、変更せずにペイロードをコピーします。

  1. クエリ : 1 つ以上の入力カタログからKey AND Metaのペアを照会する場合、 setupSetsメソッドは引数 A DeltaContextとしてを提供 します。この引数には、特に入力カタログへのアクセス権が付与されます。

    import com.here.platform.data.processing.catalog.Partition._
    val keyMetas: DeltaSet[Key, Meta] =
    context.queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
    
  2. 取得 : Payloadメタデータに対応するを取得するに Retriever は、コンテキストから DeltaSet 変換をインポートし、対応するカタログのオブジェクトのインスタンスを取得します。 変換が可能に keyMetas なります。

    import com.here.platform.data.processing.blobstore.Payload
    import context.transformations._ // This import enables transformations on DeltaSets
    val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
    val keyPayloads: DeltaSet[Key, Payload] =
    keyMetas.mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
    
  3. プロセス : キーを書き換えてターゲットカタログおよびターゲットレイヤーを保存するに mapKeys は、操作を使用します。

    val rewrittenKeys: DeltaSet[Key, Payload] =
    keyPayloads.mapKeys(
     OneToOne(
       _.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
       _.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
     ),
     PreservesPartitioning
    )
    
  4. 公開 : 変換されたペイロードをパブリッシュするために、DeltaSet[Key, Payload]は、パブリッシュ先のレイヤーのセットを引数として受け取るpublish操作を提供します。

    val result: PublishedSet = rewrittenKeys.publish(Set(Layer.Id("inLayerA")))
    Iterable(result)
    

完全な例を以下に示します。

import com.here.platform.data.processing.catalog.{Catalog, Layer, Partition}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner

object Main extends PipelineRunner with DeltaSimpleSetup {

  val applicationVersion: String = "1.0"

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
    import context.transformations._
    val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
    Iterable(
      context
        .queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
        .mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
        .mapKeys(
          OneToOne[Partition.Key, Partition.Key](
            _.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
            _.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
          ),
          PreservesPartitioning
        )
        .publish(Set(Layer.Id("outLayerA")))
    )
  }

}

変換

このセクションでは、 DeltaSets で現在使用可能な変換について説明します。

ペイロードを公開します

すべての DeltaSet は最終的に、出力カタログで公開するデータを含むペイロードのセットに変換する必要があります。 publish 操作は任意の DeltaSet タイプで利用 DeltaSet[Key, Payload] でき、この操作によってすべてのペイロードが Data API にアップロードされます。 publish 操作の結果は PublishedSet からのみ返さ setupSetsれますが、これ以上の変換はできません。 これについては、以下の例で説明します。

val payloads: DeltaSet[Key, Payload] = ???
val published: PublishedSet = payloads.publish(Set(Layer.Id("outLayer")))

publish 同じレイヤーに複数のレイヤーを追加することはできません。 出力レイヤーの一部を個別に公開する必要がある場合 publishPart は、代わりにを使用します。

拡張)複数パーツのパブリッシュ

出力レイヤー ( または出力レイヤーのセット ) のジョイント解除された部分をアップロードする場合、および各部分を個別に読み取る必要がある場合は、publishの代わりにpublishPartを使用します。 各出力キーは、を使用して、単一のパーツに確定的に割り当てる必要 PublishedPartMapperがあります。

次の例では、キー PartMapperByLayer をズーム レベルに基づいて別の公開パーツにマップするために、を使用しています。 下位のズーム レベルパーティションが最初に公開され、その後読み取りを行って上位のズーム レベルで集約パーティションを構築します。 unionPublishedParts この操作は、最終的にすべてのパーツを組み合わせて最終的なパーツにするために使用されます PublishedSet

val intermediate: DeltaSet[Key, Payload] = ???
val partMapper = PartMapperByLevel(Set(12, 11))

val firstPart: PublishedPart =
  intermediate.publishPart(Set(Layer.Id("multi-level-layer")),
                           partMapper,
                           partMapper.partForLevel(12))

val secondPart: PublishedPart = firstPart
  .readBack()
  .mapGroup({
    case (key, meta) => (key.copy(partition = key.partition.ancestors.head), (key, meta))
  }, context.defaultPartitioner)
  .mapValues { partitions: Iterable[(Key, Meta)] =>
    val payload: Payload = Aggregator.aggregate(partitions)
    payload
  }
  .publishPart(Set(Layer.Id("multi-level-layer")), partMapper, partMapper.partForLevel(11))

context.unionPublishedParts(Seq(firstPart, secondPart))

値を変換します

mapValues DeltaSet mapValuesWithKey 内の値を変換するには、およびを使用します。 これらの操作では DeltaSet のキーは変更されないため、実行時にクラスタ内のワーカーノード間でデータをシャッフルする必要はありません。 このため、これらの操作は非常に効率的です。

次の例では、 a の値 DeltaSet[Key, Int] がを使用して 1 つずつ増加します mapValues

val integers: DeltaSet[Key, Int] = ???
val incrementedIntegers: DeltaSet[Key, Int] = integers.mapValues(_ + 1)

mapValuesWithKey は類似した操作ですが、変換関数のキーも提供されます。 この操作の詳細については、レイヤーのコピーの例を参照してください。

キーと値を変換します

キーと値の両方を同時に変換する場合、またはキーと値のペアのキーをその値に基づいて変換する場合は、mapUniquemapGroup、またはmapReduceのいずれかの変換を使用します。 ただし、パフォーマンスに問題がある場合は、mapValuesmapValuesWithKeyまたはmapKeys*のいずれかの操作を使用することを検討してください。

mapUnique キーが重複していない限り、キーと値のペアを新しいキーと値のペアに変換します。 重複したキーが生成された場合、実行時に変換が失敗します。 クラスタ内のノード間でデータがシャッフルされるため、引数として明示的にパーティション分割者を指定する必要があります。

たとえば、次のスニペットでは、キーと値が 2 つのレイヤーに分割されます。

  • 正の値 : レイヤーをに設定 positive_valuesします。
  • 負の値 : レイヤーをに設定 negative_valuesします。
val deltaSet1: DeltaSet[Key, Int] = ???
val split: DeltaSet[Key, Int] =
  deltaSet1.mapUnique(
    mapFn = { (key, i) =>
      if (i >= 0) {
        (key.copy(layer = Layer.Id("positive_values")), i)
      } else {
        (key.copy(layer = Layer.Id("negative_values")), i)
      }
    },
    partitioning = context.defaultPartitioner
  )

mapGroup キーと値のペアを新しいキーと値のペアに変換します。 重複するキーの値がグループ化され、 DeltaSet によって各キーに値のコレクションが割り当てられます。

たとえば、次のスニペットで は、 HERE Tile パーティションに関連付けられているメタデータ HERE Tile にマッピングされ、グループ化されます。

val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, Iterable[Meta]] =
  deltaSet1.mapGroup(
    mapFn = {
      case (key, value) =>
        (key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
    },
    partitioning = context.defaultPartitioner
  )

mapReduce キーと値のペアを新しいバージョンに変換します。このバージョンでは、指定した Reduce 関数を使用して、重複するキーの値が削減されます。 この Reduce 関数は、 2 つの値を 1 つに結合する必要があります。 を使用 mapReduce すると、を使用し mapGroup て各値を減らしてから、を使用するよりも効率的 mapValuesです。

たとえば、次のスニペットで は、 HERE Tile パーティションに関連付けられているすべての整数 HERE Tile にマッピングされ、各出力 HERE Tile の値がその合計に削減されます。

val deltaSet1: DeltaSet[Key, Int] = ???
val deltaSet2: DeltaSet[Key, Int] =
  deltaSet1.mapReduce(
    mapFn = {
      case (key, value) =>
        (key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
    },
    reduceFn = _ + _,
    partitioning = context.defaultPartitioner
  )

これら mapUniqueの、 mapGroupmapReduce 、およびの操作は、キー値のマッピング関数を使用して、正確に 1 つのキーを生成します。 一方、flatMapUniqueflatMapGroup、およびflatMapReduceの操作では、 0 個以上のキーを生成するためのマッピング関数が使用されます。

キーを変換します

DeltaSets は、 DeltaSet 内のキーを値を考慮せずに変更するための一連の変換を提供します。 これらの変換は非常に効率的で、可能であればキーおよび値に基づいて変換よりも優先されます。

たとえば、 mapKeys は DeltaSet 内のキーのみを読み取りまたは書き込みなしで変換する効率的な操作です。 キーの変換は 1 対 1 である必要があります。つまり、入力 DeltaSet のすべてのキーが出力 DeltaSet の一意のキーにマップされます。 変換が 1 対 1 であることを確認し、効率的な増分処理を可能にするには、 mapKeys キーマッピング関数と mapFnその関数の逆関数の両方を指定する必要 inverseFnがあります。

val partitioner = NameHashPartitioner(10)
val input: DeltaSet[Key, Int] = ???
val incrementedIntegers: DeltaSet[Key, Int] =
  input.mapKeys(
    OneToOne(
      mapFn = key => key.copy(layer = Layer.Id("outLayerA")),
      inverseFn = key => key.copy(layer = Layer.Id("inLayerA"))
    ),
    partitioner
  )

この操作の使用方法の詳細については、レイヤーのコピーの例を参照してください。

DeltaSet にキーが含まれて xいる場合 inverseFn(mapFn(x)) != x、実行時に変換が失敗します。 によって作成されたキーでは逆関数を呼び出すことができる mapFnため、渡さ inverseFn れた任意のキーについて正しい結果を返す必要があります。可能なキーのサブセットにのみ定義されている場合は、その逆関数を部分関数として定義できます。

val partitioner = HashPartitioner[String](context.defaultParallelism)
val input: DeltaSet[Int, String] = ???
val stringKeyed: DeltaSet[String, String] =
  input.mapKeys(
    OneToOne[Int, String](
      mapFn = _.toString,
      inverseFn = {
        case s: String if s forall Character.isDigit => s.toInt
      }
    ),
    partitioner
  )

逆変換を指定するのが不便または不可能な場合 mapUnique は、以下で説明するより高価な変換を使用することを検討してください。

flatMapKeys 各入力キーを 0 個以上の出力キー (1 対多 ) にマップする変換です。 mapKeys 各出力キーが 1 つの入力キーの結果であることを示すために、反転関数と同様の関数を渡す必要があります。

複数の入力キーを同じ出力キー (1 対 1 または 1 対 n) にマッピングする場合、キーおよび値の変換と同様に、値のセットをグループ化または削減できます。 DeltaSets は、 4 つの変換を提供して、グループ化 / 削減と n-to-1/m-to-n: のすべての組み合わせをカバーし、コレクション内のすべての値をmapKeysGroupflatMapKeysGroupグループ化します。一方、mapKeysReduceflatMapKeysReduce Reduce 関数をすべての値に適用します。 flatMapKeysGroup使用方法の例について は、 DirectMToN コンパイラーの移行を参照してください。

データをフィルタリングします

filterByKey DeltaSet のキーと値のペアを、キーのみに基づいてフィルタリングします。 この変換は非常に効率的に実行され、 Spark クラスタ内のノード間でデータを交換する必要はありません。

次のスニペットでは 、hmcカタログからのadministrative_placesレイヤーが照会され、「 1469256839 」で始まる汎用パーティション名のすべてのパーティションキーがフィルタリングされます。 hmc これは、カタログ ID が HERE Map Content カタログを参照している場合に、オーストラリアのすべての管理場所を読み取ることに対応しています。

import com.here.platform.data.processing.catalog.Partition.Generic
val filteredInput: DeltaSet[Key, Meta] =
  context
    .queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("administrative_places"))
    .filterByKey {
      case Key(catalog, layer, Generic(name)) =>
        name.startsWith("1469256839")
      case _ => false
    }

filterByKey操作でパーティションキーフィルタを使用することもできます。 たとえば 、ベルリン周辺のバウンディング ボックスに属する HERE Tile を使用して、 HERE Tile 名としてパーティションが指定されているパーティションのみをフィルタリングするに は、次のfilterByKey操作を使用します。

val filteredInput: DeltaSet[Key, Meta] =
  context
    .queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
    .filterByKey(
      BoundingBoxFilter(
        south = 50.97656,
        west = 11.95313,
        north = 51.06445,
        east = 12.04102
      )
    )

パーティションキーフィルタ は、パスの設定ファイルから定義することも here.platform.data-processing.deltasets.partitionKeyFiltersできます。 設定ファイルで定義されているパーティションキーフィルタは、 query すべての変換およびに適用 readBackされます。

データに参加します

join 2 つの DeltaSet を取得して DeltaSet を作成する DeltaSet 変換です。 DeltaSet に含まれる各キーについて、各 DeltaSets のキーに関連付けられている値のペアが含まれます。

val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val pairs: DeltaSet[Key, (Int, String)] = integers join strings

DeltaSets によって提供されるその他の種類の結合変換は次のとおりです。

  • outerJoin 2 つの DeltaSet を取得し、 DeltaSet に含まれている各キーについて、各 DeltaSets のキーに関連付けられている値のペアを含む DeltaSet を作成します。 DeltaSets のいずれかでキーに値が関連付けられていない場合、ペアのエントリはに設定 Noneされます。
  • leftOuterJoin 2 つの DeltaSet を取得し、左側の DeltaSet に含まれている各キーについて、各 DeltaSets のキーに関連付けられている値のペアを含む DeltaSet を作成します。 キーが右側の DeltaSet の値に関連付けられていない場合、ペアのエントリはに設定 Noneされます。

次の例は、結合の一般的な使用例を示しています。 2 つのレイヤーroad-attributestopology-geometryがあり、カタログhmc(HERE Map Content)から照会されます。 両方の DeltaSets のキーが書き換えられ、出力カタログの同じカタログおよびレイヤーが含まれるようになります。 次に、 outerJoin が計算され、両方のレイヤーのメタデータを含む DeltaSet が作成されます。 このようにして、パーティションの内容を関連付けることができます。

val topology: DeltaSet[Key, Meta] =
  context
    .queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
    .mapKeys(
      OneToOne(
        _.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
        _.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("topology_geometry"))
      ),
      context.defaultPartitioner
    )
val roadAttributes: DeltaSet[Key, Meta] =
  context
    .queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
    .mapKeys(
      OneToOne(
        _.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
        _.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("road_attributes"))
      ),
      context.defaultPartitioner
    )
val pairs: DeltaSet[Key, (Option[Meta], Option[Meta])] =
  topology outerJoin roadAttributes

joinステートフル変換です が、outerJoinleftOuterJoinはステートフルではありません。このため、後の 2 つの変換がより効率的になります。

ユニオンデータ

disjointUnion 2 の入力 DeltaSet を受け取り、いずれかの入力 DeltaSets に含まれているすべてのキーと値のペアを含む出力 DeltaSet を生成する DeltaSet 変換です。 両方 の入力 DeltaSets にキーが含まれている場合、操作によって例外がスローされます。

次のスニペットでは、入力 DeltaSet をズーム レベル 10 および 12 のタイルを持つ 2 つの DeltaSet に分割します。 次に、各 DeltaSet が mapValues 文字列に変換さ combinedれ、結果の和が変数に保存されます。

val input: DeltaSet[Key, Meta] = ???
val tilesAt10: DeltaSet[Key, String] =
  input
    .filterByKey {
      case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 10
      case _                      => false
    }
    .mapValues(???)
val tilesAt12: DeltaSet[Key, String] =
  input
    .filterByKey {
      case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 12
      case _                      => false
    }
    .mapValues(???)
val combined: DeltaSet[Key, String] =
  tilesAt10 disjointUnion tilesAt12

上記のスニペットでは、両方の入力 DeltaSets のタイプが完全に同じです。 また、次のスニペットのように、さまざまな値タイプを持つこともできます。 この場合、出力 DeltaSet の値タイプは、入力の一般的な値タイプのスーパータイプです。 次のスニペットで DeltaSet[Key, Int] は、との和が DeltaSet[Key, String] として入力さ DeltaSet[Key, Any]れています。 これは、 Scala で AnyStringIntのスーパータイプであるために有効です。

val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val union: DeltaSet[Key, Any] = integers disjointUnion strings

このコンテキストでは、 2 つ以上 DeltaSetのののジョイント解除ユニオンを作成できます。

val integers1: DeltaSet[Key, Int] = ???
val integers2: DeltaSet[Key, Int] = ???
val integers3: DeltaSet[Key, Int] = ???
val union: DeltaSet[Key, Int] =
  context.disjointUnion(List(integers1, integers2, integers3))

参照を動的に解決します

mapValuesWithResolver キーとメタのペアの DeltaSet 、サブジェクト、および他のパーティション、リファレンスへの動的なアクセスを変換するために使用できます。 resolveReferences またはを使用して静的な参照解決を行う代わり RefTreeCompilerに、必要なすべての参照を事前に計算する必要があります。 一方、ダイナミックリファレンス分解能はより柔軟で、コード行数が少なく、特に複雑なリファレンス構造の場合は、静的リファレンス分解能よりも高速になります。

mapValuesWithResolver mapValuesWithKeyただし、各サブジェクトに適用されるマッピング関数には、サブジェクトのキーとメタデータ、 Resolverおよびサブジェクトが参照できる任意のキーのメタデータを決定するの 3 つの引数があります。 サブジェクトのメタデータおよびリファレンスを使用して、対応するペイロードを取得できます。

Resolver 、 1 つ以上のを使用 ResolutionStrategyして、キーに対応するメタデータを検索します。 このような戦略の 1 つにDirectQueryData API を介してメタデータを直接要求する方法があります。この方法は簡単ですが、解決されたメタデータごとに 1 つのネットワーククエリーが必要です。 次のセクションでは、大量のメタデータを一度にダウンロードすることで、より効率的に解決できる 3 つの解決方法について説明します。

次のスニペットでは、カタログinAのレイヤーAにあるパーティションからカタログinAのレイヤーBにあるパーティションへの参照を解決し ます。 レイヤー内の各パーティション A は、レイヤー内のパーティションの名前を参照 Bします。 通常、たとえば HERE Map Content を処理する場合、パーティションの名前は他のデータと一緒に保存されます。 ただし、この例では、レイヤーのパーティションに他の情報がないと仮定 Aしています。

まず、 DeltaSet A にレイヤーをクエリー subjects します。 次に mapValuesWithResolver 、マッピング関数と strategies パラメーターを渡して呼び出し DirectQueryます。このパラメーターはに設定されています。

マッピング機能内 :

  1. レイヤー内のパーティションを取得 A するには、を使用 retrieverします。
  2. パーティションの内容を文字列referenceNameに変換し、レイヤーB内のreferenceNameパーティションを参照するKeyオブジェクトを作成します。
  3. 参照に対応するメタデータを取得するには、リゾルバを使用します。
  4. 参照されているパーティションが存在しない場合は、例外がスローされます。
  5. それ以外の場合は、そのパーティションを取得します。
val retriever = context.inRetriever(Catalog.Id("inA"))
val subjects: DeltaSet[Key, Meta] =
  context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("A"))
subjects.mapValuesWithResolver(
  mapFn = {
    case (resolver, key, meta) =>
      // Retrieve and decode the payload. String construction is a placeholder for
      // decoding the partition and getting a reference.
      val referenceName = new String(retriever.getPayload(key, meta).content)
      // Construct a key for the partition with "referenceName" in layer B.
      val referenceKey = Key(Catalog.Id("inA"), Layer.Id("B"), Generic(referenceName))
      // Try to find the metadata for `referenceKey`.
      resolver.resolve(referenceKey) match {
        case None                => throw new Exception("Partition does not exist!")
        case Some(referenceMeta) =>
          // Retrieve the referenced partition.
          val referencePartition = retriever.getPayload(referenceKey, referenceMeta)
          ??? // TODO: Do something with the partition
      }
  },
  strategies = List(DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"))))
)

解決戦略

現在、次の 4 つの解決策があります。

  1. DirectQuery -- カタログと一連のレイヤーを指定すると 、この方法では Data API を介して各キーのメタデータを直接取得します。 リクエストの結果は、実行者ごとにキャッシュされます。 キャッシュのサイズは、のコンストラクタに引数を渡すことによって設定でき DirectQueryます。デフォルトは 10000 メタデータオブジェクト(約 3MB )です。

  2. Broadcast -- メタデータを含む DeltaSet を指定すると、この戦略はメタデータの完全なコピーを各 Spark 実行者に送信し、ネットワークリクエストを追加せずに各実行者でメタデータ全体を利用できるようにします。 内部的には、この方法では Spark ブロードキャスト変数が使用されます。 メタデータの量によっては、各 e キュトールに大量のメモリが必要になることがあります。 メタデータの保存に必要なメモリは、 DeltaSet のパーティションあたり約 300 バイトです。

  3. BackwardResolution --DeltaSet に参照が含まれており、各参照を一連のサブジェクトパーティションにマップする関数があるとし BackwardResolution ます。は、サブジェクトパーティションを処理するときに、これ以上のネットワーククエリーを実行せずに、これらの参照をリゾルバに公開します。 たとえば、次のスニペットでは、レイヤーの各パーティション A がレイヤーのすべての子とグループ化 Bされています。

    BackwardResolution(
    context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), { (key, meta) =>
     Set(key.copy(layer = Layer.Id("A"), partition = key.partition.parent.get))
    }
    )
    

    事前定義 BackwardResolution.toSamePartition された後方解決方式では、各サブジェクトパーティションが同じ名前の参照パーティションでグループ化されます。 同様 BackwardResolution.toNeighbors に、各サブジェクトタイルを、指定した深さのすべてのネイバータイルでグループ化します。

  4. ForwardResolution --DeltaSet に参照が含まれており、各サブジェクトを参照パーティションのセットにマップする関数があるとし ForwardResolution ます。は、サブジェクトパーティションの処理時に、これ以上のネットワーククエリーを実行せずに、これらの参照をリゾルバに公開します。 この方法はの逆です BackwardResolutionForwardResolution 主題の DeltaSet のキータイプと値タイプをタイプパラメータとして取得します。 次のスニペットは、すべてのネイバータイルで各サブジェクトタイルをグループ化する方法を示しています。

    ForwardResolution[Key, Meta](
    context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), {
     case (key @ Key(_, _, tile: HereTile), meta) =>
       tile
         .neighbors(1)
         .map(tile => key.copy(layer = Layer.Id("B"), partition = tile))
    }
    )
    

mapValuesWithResolver 解決策の一覧表を取得して、順番に組み合わせることができます。 たとえば、次のスニペットの解決方法の一覧表を検討します。 レイヤーのパーティションを処理 A し、レイヤー内のタイルへの参照を解決する場合 B、現在処理されているタイルの直接のネイバーはすべて、ネットワークリクエストなしで利用できます。 レイヤー内の他のタイルを解決 B するには ( 遠くにいるか、別のズーム レベルで ) 、ネットワークリクエストが実行されます。 レイヤーCへの参照を解決するには、常にネットワークリクエストを使用し、レイヤーD内の参照はブロードキャストを使用して解決します。

val strategies =
  List(
    BackwardResolution.toNeighbors(
      context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")),
      Catalog.Id("inA"),
      Layer.Id("A")
    ),
    DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"), Layer.Id("C"))),
    Broadcast(context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("D")))
  )

mapValuesWithResolverDeltaSet[Key, Meta]に適用できます。この場合、サブジェクトDeltaSetのパーティション作成者もパーティションリファレンスに使用されます。 キーまたは値の種類が異なるDeltaSetmapValuesWithResolverが適用されている場合は、参照のパーティションを作成する必要があります。 たとえば、このスニペットでは、が DeltaSet[HereTile, String] 変換されます。

val deltaSet: DeltaSet[HereTile, String] = ???

deltaSet.mapValuesWithResolver(
  (r, k, s) => ???,
  Seq(
    ForwardResolution(
      context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), {
        case (tile, string) =>
          tile
            .neighbors(1)
            .map(tile => Key(Catalog.Id("inA"), Layer.Id("B"), tile))
      }
    )),
  HashPartitioner(12)
)

参照を静的に解決します

resolveReferences RefTreeCompiler の最初のステップと同じ機能を実装する変換です。カタログパーティション間の参照関係を指定すると、変換によって DeltaSet 内の各パーティションがその参照でグループ化されます。 参照関係は次のものによって定義されます。

  • RefTree。パーティション間に存在する可能性のある参照の種類を指定します
  • パーティションによって参照されるパーティションキーの具体的なセットを計算する RESOLVE 関数については 、「 RefTreeCompiler」を参照してください。

次のスニペットでは、パーティション名が HERE Tile である各キーとメタのペアをdeltaSet1でグループ化するためにresolveReferences使用され、すべてのネイバーのすべてのキーとメタのペアでグループ化されます。

import com.here.platform.data.processing.compiler.reftree._

val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, (Meta, Map[Key, Meta])] =
  deltaSet1.resolveReferences(
    RefTree(
      Subject((Catalog.Id("inCatalogA"), Layer.Id("inLayerA")),
              Ref(RefTree.RefName("neighbor"),
                  (Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))))),
    resolveFn = {
      case (Key(catalog, layer, partition: HereTile), meta) =>
        val neighbor = partition.neighbors(radius = 1) - partition
        Map(RefTree.RefName("neighbor") -> neighbor.map(neighborTile =>
          Key(catalog, layer, neighborTile)))
      case _ => Map.empty
    }
  )

メモ :resolveReferences での参照解決手順とは RefTreeCompiler 異なる動作が 1 つの詳細に示されます。 パーティションキーフィルタ が設定ファイルで定義されている場合、これらのフィルタは参照とサブジェクトの両方に適用されます。 で RefTreeCompilerは、サブジェクトのみがフィルタリングされます。

公開されたデータを読み取ります

PublishedSetの場合、readBackは適用できる唯一の変換です。 PublishedSetレイヤー にパーティションを公開した結果として作成されたが、公開後にそのレイヤーに含まれているすべてのパーティションのキーとメタデータを含む DeltaSet に変換されます。 この方法では、以前の処理ステップで公開されたパーティションを読み取り戻して、次の手順で使用できます。

以下のコード スニペットでは、出力カタログレイヤーに公開されたintermediate結果をreadBack使用して読み取ります。 readBack 次に、を disjointUnion 入力カタログ hmc ( HERE Map Content )のレイヤーと組み合わせて、これらのレイヤー間の参照を解決します。

val intermediate: DeltaSet[Key, Payload] = ???
val intermediatePublished: PublishedSet = intermediate.publish(Set(Layer.Id("intermediate")))
val intermediateAndTopology: DeltaSet[Key, Meta] =
  intermediatePublished.readBack() disjointUnion
    context.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
val references: DeltaSet[Key, (Meta, Map[Key, Meta])] =
  intermediateAndTopology.resolveReferences(
    RefTree(
      Subject((Default.OutCatalogId, Layer.Id("intermediate")),
              Ref(RefTree.RefName("intermediate_to_topology"),
                  (Catalog.Id("hmc"), Layer.Id("topology_geometry"))))),
    resolveFn = ???
  )

DDs を DeltaSets に変換します

toDeltaSet Spark RDD を DeltaSet に変換するために使用できる操作です。 たとえば、 Spark を使用して他のソースからデータを取り込み、 DeltaSets を使用する処理パイプラインに統合する場合に使用できます。 RDD にはキーと値のペアが含まれている必要があります。また、同じキーを持つ複数のペアを含めることはできません。 RDD のパーティションを再作成するパーティション分割者を渡す必要 toDeltaSetがあります。 RDD がすでに特定のパーティション分割者とパーティション分割されていない限り、 RDD の再分割によってシャッフルが発生します。

作成された DeltaSet には、パイプラインの最後の実行以降の変更に関する情報が含まれていません。パイプラインの増分実行中であっても、 Downstream DeltaSet 変換は DeltaSet 内のすべてのデータを処理します。

次のスニペットでは、 Spark DDs 経由で csv ファイルを DeltaSet にインジェストする方法を示します。 DeltaContextを介してアクセスできるSparkContextを使用して、 csv ファイルairports.csvを読み取ります。 次に、 RDD をキー値のフォームに変換します。このフォームでは、 csv ファイルの最初の列がキーとして機能します。 最後 toDeltaSet に、 RDD を DeltaSet に変換するために使用します。

val sc = context.driverContext.spark
val rowsByFirstColumn: RDD[(Key, Array[String])] =
  sc.textFile("airports.csv").map { x =>
    val columns = x.split(",").map(_.trim)
    (Key('outCatalog, 'outLayer, Generic(columns(0))), columns.drop(1))
  }
val deltaSet: DeltaSet[Key, Array[String]] =
  rowsByFirstColumn.toDeltaSet(context.defaultPartitioner)

Spark のパーティション分割とシャッフル

DeltaSet 内のデータは、常に特定 のパーティション作成者に従ってパーティション化されます。このパーティション分割者は、 DeltaSet 内の各キーを Spark パーティションに割り当て、その結果、データが存在するクラスタ内のノードに割り当てます。 DeltaSet 変換中は、変換で明示的に新しいパーティション作成者が指定されていない限り、常にパーティション分割者が保持されます。 特に repartition 、この変換では、パーティション分割者の変更と、新しいパーティション分割者に従ってデータの再分割のみが行われます。

DeltaSet 内のキーを変換する可能性のあるすべての変換、またはパーティション作成者を変更するすべての変換では、データのパーティション分割を再実行する必要があります。したがって、データのシャッフルと呼ばれるクラスタ内のノード間でデータを移動できます。 シャフリングは高価な操作であるため、回避する必要があります。 データ のシャッフルに必要な DeltaSet 変換については、変換プロパティの表を参照してください。

パーティション分割の戦略

データをシャッフルする各変換では、結果のキーのパーティション分割方法を明示的に指定する必要があります。 このパーティション分割方式は、パーティション分割方式または PreservesPartitioning 特別な値のいずれかになります。

パーティャーを使用する場合、変換ではそのパーティション分割者を使用して結果を再分割します。 変換のパフォーマンス要件がない場合は、DeltaContextの [defaultPartitioner] フィールドを使用できます。

DeltaSet 内のキーが変換によって変更されても、各キーが同じ Spark パーティション内に残っている場合 PreservesPartitioningは、を使用できます。 この戦略によって、データのシャッフルによる変換が完全に回避され、パフォーマンスが大幅に改善されます。 変換でパーティション分割を保持できない場合は、実行時に例外がスローされます。

レイヤーのコピーの例では、 DeltaSet 内のキーのカタログとレイヤーを変更するためにmapKeys使用します。 カタログとレイヤーに関係なく、同じ Spark パーティション内の同じ名前のすべてのカタログパーティションをグループ化する、defaultPartitionerPartitionNamePartitionerとパーティション分割されます。 その結果、すべてのカタログパーティションが同じ Spark パーティション内に残り、使用でき PreservesPartitioningます。

PreservesPartitioning アップストリームとダウンストリームのキータイプ DeltaSetが同じではない場合でも、パーティション分割方式を使用でき DeltaSet ます。ただし、アップストリームがダウンストリームキータイプも処理できるように一般的なパーティション分割者によって分割されている必要があります。 この高度な機能を使用すると、 Partition.Key たとえば、キーのある DeltaSet を Partition.Name 、 Spark shuffle を作成せずにキーのある DeltaSet にマップできます。

// Query data from a catalog and change the key type from Partition.Key to Partition.Name
val catalog: Catalog.Id = ???
val layer: Layer.Id = ???
context
  .queryCatalogLayer(catalog, layer)
  .mapKeys(
    OneToOne[Key, Name](_.partition, Key(catalog, layer, _)),
    PreservesPartitioning
  )

HERE を PreservesPartitioning 使用できます。これは、で使用されるデフォルトのパーティャー queryCatalogLayerPartitionNamePartitioner 、アップストリームキータイプ、 Partition.Key、およびダウンストリームキータイプの両方をサポートするであるため Partition.Nameです。

データの遅延と永続性

DeltaSet が 2 つ以上の変換で使用される場合は、結果が 2 回以上再計算されないように、結果がメモリまたは Spark ワーカーのディスクに保持される必要があります。 これについて は、 RDD Persistence ポリシーの RDD についてより詳細に説明 し、 DeltaSets に均等に適用されます。 次 persist の例のように、 DeltaSet を再利用できるように永続化するには、変換を使用します。

import org.apache.spark.storage.StorageLevel

val deltaSet1: DeltaSet[Key, Int] = ???
val doubled = deltaSet1
  .mapValues(_ * 2)
  .persist(StorageLevel.MEMORY_AND_DISK_2)
val reuse1 =
  doubled
    .mapValues(x => Payload(BigInt(x).toByteArray))
    .publish(Set(Layer.Id("outLayer1")))
val reuse2 =
  doubled
    .mapValues(x => Payload(BigInt(x / 2).toByteArray))
    .publish(Set(Layer.Id("outLayer2")))

パフォーマンスのプロパティ

さまざまな変換の内部実装の時間と空間のコストは異なり、外部から直接見ることはできません。 特に、特定の変換を他の変換よりも高価にするプロパティが 2 つあります。つまり、シャフリングステートフル変換です。

データをシャッフルする変換によって、クラスタ内のノード間でデータが移動されます。この処理では、帯域幅が使用され、計算の速度が低下します。

ステートフルである変換で は、差分計算を可能にするために、補助情報を出力カタログのstateレイヤーに保存する必要があります。 つまり、出力カタログで余分なストレージが消費され、状態の計算には余分な時間が必要になります。また、計算が終了するまで状態を維持するには、クラスタのノードで追加の RAM が必要になります。

ほとんどのパイプラインでは、ステートフルやシャフリングの操作を使用することは避けられませんが、次の表を参照すると、可能な限りステートフルやシャフリングの変換を回避できます。

操作 シャッフル ? ステートフル?
detectChanges いいえ はい
disjointUnion いいえ いいえ
filterByKey いいえ いいえ
flatMapGroup はい1 はい
flatMapKeys はい1 いいえ
flatMapKeysGroup はい1 いいえ
flatMapKeysReduce はい1 いいえ
flatMapReduce はい1 はい
flatMapUnique はい1 はい
join いいえ はい
leftOuterJoin いいえ いいえ
mapGroup はい1 はい
mapKeys はい1 いいえ
mapKeysGroup はい1 いいえ
mapKeysReduce はい1 いいえ
mapReduce はい1 はい
mapUnique はい1 はい
mapValues いいえ いいえ
mapValuesWithKey いいえ いいえ
mapValuesWithResolver はい はい
outerJoin いいえ いいえ
persist いいえ いいえ
publish いいえ いいえ
publishPart いいえ いいえ
readBack いいえ いいえ
repartition はい いいえ
resolveReferences はい はい
toDeltaSet はい いいえ
1PreservesPartitioning パーティション分割方式としてが使用されている場合、シャフリングを回避できます。 素晴らしいホテルです

ステートフル 変換によって生じるオーバーヘッドを避けるため、可能な限り避けることをお勧めします。 たとえば、ステートフル join の代わりにステートレス outerJoin を使用したり、ステートフル mapGroup を使用する代わりにステートレス mapKeysGroup および mapValues でキーおよび値を別々に変換したりできます。

forceStateless設定 オプションを使用すると、すべての変換をステートレスにすることができます。 これにより、状態によって生じるオーバーヘッドが回避されますが、増分実行中にパーティション間の依存関係を変換で追跡することはできません。 実際には、DeltaSet は増分実行で、アップストリーム DeltaSet がまったく変更されなかったと判断できない限り、変更されたかどうかにかかわらず、すべてのアップストリームパーティションを処理する必要があります。 後者の場合、の値に関係 forceStatelessなく、処理を行う必要はありません。 特に例外は detectChanges で、forceStateless が設定されている場合でも、アップストリーム DeltaSet の変更された部分のみを処理できます。 detectChanges ただし、状態を持たない変更済みパーティションのセットを削減することはできませ forceStatelessん。また、を設定することで効果的に無効になります。

比較と移行

このセクションでは、 DeltaSets をデータ プロセッシング ライブラリで分散計算を表現する他の方法と比較し、より柔軟性の低いインターフェイスから DeltaSets に移行する方法を提供します。

ファンクショナルパターン

ファンクショナルパターンの以前のユーザーは、 DeltaSets を カスタム コンパイルパターンを作成する方法として理解でき ます。つまり、特定のアプリケーションで必要な数のresolveFncompileInFn、、およびcompileOutFnの関数を含むコンパイラを使用できます。 ただし、 DeltaSets には、計算を構造化する方法が多数用意されています。たとえば、 1 つのcompileInFn結果を複数のcompileOutFnで再利用したり、複数のcompileInFnの結果を結合したり、中間結果を出力カタログにパブリッシュしたりできます。

MapGroupCompiler を DeltaSets に移行します

DeltaSet 変換で表される MapGroupCompiler は、次のものに対応します。

  • flatMapGroupを使用して結果をinLayersグループ化し、すべてのキーとメタのペアにcompileIn関数を適用します
  • compileOut 機能を使用して、中間データのグループをペイロードに変換します
  • 結果を公開しています

以下に完全な例を示し TODO ます。タグは、中間データ、入力レイヤー、および MapGroupCompiler を定義する場所を示します。

import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, MapGroupCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._

object MapGroupMain extends DeltaSimpleSetup {

  case class IntermediateData() // TODO: Define the intermediate data of the compiler

  def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
    ??? // TODO: Define the input layers of the compiler

  def constructMapGroupCompiler(retrievers: Map[Catalog.Id, Retriever])
    : MapGroupCompiler[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
    ??? // TODO: Construct the compiler

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {

    import context.transformations._

    val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
    val compiler = constructMapGroupCompiler(retrievers)

    // If the compiler defined a particular partitioner, use it. Otherwise, use same partitioner
    // used for partitioning queried metadata (context.defaultPartitioner), a reasonable default.
    val partitioner =
      compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)

    val result =
      context
        .queryCatalogs(
          inLayers
        )
        .flatMapGroup(
          Function.untupled(compiler.compileInFn),
          partitioner
        )
        .mapValuesWithKey(
          compiler.compileOutFn
        )
        .publish(
          compiler.outLayers
        )

    Iterable(result)
  }
}

RefTreeCompiler を DeltaSets に移行します

DeltaSet 変換で表される RefTreeCompiler は、次のものに対応します。

  • を使用して、サブジェクトパーティションのセットとその参照をグループ化します resolveReferences
  • compileIn すべてのサブジェクトと参照のペアに関数を適用し、を使用して結果をグループ化します flatMapGroup
  • compileOut 機能を使用して、中間データのグループをペイロードに変換します
  • 結果を公開しています

以下に完全な例を示し TODO ます。タグは、中間データ、入力レイヤー、および RefTreeCompiler を定義する場所を示します。

import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.reftree.CompileInFnWithRefs
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, RefTreeCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._

object RefTreeMain extends DeltaSimpleSetup {

  case class IntermediateData() // TODO: Define the intermediate data of the compiler

  def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
    ??? // TODO: Define the input layers of the compiler

  def constructTestRefTreeCompiler(
      retrievers: Map[Catalog.Id, Retriever]): RefTreeCompiler[IntermediateData]
    with CompileInFnWithRefs[IntermediateData]
    with CompileOut1To1Fn[IntermediateData] =
    ??? // TODO: Construct the compiler

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {

    import context.transformations._

    val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
    val compiler = constructTestRefTreeCompiler(retrievers)

    // If the compiler defined a particular partitioner, use it. Otherwise, use same partitioner
    // used for partitioning queried metadata (context.defaultPartitioner), a reasonable default.
    val partitioner =
      compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)

    Iterable(
      context
        .queryCatalogs(
          compiler.inLayers
        )
        .resolveReferences(
          compiler.refStructure,
          Function.untupled(compiler.resolveFn)
        )
        .flatMapGroup(
          { case (k, (v, refs)) => compiler.compileInFn((k, v), refs) },
          partitioner
        )
        .mapValuesWithKey(
          compiler.compileOutFn
        )
        .publish(
          compiler.outLayers
        )
    )
  }
}

ダイレクト 1:N および M:N コンパイラーを DeltaSets に移行します

DeltaSet 変換で表現されたダイレクト M:N コンパイラーは、次のものに対応します。

  • compileIn およびを使用して、各入力キーと値のペアの中間データを計算 mapValuesWithKeyします。
  • mappingFn およびを使用 flatMapKeysGroupして、すべての入力キーと値のペアを対応する出力キーにマッピングします。 HERE では、の逆も mappingFn 指定する必要があります。
  • compileOut 機能を使用して、中間データのグループをペイロードに変換します
  • 結果を公開しています

以下に完全な例を示します。 TODO タグは、中間データ、入力レイヤー、の逆および mappingFn を定義する場所を示し DirectMToNCompilerます。

import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.direct.CompileInFn
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, DirectMToNCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._

object DirectMToNMain extends DeltaSimpleSetup {

  case class IntermediateData() // TODO: Define the intermediate data of the compiler

  def inverseMappingFn: Partition.Key => Iterable[Partition.Key] = ???

  def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
    ??? // TODO: Define the input layers of the compiler

  def constructTestDirectMToNCompiler(retrievers: Map[Catalog.Id, Retriever]): DirectMToNCompiler[
    IntermediateData] with CompileInFn[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
    ??? // TODO: Construct the compiler

  def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {

    import context.transformations._

    val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
    val compiler = constructTestDirectMToNCompiler(retrievers)

    // If the compiler defined a particular partitioner, use it. Otherwise, use same partitioner
    // used for partitioning queried metadata (context.defaultPartitioner), a reasonable default.
    val partitioner =
      compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)

    Iterable(
      context
        .queryCatalogs(
          compiler.inLayers
        )
        .mapValuesWithKey {
          case (k, v) =>
            compiler.compileInFn((k, v))
        }
        .flatMapKeysGroup(
          ManyToMany(
            compiler.mappingFn,
            inverseMappingFn
          ),
          partitioner
        )
        .mapValuesWithKey(
          compiler.compileOutFn
        )
        .publish(
          compiler.outLayers
        )
    )
  }
}

Direct1toNCompiler 、類似した一連の変換を flatMapKeysGroup 使用してで置き換えることができ flatMapKeysます。

Spark RDD ベースのパターン

RDD と同様 に、 DeltaSet は、マシンのクラスタに分散されたデータを表し、filterKeysまたはmapValuesなどの一連の機能的な操作を使用して変換できます。 実際、 DeltaSet は内部的に RDD を使用してそのデータを表します。 ただし、 DeltaSets の使用と DDs の直接の使用には、主に次の 3 つの違いがあります。

  1. キー値: RDD とは異なり、 DeltaSet にはキーと値のペアのみが含まれ、重複するキーは含まれません。 map そのため、たとえば DeltaSets には単純な操作はありません。重複するキーが作成される可能性があります。

  2. パーティション分割が強い: さらに、 DeltaSet は、常に特定のパーティション作成者に従ってパーティション分割されます。このパーティションでは、各キーと値のペアに 1 つの Spark パーティションが割り当てられ、次にその がクラスタ内の特定のマシンに保存されます。 RDD一方、は、データのパーティション分割方法について特定のパーティション分割者が定義されていなくても、データを保存できます。

  3. 増分: DeltaSet で表現されたすべての計算は 、 DepCompiler および IncrementalDepCompiler で行われた依存関係を手動で追跡することなく、増分的に実行できます。

複数のコンパイラーによるタスク

複数のコンパイラーを使用するタスクでは、コンパイラーの結果をカタログレイヤーにアップロードし、そのカタログレイヤーの内容を別のコンパイラーにダウンロードすることで、複数のコンパイラーの効果をチェーン化できます。 DeltaSets では、複数のコンパイラの効果をチェーン化することもできますが、中間結果を複数のコンパイラタスクで行った場合と同じように、出力カタログからパブリッシュして読み取るかどうかを決定できます。 DeltaSets を使用して複数のコンパイラータスクをエミュレートする方法の詳細について readBack は、変換を参照してください。

ID と設定

DeltaSet 変換の動作を設定ファイルから変更すると便利な場合があります。たとえば、アプリケーションを再コンパイルせずに変換のパフォーマンスパラメータを調整できます。 DeltaSet の各変換には一意の ID が割り当てられ、設定ファイル内のすべての変換を識別するために使用されます。 設定ファイルから変換を設定するに withId は、関数を呼び出して ID を割り当てます。この関数は、直前の変換の ID を設定します。

val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1.mapValues(_ * 2).withId("doublingMap")

この方法で ID を割り当てなかった変換には、ソースコードでの表示順に基づいて自動的に ID が割り当てられます。 ID を確認する id には、変換の結果として作成された DeltaSet のメソッドを呼び出します。

ID は DeltaSet、だけでなく PublishedSet、にも割り当てられます 両方とも、共通のスーパークラスのクラスを識別および設定する機能を共有 BaseSetします。

パイプライン内でモジュール性を確立する場合 BaseSetは、のグループを共通の名前空間にラップすると便利です。 ID はこの名前空間内で一意である必要があり、名前空間は Spark UI のログメッセージおよび名前付き DDs に含まれています。 BaseSet.Namespace.enter 新しい名前空間を入力する場合に使用します。 名前空間はネストできます。

val deltaSet1: DeltaSet[Key, Int] = ???
BaseSet.Namespace.enter("routingModule") {
  val doubled: DeltaSet[Key, Int] = deltaSet1
    .mapValues(_ * 2)
    .withId("doublingMap")
}

DeltaSets は ' パイプラインapplication.confにセクションを追加することによって構成でき ます詳細については 'Configuring the ライブラリを参照してください すべて の変換および各変換に適用されるデフォルト値の両方を個別に設定できます。 以下のスニペットは、デフォルトの設定を示し here.platform.data-processing.deltasets.default、で定義されている各オプションについて説明しています。 デフォルトの設定を変更するに application.conf は、このスニペットをにコピーし、必要に応じて値を変更します。

// Configures the default settings for DeltaSet operations.
here.platform.data-processing.deltasets.default {

  // Where to store intermediate results that must be persisted. See
  // https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose
  // Applies to: flatMapReduce, flatMapGroup, mapReduce, mapGroup, mapValues, mapValuesWithKey,
  //             resolve, detectChanges, join.
  intermediateStorageLevel = "MEMORY_AND_DISK"

  // Defines how much extra processing should be done to detect invalid use of DeltaSets and
  // improve debugging.
  //
  // Possible values:
  //
  //   PERFORMANCE: Disables certain potentially expensive checks that help detecting incorrect
  //     uses of DeltaSets as early as possible. When this setting is used it can be harder to find
  //     root causes of a problem. In particular:
  //     - disjointUnion does not validate that the union does not create duplicate keys.
  //     - toDeltaSet does not validate that the given RDD does not contain duplicate keys.
  //
  //   SAFETY: Enable validation checks. Default.
  //
  //   DEBUG: Like "SAFETY", but also enable processing to improve debug output. In particular:
  //     - Logical errors such as IllegalArgumentException, are not thrown immediately during a
  //       stage that invokes user-defined functions. Instead, they are collected in a container in
  //       the driver, and thrown only once.
  //       Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
  //                   mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
  validationLevel = "SAFETY"

  // Number of threads to use for running user-provided functions, for example mapFn and resolveFn.
  // Increasing this number is useful when you have blocking I/O happening inside the function.
  // Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
  //             mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
  threads = 1

  // Processes the keys within a partition in sorted order. Sorting ensures that keys with
  // similar names are processed together. This can improve the performance if the map function
  // caches resources, such as downloaded payloads, and the same cache entries are likely to be
  // requested while processing keys that are nearby in the sorting order.
  // Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
  //             mapGroup/flatMapGroup and mapValues/mapValuesWithKey.
  sorting = false

  // Can be used to disable incremental computation of a DeltaSet. This will cause upstream
  // and downstream DeltaSets to also be computed non-incrementally.
  // Applies to: all transformations except publish
  incremental = true

  // Can be used to disable stateful computation of a DeltaSet. No state will be generated
  // for the DeltaSet, and the DeltaSet cannot track changes during incremental runs, that is,
  // it will either process all or none of the input partitions.
  // Applies to: all stateful transformation
  forceStateless = false

}

設定のhere.platform.data-processing.deltasets.idパスにある ID idを使用して変換を設定できます。 たとえば、次のスニペットは doublingMap 、変換で使用されるスレッド数を設定します。

here.platform.data-processing.deltasets.doublingMap {
  threads = 3
}

設定ファイルは、変換の名前空間に準拠しています。 たとえば、 here.platform.data-processing.deltasets.namespace1 を使用 namespace1して、名前空間で作成されたすべての変換を設定できます。 ID id1を持つ変換がnamespace1内部で作成された場合、 here.platform.data-processing.deltasets.defaulthere.platform.data-processing.deltasets.namespace1およびhere.platform.data-processing.deltasets.namespace1.id1で定義されている設定をこの順序で適用することで、その設定が作成されます。

設定ファイルから読み取られた設定は、アプリケーションコードでプログラムによって上書きすることもできます。 プログラムによる上書きは、設定ファイルで定義されているすべての設定よりも優先されます。

val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1
  .mapValues(_ * 2)
  .withConfigOverride(
    c => c.withIncremental(false)
  )

」に一致する結果は 件です

    」に一致する結果はありません