DeltaSets を使用してインクリメンタル処理パイプラインを作成します
注
DeltaSets
はデータ プロセッシング ライブラリの新機能で、 API は今後のバージョンで変更される可能 DeltaSets
性があります。 Scala でのみ使用でき、 Java プロジェクトでは使用できません。
DeltaSets は、データ プロセッシング ライブラリによって提供される新しい分散処理抽象化です。 Spark DDs と同様に、DeltaSets は、mapReduce
およびfilterByKey
などの変換を使用してクラスタ内のデータを変換するための機能的なインターフェイスを提供します。 DDs との主な違いは、 DeltaSet 変換を必要に応じて増分計算できることです。
DeltaSets を使用 すると、カスタムコンパイルパターンを作成できます。つまり、特定のアプリケーションで必要な数のresolveFn
、compileInFn
、、compileOutFn
およびの関数を含むコンパイラを作成できます。
デザイン
主な処理の抽象化はです DeltaSet[K, V]
。ここで K
、はキーのタイプで V
、は値のタイプです。 DeltaSet は、 Spark クラスタに保存され、変換されるキーと値のペアのコレクションを表します。 キーは 1 つの値にのみ関連付けることができます。
-
K
— com.here.platform.data.processing.catalog.Partition.Key
パーティションカタログ内のプラットフォームを識別するキーであることがよくあります。 ただし、 K
Serializable で 暗黙 的な順序 付けが定義されている任意のタイプを指定できます。 これらのタイプの例には、文字列、整数、およびタプルがあります。
-
V
—多くの場合、プラットフォームカタログ内のデータ、または実際のデータがカタログに保存されているcom.here.platform.data.processing.blobstore.Payload
をcom.here.platform.data.processing.catalog.Partition.Meta
特定します。 ただし V
、整数または文字列を含めて、任意のタイプを指定できます。
たとえば、プラットフォームカタログレイヤーの内容を読み取る DeltaSet[Key, Meta]
と、になります。
DeltaSet は常に不変ですが 、変換 を適用することで DeltaSet が変換されます。 たとえば、変換mapValues({x => x + 1})
を使用して、すべての値が 1 ずつ増加する新しいDeltaSet[Key, Int]
にDeltaSet[Key, Int]
を変換できます。
DeltaSet がDeltaSet[Key, Payload]
に変換さ れ 、出力カタログに必要なペイロードが含まれるよう になると、 DeltaSet を公開できます。 その結果、が作成 PublishedSet
され、出力カタログの新しいバージョンとしてコミットできます。
変換 は常に遅延します。つまり、変換は出力カタログをコミットしたときにのみ実行されます。 つまり、 DeltaSet はプラットフォームカタログにコミットされるまで評価されません。
例 : レイヤーをコピーします
この例では、あるカタログから別のカタログにレイヤーをコピーするパイプラインを実装する方法を示します。
DeltaSets を使用する最も簡単な方法は、アプリケーションのMain
オブジェクトでDeltaSimpleSetup
を拡張することです。 パイプラインの標準設定ファイルおよびコマンド ラインオプションのサポートを追加するには、この例で はPipelineRunner
特性も拡張 して ( 「ドライバの設定と実行」を参照 ) 、Main
オブジェクトに 次のスケルトンを指定します。
import com.here.platform.data.processing.catalog.{Catalog, Layer}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner
object Main extends PipelineRunner with DeltaSimpleSetup {
val applicationVersion: String = "1.0"
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
???
}
}
パイプラインの処理ロジックを定義するsetupSets
メソッドを実装するようにMain
オブジェクトをDeltaSimpleSetup
要求します。 DeltaSets を使用して定義された処理ロジックは、さまざまな方法で構造化できます。通常は、次の 4 つのフェーズで構成されます。
-
Key
Meta
1 つ以上の入力カタログからとのペアを照会すると、になり DeltaSet[Key, Meta]
ます。 - メタデータに対応するペイロードを取得すると、
DeltaSet[Key, Payload]
になります。 - ペイロードに保存されているデータを変換し、キーを書き換えてターゲットカタログとターゲットレイヤーを保存します。
- 変換されたペイロードを公開
PublishedSet
すると、その結果が出力カタログにコミットされます。
この例では、ステップ 3 でペイロードを変換するのではなく、変更せずにペイロードをコピーします。
-
クエリ : 1 つ以上の入力カタログからKey
AND Meta
のペアを照会する場合、 setupSets
メソッドは引数 A DeltaContext
としてを提供 します。この引数には、特に入力カタログへのアクセス権が付与されます。
import com.here.platform.data.processing.catalog.Partition._
val keyMetas: DeltaSet[Key, Meta] =
context.queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
-
取得 : Payload
メタデータに対応するを取得するに Retriever
は、コンテキストから DeltaSet 変換をインポートし、対応するカタログのオブジェクトのインスタンスを取得します。 変換が可能に keyMetas
なります。
import com.here.platform.data.processing.blobstore.Payload
import context.transformations._
val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
val keyPayloads: DeltaSet[Key, Payload] =
keyMetas.mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
-
プロセス : キーを書き換えてターゲットカタログおよびターゲットレイヤーを保存するに mapKeys
は、操作を使用します。
val rewrittenKeys: DeltaSet[Key, Payload] =
keyPayloads.mapKeys(
OneToOne(
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
),
PreservesPartitioning
)
-
公開 : 変換されたペイロードをパブリッシュするために、DeltaSet[Key, Payload]
は、パブリッシュ先のレイヤーのセットを引数として受け取るpublish
操作を提供します。
val result: PublishedSet = rewrittenKeys.publish(Set(Layer.Id("inLayerA")))
Iterable(result)
完全な例を以下に示します。
import com.here.platform.data.processing.catalog.{Catalog, Layer, Partition}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner
object Main extends PipelineRunner with DeltaSimpleSetup {
val applicationVersion: String = "1.0"
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
Iterable(
context
.queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
.mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
.mapKeys(
OneToOne[Partition.Key, Partition.Key](
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
),
PreservesPartitioning
)
.publish(Set(Layer.Id("outLayerA")))
)
}
}
このセクションでは、 DeltaSets で現在使用可能な変換について説明します。
ペイロードを公開します
すべての DeltaSet は最終的に、出力カタログで公開するデータを含むペイロードのセットに変換する必要があります。 publish
操作は任意の DeltaSet タイプで利用 DeltaSet[Key, Payload]
でき、この操作によってすべてのペイロードが Data API にアップロードされます。 publish
操作の結果は PublishedSet
からのみ返さ setupSets
れますが、これ以上の変換はできません。 これについては、以下の例で説明します。
val payloads: DeltaSet[Key, Payload] = ???
val published: PublishedSet = payloads.publish(Set(Layer.Id("outLayer")))
publish
同じレイヤーに複数のレイヤーを追加することはできません。 出力レイヤーの一部を個別に公開する必要がある場合 publishPart
は、代わりにを使用します。
(拡張)複数パーツのパブリッシュ
出力レイヤー ( または出力レイヤーのセット ) のジョイント解除された部分をアップロードする場合、および各部分を個別に読み取る必要がある場合は、publish
の代わりにpublishPart
を使用します。 各出力キーは、を使用して、単一のパーツに確定的に割り当てる必要 PublishedPartMapper
があります。
次の例では、キー PartMapperByLayer
をズーム レベルに基づいて別の公開パーツにマップするために、を使用しています。 下位のズーム レベルパーティションが最初に公開され、その後読み取りを行って上位のズーム レベルで集約パーティションを構築します。 unionPublishedParts
この操作は、最終的にすべてのパーツを組み合わせて最終的なパーツにするために使用されます PublishedSet
。
val intermediate: DeltaSet[Key, Payload] = ???
val partMapper = PartMapperByLevel(Set(12, 11))
val firstPart: PublishedPart =
intermediate.publishPart(Set(Layer.Id("multi-level-layer")),
partMapper,
partMapper.partForLevel(12))
val secondPart: PublishedPart = firstPart
.readBack()
.mapGroup({
case (key, meta) => (key.copy(partition = key.partition.ancestors.head), (key, meta))
}, context.defaultPartitioner)
.mapValues { partitions: Iterable[(Key, Meta)] =>
val payload: Payload = Aggregator.aggregate(partitions)
payload
}
.publishPart(Set(Layer.Id("multi-level-layer")), partMapper, partMapper.partForLevel(11))
context.unionPublishedParts(Seq(firstPart, secondPart))
値を変換します
mapValues
DeltaSet mapValuesWithKey
内の値を変換するには、およびを使用します。 これらの操作では DeltaSet のキーは変更されないため、実行時にクラスタ内のワーカーノード間でデータをシャッフルする必要はありません。 このため、これらの操作は非常に効率的です。
次の例では、 a の値 DeltaSet[Key, Int]
がを使用して 1 つずつ増加します mapValues
。
val integers: DeltaSet[Key, Int] = ???
val incrementedIntegers: DeltaSet[Key, Int] = integers.mapValues(_ + 1)
mapValuesWithKey
は類似した操作ですが、変換関数のキーも提供されます。 この操作の詳細については、レイヤーのコピーの例を参照してください。
キーと値を変換します
キーと値の両方を同時に変換する場合、またはキーと値のペアのキーをその値に基づいて変換する場合は、mapUnique
、mapGroup
、またはmapReduce
のいずれかの変換を使用します。 ただし、パフォーマンスに問題がある場合は、mapValues
mapValuesWithKey
またはmapKeys*
のいずれかの操作を使用することを検討してください。
mapUnique
キーが重複していない限り、キーと値のペアを新しいキーと値のペアに変換します。 重複したキーが生成された場合、実行時に変換が失敗します。 クラスタ内のノード間でデータがシャッフルされるため、引数として明示的にパーティション分割者を指定する必要があります。
たとえば、次のスニペットでは、キーと値が 2 つのレイヤーに分割されます。
- 正の値 : レイヤーをに設定
positive_values
します。 - 負の値 : レイヤーをに設定
negative_values
します。
val deltaSet1: DeltaSet[Key, Int] = ???
val split: DeltaSet[Key, Int] =
deltaSet1.mapUnique(
mapFn = { (key, i) =>
if (i >= 0) {
(key.copy(layer = Layer.Id("positive_values")), i)
} else {
(key.copy(layer = Layer.Id("negative_values")), i)
}
},
partitioning = context.defaultPartitioner
)
mapGroup
キーと値のペアを新しいキーと値のペアに変換します。 重複するキーの値がグループ化され、 DeltaSet によって各キーに値のコレクションが割り当てられます。
たとえば、次のスニペットで は、 HERE Tile パーティションに関連付けられているメタデータ が 親 HERE Tile にマッピングされ、グループ化されます。
val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, Iterable[Meta]] =
deltaSet1.mapGroup(
mapFn = {
case (key, value) =>
(key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
},
partitioning = context.defaultPartitioner
)
mapReduce
キーと値のペアを新しいバージョンに変換します。このバージョンでは、指定した Reduce 関数を使用して、重複するキーの値が削減されます。 この Reduce 関数は、 2 つの値を 1 つに結合する必要があります。 を使用 mapReduce
すると、を使用し mapGroup
て各値を減らしてから、を使用するよりも効率的 mapValues
です。
たとえば、次のスニペットで は、 HERE Tile パーティションに関連付けられているすべての整数 が 親 HERE Tile にマッピングされ、各出力 HERE Tile の値がその合計に削減されます。
val deltaSet1: DeltaSet[Key, Int] = ???
val deltaSet2: DeltaSet[Key, Int] =
deltaSet1.mapReduce(
mapFn = {
case (key, value) =>
(key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
},
reduceFn = _ + _,
partitioning = context.defaultPartitioner
)
これら mapUnique
の、 mapGroup
、 mapReduce
、およびの操作は、キー値のマッピング関数を使用して、正確に 1 つのキーを生成します。 一方、flatMapUnique
、flatMapGroup
、およびflatMapReduce
の操作では、 0 個以上のキーを生成するためのマッピング関数が使用されます。
キーを変換します
DeltaSets は、 DeltaSet 内のキーを値を考慮せずに変更するための一連の変換を提供します。 これらの変換は非常に効率的で、可能であればキーおよび値に基づいて変換よりも優先されます。
たとえば、 mapKeys
は DeltaSet 内のキーのみを読み取りまたは書き込みなしで変換する効率的な操作です。 キーの変換は 1 対 1 である必要があります。つまり、入力 DeltaSet のすべてのキーが出力 DeltaSet の一意のキーにマップされます。 変換が 1 対 1 であることを確認し、効率的な増分処理を可能にするには、 mapKeys
キーマッピング関数と mapFn
その関数の逆関数の両方を指定する必要 inverseFn
があります。
val partitioner = NameHashPartitioner(10)
val input: DeltaSet[Key, Int] = ???
val incrementedIntegers: DeltaSet[Key, Int] =
input.mapKeys(
OneToOne(
mapFn = key => key.copy(layer = Layer.Id("outLayerA")),
inverseFn = key => key.copy(layer = Layer.Id("inLayerA"))
),
partitioner
)
この操作の使用方法の詳細については、レイヤーのコピーの例を参照してください。
DeltaSet にキーが含まれて x
いる場合 inverseFn(mapFn(x)) != x
、実行時に変換が失敗します。 によって作成されたキーでは逆関数を呼び出すことができる mapFn
ため、渡さ inverseFn
れた任意のキーについて正しい結果を返す必要があります。可能なキーのサブセットにのみ定義されている場合は、その逆関数を部分関数として定義できます。
val partitioner = HashPartitioner[String](context.defaultParallelism)
val input: DeltaSet[Int, String] = ???
val stringKeyed: DeltaSet[String, String] =
input.mapKeys(
OneToOne[Int, String](
mapFn = _.toString,
inverseFn = {
case s: String if s forall Character.isDigit => s.toInt
}
),
partitioner
)
逆変換を指定するのが不便または不可能な場合 mapUnique
は、以下で説明するより高価な変換を使用することを検討してください。
flatMapKeys
各入力キーを 0 個以上の出力キー (1 対多 ) にマップする変換です。 mapKeys
各出力キーが 1 つの入力キーの結果であることを示すために、反転関数と同様の関数を渡す必要があります。
複数の入力キーを同じ出力キー (1 対 1 または 1 対 n) にマッピングする場合、キーおよび値の変換と同様に、値のセットをグループ化または削減できます。 DeltaSets は、 4 つの変換を提供して、グループ化 / 削減と n-to-1/m-to-n: のすべての組み合わせをカバーし、コレクション内のすべての値をmapKeysGroup
flatMapKeysGroup
グループ化します。一方、mapKeysReduce
flatMapKeysReduce
Reduce 関数をすべての値に適用します。 flatMapKeysGroup
使用方法の例について は、 DirectMToN コンパイラーの移行を参照してください。
データをフィルタリングします
filterByKey
DeltaSet のキーと値のペアを、キーのみに基づいてフィルタリングします。 この変換は非常に効率的に実行され、 Spark クラスタ内のノード間でデータを交換する必要はありません。
次のスニペットでは 、hmc
カタログからのadministrative_places
レイヤーが照会され、「 1469256839 」で始まる汎用パーティション名のすべてのパーティションキーがフィルタリングされます。 hmc
これは、カタログ ID が HERE Map Content カタログを参照している場合に、オーストラリアのすべての管理場所を読み取ることに対応しています。
import com.here.platform.data.processing.catalog.Partition.Generic
val filteredInput: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("administrative_places"))
.filterByKey {
case Key(catalog, layer, Generic(name)) =>
name.startsWith("1469256839")
case _ => false
}
filterByKey
操作でパーティションキーフィルタを使用することもできます。 たとえば 、ベルリン周辺のバウンディング ボックスに属する HERE Tile を使用して、 HERE Tile 名としてパーティションが指定されているパーティションのみをフィルタリングするに は、次のfilterByKey
操作を使用します。
val filteredInput: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
.filterByKey(
BoundingBoxFilter(
south = 50.97656,
west = 11.95313,
north = 51.06445,
east = 12.04102
)
)
パーティションキーフィルタ は、パスの設定ファイルから定義することも here.platform.data-processing.deltasets.partitionKeyFilters
できます。 設定ファイルで定義されているパーティションキーフィルタは、 query
すべての変換およびに適用 readBack
されます。
データに参加します
join
2 つの DeltaSet を取得して DeltaSet を作成する DeltaSet 変換です。 DeltaSet に含まれる各キーについて、各 DeltaSets のキーに関連付けられている値のペアが含まれます。
val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val pairs: DeltaSet[Key, (Int, String)] = integers join strings
DeltaSets によって提供されるその他の種類の結合変換は次のとおりです。
-
outerJoin
2 つの DeltaSet を取得し、 DeltaSet に含まれている各キーについて、各 DeltaSets のキーに関連付けられている値のペアを含む DeltaSet を作成します。 DeltaSets のいずれかでキーに値が関連付けられていない場合、ペアのエントリはに設定 None
されます。 -
leftOuterJoin
2 つの DeltaSet を取得し、左側の DeltaSet に含まれている各キーについて、各 DeltaSets のキーに関連付けられている値のペアを含む DeltaSet を作成します。 キーが右側の DeltaSet の値に関連付けられていない場合、ペアのエントリはに設定 None
されます。
次の例は、結合の一般的な使用例を示しています。 2 つのレイヤーroad-attributes
topology-geometry
があり、カタログhmc
(HERE Map Content)から照会されます。 両方の DeltaSets のキーが書き換えられ、出力カタログの同じカタログおよびレイヤーが含まれるようになります。 次に、 outerJoin
が計算され、両方のレイヤーのメタデータを含む DeltaSet が作成されます。 このようにして、パーティションの内容を関連付けることができます。
val topology: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
.mapKeys(
OneToOne(
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("topology_geometry"))
),
context.defaultPartitioner
)
val roadAttributes: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
.mapKeys(
OneToOne(
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("road_attributes"))
),
context.defaultPartitioner
)
val pairs: DeltaSet[Key, (Option[Meta], Option[Meta])] =
topology outerJoin roadAttributes
join
はステートフル変換です が、outerJoin
leftOuterJoin
はステートフルではありません。このため、後の 2 つの変換がより効率的になります。
ユニオンデータ
disjointUnion
2 つ の入力 DeltaSet を受け取り、いずれかの入力 DeltaSets に含まれているすべてのキーと値のペアを含む出力 DeltaSet を生成する DeltaSet 変換です。 両方 の入力 DeltaSets にキーが含まれている場合、操作によって例外がスローされます。
次のスニペットでは、入力 DeltaSet をズーム レベル 10 および 12 のタイルを持つ 2 つの DeltaSet に分割します。 次に、各 DeltaSet が mapValues
文字列に変換さ combined
れ、結果の和が変数に保存されます。
val input: DeltaSet[Key, Meta] = ???
val tilesAt10: DeltaSet[Key, String] =
input
.filterByKey {
case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 10
case _ => false
}
.mapValues(???)
val tilesAt12: DeltaSet[Key, String] =
input
.filterByKey {
case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 12
case _ => false
}
.mapValues(???)
val combined: DeltaSet[Key, String] =
tilesAt10 disjointUnion tilesAt12
上記のスニペットでは、両方の入力 DeltaSets のタイプが完全に同じです。 また、次のスニペットのように、さまざまな値タイプを持つこともできます。 この場合、出力 DeltaSet の値タイプは、入力の一般的な値タイプのスーパータイプです。 次のスニペットで DeltaSet[Key, Int]
は、との和が DeltaSet[Key, String]
として入力さ DeltaSet[Key, Any]
れています。 これは、 Scala で Any
はString
とInt
のスーパータイプであるために有効です。
val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val union: DeltaSet[Key, Any] = integers disjointUnion strings
このコンテキストでは、 2 つ以上 DeltaSet
のののジョイント解除ユニオンを作成できます。
val integers1: DeltaSet[Key, Int] = ???
val integers2: DeltaSet[Key, Int] = ???
val integers3: DeltaSet[Key, Int] = ???
val union: DeltaSet[Key, Int] =
context.disjointUnion(List(integers1, integers2, integers3))
参照を動的に解決します
mapValuesWithResolver
キーとメタのペアの DeltaSet 、サブジェクト、および他のパーティション、リファレンスへの動的なアクセスを変換するために使用できます。 resolveReferences
またはを使用して静的な参照解決を行う代わり RefTreeCompiler
に、必要なすべての参照を事前に計算する必要があります。 一方、ダイナミックリファレンス分解能はより柔軟で、コード行数が少なく、特に複雑なリファレンス構造の場合は、静的リファレンス分解能よりも高速になります。
mapValuesWithResolver
mapValuesWithKey
ただし、各サブジェクトに適用されるマッピング関数には、サブジェクトのキーとメタデータ、 Resolver
およびサブジェクトが参照できる任意のキーのメタデータを決定するの 3 つの引数があります。 サブジェクトのメタデータおよびリファレンスを使用して、対応するペイロードを取得できます。
は Resolver
、 1 つ以上のを使用 ResolutionStrategy
して、キーに対応するメタデータを検索します。 このような戦略の 1 つにDirectQuery
、 Data API を介してメタデータを直接要求する方法があります。この方法は簡単ですが、解決されたメタデータごとに 1 つのネットワーククエリーが必要です。 次のセクションでは、大量のメタデータを一度にダウンロードすることで、より効率的に解決できる 3 つの解決方法について説明します。
次のスニペットでは、カタログinA
のレイヤーA
にあるパーティションからカタログinA
のレイヤーB
にあるパーティションへの参照を解決し ます。 レイヤー内の各パーティション A
は、レイヤー内のパーティションの名前を参照 B
します。 通常、たとえば HERE Map Content を処理する場合、パーティションの名前は他のデータと一緒に保存されます。 ただし、この例では、レイヤーのパーティションに他の情報がないと仮定 A
しています。
まず、 DeltaSet A
にレイヤーをクエリー subjects
します。 次に mapValuesWithResolver
、マッピング関数と strategies
パラメーターを渡して呼び出し DirectQuery
ます。このパラメーターはに設定されています。
マッピング機能内 :
- レイヤー内のパーティションを取得
A
するには、を使用 retriever
します。 - パーティションの内容を文字列
referenceName
に変換し、レイヤーB
内のreferenceName
パーティションを参照するKey
オブジェクトを作成します。 - 参照に対応するメタデータを取得するには、リゾルバを使用します。
- 参照されているパーティションが存在しない場合は、例外がスローされます。
- それ以外の場合は、そのパーティションを取得します。
val retriever = context.inRetriever(Catalog.Id("inA"))
val subjects: DeltaSet[Key, Meta] =
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("A"))
subjects.mapValuesWithResolver(
mapFn = {
case (resolver, key, meta) =>
val referenceName = new String(retriever.getPayload(key, meta).content)
val referenceKey = Key(Catalog.Id("inA"), Layer.Id("B"), Generic(referenceName))
resolver.resolve(referenceKey) match {
case None => throw new Exception("Partition does not exist!")
case Some(referenceMeta) =>
val referencePartition = retriever.getPayload(referenceKey, referenceMeta)
???
}
},
strategies = List(DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"))))
)
解決戦略
現在、次の 4 つの解決策があります。
-
DirectQuery
-- カタログと一連のレイヤーを指定すると 、この方法では Data API を介して各キーのメタデータを直接取得します。 リクエストの結果は、実行者ごとにキャッシュされます。 キャッシュのサイズは、のコンストラクタに引数を渡すことによって設定でき DirectQuery
ます。デフォルトは 10000 メタデータオブジェクト(約 3MB )です。
-
Broadcast
-- メタデータを含む DeltaSet を指定すると、この戦略はメタデータの完全なコピーを各 Spark 実行者に送信し、ネットワークリクエストを追加せずに各実行者でメタデータ全体を利用できるようにします。 内部的には、この方法では Spark ブロードキャスト変数が使用されます。 メタデータの量によっては、各 e キュトールに大量のメモリが必要になることがあります。 メタデータの保存に必要なメモリは、 DeltaSet のパーティションあたり約 300 バイトです。
-
BackwardResolution
--DeltaSet に参照が含まれており、各参照を一連のサブジェクトパーティションにマップする関数があるとし BackwardResolution
ます。は、サブジェクトパーティションを処理するときに、これ以上のネットワーククエリーを実行せずに、これらの参照をリゾルバに公開します。 たとえば、次のスニペットでは、レイヤーの各パーティション A
がレイヤーのすべての子とグループ化 B
されています。
BackwardResolution(
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), { (key, meta) =>
Set(key.copy(layer = Layer.Id("A"), partition = key.partition.parent.get))
}
)
事前定義 BackwardResolution.toSamePartition
された後方解決方式では、各サブジェクトパーティションが同じ名前の参照パーティションでグループ化されます。 同様 BackwardResolution.toNeighbors
に、各サブジェクトタイルを、指定した深さのすべてのネイバータイルでグループ化します。
-
ForwardResolution
--DeltaSet に参照が含まれており、各サブジェクトを参照パーティションのセットにマップする関数があるとし ForwardResolution
ます。は、サブジェクトパーティションの処理時に、これ以上のネットワーククエリーを実行せずに、これらの参照をリゾルバに公開します。 この方法はの逆です BackwardResolution
。 ForwardResolution
主題の DeltaSet のキータイプと値タイプをタイプパラメータとして取得します。 次のスニペットは、すべてのネイバータイルで各サブジェクトタイルをグループ化する方法を示しています。
ForwardResolution[Key, Meta](
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), {
case (key @ Key(_, _, tile: HereTile), meta) =>
tile
.neighbors(1)
.map(tile => key.copy(layer = Layer.Id("B"), partition = tile))
}
)
mapValuesWithResolver
解決策の一覧表を取得して、順番に組み合わせることができます。 たとえば、次のスニペットの解決方法の一覧表を検討します。 レイヤーのパーティションを処理 A
し、レイヤー内のタイルへの参照を解決する場合 B
、現在処理されているタイルの直接のネイバーはすべて、ネットワークリクエストなしで利用できます。 レイヤー内の他のタイルを解決 B
するには ( 遠くにいるか、別のズーム レベルで ) 、ネットワークリクエストが実行されます。 レイヤーC
への参照を解決するには、常にネットワークリクエストを使用し、レイヤーD
内の参照はブロードキャストを使用して解決します。
val strategies =
List(
BackwardResolution.toNeighbors(
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")),
Catalog.Id("inA"),
Layer.Id("A")
),
DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"), Layer.Id("C"))),
Broadcast(context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("D")))
)
mapValuesWithResolver
はDeltaSet[Key, Meta]
に適用できます。この場合、サブジェクトDeltaSet
のパーティション作成者もパーティションリファレンスに使用されます。 キーまたは値の種類が異なるDeltaSet
にmapValuesWithResolver
が適用されている場合は、参照のパーティションを作成する必要があります。 たとえば、このスニペットでは、が DeltaSet[HereTile, String]
変換されます。
val deltaSet: DeltaSet[HereTile, String] = ???
deltaSet.mapValuesWithResolver(
(r, k, s) => ???,
Seq(
ForwardResolution(
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), {
case (tile, string) =>
tile
.neighbors(1)
.map(tile => Key(Catalog.Id("inA"), Layer.Id("B"), tile))
}
)),
HashPartitioner(12)
)
参照を静的に解決します
resolveReferences
RefTreeCompiler の最初のステップと同じ機能を実装する変換です。カタログパーティション間の参照関係を指定すると、変換によって DeltaSet 内の各パーティションがその参照でグループ化されます。 参照関係は次のものによって定義されます。
- RefTree。パーティション間に存在する可能性のある参照の種類を指定します
- パーティションによって参照されるパーティションキーの具体的なセットを計算する RESOLVE 関数については 、「 RefTreeCompiler」を参照してください。
次のスニペットでは、パーティション名が HERE Tile である各キーとメタのペアをdeltaSet1
でグループ化するためにresolveReferences
使用され、すべてのネイバーのすべてのキーとメタのペアでグループ化されます。
import com.here.platform.data.processing.compiler.reftree._
val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, (Meta, Map[Key, Meta])] =
deltaSet1.resolveReferences(
RefTree(
Subject((Catalog.Id("inCatalogA"), Layer.Id("inLayerA")),
Ref(RefTree.RefName("neighbor"),
(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))))),
resolveFn = {
case (Key(catalog, layer, partition: HereTile), meta) =>
val neighbor = partition.neighbors(radius = 1) - partition
Map(RefTree.RefName("neighbor") -> neighbor.map(neighborTile =>
Key(catalog, layer, neighborTile)))
case _ => Map.empty
}
)
メモ :resolveReferences
での参照解決手順とは RefTreeCompiler
異なる動作が 1 つの詳細に示されます。 パーティションキーフィルタ が設定ファイルで定義されている場合、これらのフィルタは参照とサブジェクトの両方に適用されます。 で RefTreeCompiler
は、サブジェクトのみがフィルタリングされます。
公開されたデータを読み取ります
PublishedSet
の場合、readBack
は適用できる唯一の変換です。 PublishedSet
レイヤー にパーティションを公開した結果として作成されたが、公開後にそのレイヤーに含まれているすべてのパーティションのキーとメタデータを含む DeltaSet に変換されます。 この方法では、以前の処理ステップで公開されたパーティションを読み取り戻して、次の手順で使用できます。
以下のコード スニペットでは、出力カタログレイヤーに公開されたintermediate
結果をreadBack
使用して読み取ります。 readBack
次に、を disjointUnion
入力カタログ hmc
( HERE Map Content )のレイヤーと組み合わせて、これらのレイヤー間の参照を解決します。
val intermediate: DeltaSet[Key, Payload] = ???
val intermediatePublished: PublishedSet = intermediate.publish(Set(Layer.Id("intermediate")))
val intermediateAndTopology: DeltaSet[Key, Meta] =
intermediatePublished.readBack() disjointUnion
context.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
val references: DeltaSet[Key, (Meta, Map[Key, Meta])] =
intermediateAndTopology.resolveReferences(
RefTree(
Subject((Default.OutCatalogId, Layer.Id("intermediate")),
Ref(RefTree.RefName("intermediate_to_topology"),
(Catalog.Id("hmc"), Layer.Id("topology_geometry"))))),
resolveFn = ???
)
DDs を DeltaSets に変換します
toDeltaSet
Spark RDD を DeltaSet に変換するために使用できる操作です。 たとえば、 Spark を使用して他のソースからデータを取り込み、 DeltaSets を使用する処理パイプラインに統合する場合に使用できます。 RDD にはキーと値のペアが含まれている必要があります。また、同じキーを持つ複数のペアを含めることはできません。 RDD のパーティションを再作成するパーティション分割者を渡す必要 toDeltaSet
があります。 RDD がすでに特定のパーティション分割者とパーティション分割されていない限り、 RDD の再分割によってシャッフルが発生します。
作成された DeltaSet には、パイプラインの最後の実行以降の変更に関する情報が含まれていません。パイプラインの増分実行中であっても、 Downstream DeltaSet 変換は DeltaSet 内のすべてのデータを処理します。
次のスニペットでは、 Spark DDs 経由で csv ファイルを DeltaSet にインジェストする方法を示します。 DeltaContext
を介してアクセスできるSparkContext
を使用して、 csv ファイルairports.csv
を読み取ります。 次に、 RDD をキー値のフォームに変換します。このフォームでは、 csv ファイルの最初の列がキーとして機能します。 最後 toDeltaSet
に、 RDD を DeltaSet に変換するために使用します。
val sc = context.driverContext.spark
val rowsByFirstColumn: RDD[(Key, Array[String])] =
sc.textFile("airports.csv").map { x =>
val columns = x.split(",").map(_.trim)
(Key('outCatalog, 'outLayer, Generic(columns(0))), columns.drop(1))
}
val deltaSet: DeltaSet[Key, Array[String]] =
rowsByFirstColumn.toDeltaSet(context.defaultPartitioner)
Spark のパーティション分割とシャッフル
DeltaSet 内のデータは、常に特定 のパーティション作成者に従ってパーティション化されます。このパーティション分割者は、 DeltaSet 内の各キーを Spark パーティションに割り当て、その結果、データが存在するクラスタ内のノードに割り当てます。 DeltaSet 変換中は、変換で明示的に新しいパーティション作成者が指定されていない限り、常にパーティション分割者が保持されます。 特に repartition
、この変換では、パーティション分割者の変更と、新しいパーティション分割者に従ってデータの再分割のみが行われます。
DeltaSet 内のキーを変換する可能性のあるすべての変換、またはパーティション作成者を変更するすべての変換では、データのパーティション分割を再実行する必要があります。したがって、データのシャッフルと呼ばれるクラスタ内のノード間でデータを移動できます。 シャフリングは高価な操作であるため、回避する必要があります。 データ のシャッフルに必要な DeltaSet 変換については、変換プロパティの表を参照してください。
パーティション分割の戦略
データをシャッフルする各変換では、結果のキーのパーティション分割方法を明示的に指定する必要があります。 このパーティション分割方式は、パーティション分割方式または PreservesPartitioning
特別な値のいずれかになります。
パーティャーを使用する場合、変換ではそのパーティション分割者を使用して結果を再分割します。 変換のパフォーマンス要件がない場合は、DeltaContext
の [defaultPartitioner
] フィールドを使用できます。
DeltaSet 内のキーが変換によって変更されても、各キーが同じ Spark パーティション内に残っている場合 PreservesPartitioning
は、を使用できます。 この戦略によって、データのシャッフルによる変換が完全に回避され、パフォーマンスが大幅に改善されます。 変換でパーティション分割を保持できない場合は、実行時に例外がスローされます。
レイヤーのコピーの例では、 DeltaSet 内のキーのカタログとレイヤーを変更するためにmapKeys
使用します。 カタログとレイヤーに関係なく、同じ Spark パーティション内の同じ名前のすべてのカタログパーティションをグループ化する、defaultPartitioner
、PartitionNamePartitioner
とパーティション分割されます。 その結果、すべてのカタログパーティションが同じ Spark パーティション内に残り、使用でき PreservesPartitioning
ます。
PreservesPartitioning
アップストリームとダウンストリームのキータイプ DeltaSet
が同じではない場合でも、パーティション分割方式を使用でき DeltaSet
ます。ただし、アップストリームがダウンストリームキータイプも処理できるように一般的なパーティション分割者によって分割されている必要があります。 この高度な機能を使用すると、 Partition.Key
たとえば、キーのある DeltaSet を Partition.Name
、 Spark shuffle を作成せずにキーのある DeltaSet にマップできます。
val catalog: Catalog.Id = ???
val layer: Layer.Id = ???
context
.queryCatalogLayer(catalog, layer)
.mapKeys(
OneToOne[Key, Name](_.partition, Key(catalog, layer, _)),
PreservesPartitioning
)
HERE を PreservesPartitioning
使用できます。これは、で使用されるデフォルトのパーティャー queryCatalogLayer
が PartitionNamePartitioner
、アップストリームキータイプ、 Partition.Key
、およびダウンストリームキータイプの両方をサポートするであるため Partition.Name
です。
データの遅延と永続性
DeltaSet が 2 つ以上の変換で使用される場合は、結果が 2 回以上再計算されないように、結果がメモリまたは Spark ワーカーのディスクに保持される必要があります。 これについて は、 RDD Persistence ポリシーの RDD についてより詳細に説明 し、 DeltaSets に均等に適用されます。 次 persist
の例のように、 DeltaSet を再利用できるように永続化するには、変換を使用します。
import org.apache.spark.storage.StorageLevel
val deltaSet1: DeltaSet[Key, Int] = ???
val doubled = deltaSet1
.mapValues(_ * 2)
.persist(StorageLevel.MEMORY_AND_DISK_2)
val reuse1 =
doubled
.mapValues(x => Payload(BigInt(x).toByteArray))
.publish(Set(Layer.Id("outLayer1")))
val reuse2 =
doubled
.mapValues(x => Payload(BigInt(x / 2).toByteArray))
.publish(Set(Layer.Id("outLayer2")))
パフォーマンスのプロパティ
さまざまな変換の内部実装の時間と空間のコストは異なり、外部から直接見ることはできません。 特に、特定の変換を他の変換よりも高価にするプロパティが 2 つあります。つまり、シャフリングとステートフル変換です。
データをシャッフルする変換によって、クラスタ内のノード間でデータが移動されます。この処理では、帯域幅が使用され、計算の速度が低下します。
ステートフルである変換で は、差分計算を可能にするために、補助情報を出力カタログのstate
レイヤーに保存する必要があります。 つまり、出力カタログで余分なストレージが消費され、状態の計算には余分な時間が必要になります。また、計算が終了するまで状態を維持するには、クラスタのノードで追加の RAM が必要になります。
ほとんどのパイプラインでは、ステートフルやシャフリングの操作を使用することは避けられませんが、次の表を参照すると、可能な限りステートフルやシャフリングの変換を回避できます。
操作 | シャッフル ? | ステートフル? |
detectChanges | いいえ | はい |
disjointUnion | いいえ | いいえ |
filterByKey | いいえ | いいえ |
flatMapGroup | はい1 | はい |
flatMapKeys | はい1 | いいえ |
flatMapKeysGroup | はい1 | いいえ |
flatMapKeysReduce | はい1 | いいえ |
flatMapReduce | はい1 | はい |
flatMapUnique | はい1 | はい |
join | いいえ | はい |
leftOuterJoin | いいえ | いいえ |
mapGroup | はい1 | はい |
mapKeys | はい1 | いいえ |
mapKeysGroup | はい1 | いいえ |
mapKeysReduce | はい1 | いいえ |
mapReduce | はい1 | はい |
mapUnique | はい1 | はい |
mapValues | いいえ | いいえ |
mapValuesWithKey | いいえ | いいえ |
mapValuesWithResolver | はい | はい |
outerJoin | いいえ | いいえ |
persist | いいえ | いいえ |
publish | いいえ | いいえ |
publishPart | いいえ | いいえ |
readBack | いいえ | いいえ |
repartition | はい | いいえ |
resolveReferences | はい | はい |
toDeltaSet | はい | いいえ |
1。 PreservesPartitioning
パーティション分割方式としてが使用されている場合、シャフリングを回避できます。 素晴らしいホテルです
ステートフル 変換によって生じるオーバーヘッドを避けるため、可能な限り避けることをお勧めします。 たとえば、ステートフル join
の代わりにステートレス outerJoin
を使用したり、ステートフル mapGroup
を使用する代わりにステートレス mapKeysGroup
および mapValues
でキーおよび値を別々に変換したりできます。
forceStateless
設定 オプションを使用すると、すべての変換をステートレスにすることができます。 これにより、状態によって生じるオーバーヘッドが回避されますが、増分実行中にパーティション間の依存関係を変換で追跡することはできません。 実際には、DeltaSet は増分実行で、アップストリーム DeltaSet がまったく変更されなかったと判断できない限り、変更されたかどうかにかかわらず、すべてのアップストリームパーティションを処理する必要があります。 後者の場合、の値に関係 forceStateless
なく、処理を行う必要はありません。 特に例外は detectChanges
で、forceStateless
が設定されている場合でも、アップストリーム DeltaSet の変更された部分のみを処理できます。 detectChanges
ただし、状態を持たない変更済みパーティションのセットを削減することはできませ forceStateless
ん。また、を設定することで効果的に無効になります。
比較と移行
このセクションでは、 DeltaSets をデータ プロセッシング ライブラリで分散計算を表現する他の方法と比較し、より柔軟性の低いインターフェイスから DeltaSets に移行する方法を提供します。
ファンクショナルパターン
ファンクショナルパターンの以前のユーザーは、 DeltaSets を カスタム コンパイルパターンを作成する方法として理解でき ます。つまり、特定のアプリケーションで必要な数のresolveFn
、compileInFn
、、およびcompileOutFn
の関数を含むコンパイラを使用できます。 ただし、 DeltaSets には、計算を構造化する方法が多数用意されています。たとえば、 1 つのcompileInFn
結果を複数のcompileOutFn
で再利用したり、複数のcompileInFn
の結果を結合したり、中間結果を出力カタログにパブリッシュしたりできます。
DeltaSet 変換で表される MapGroupCompiler は、次のものに対応します。
flatMapGroup
を使用して結果をinLayers
グループ化し、すべてのキーとメタのペアにcompileIn
関数を適用します compileOut
機能を使用して、中間データのグループをペイロードに変換します - 結果を公開しています
以下に完全な例を示し TODO
ます。タグは、中間データ、入力レイヤー、および MapGroupCompiler を定義する場所を示します。
import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, MapGroupCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
object MapGroupMain extends DeltaSimpleSetup {
case class IntermediateData()
def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
???
def constructMapGroupCompiler(retrievers: Map[Catalog.Id, Retriever])
: MapGroupCompiler[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
???
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
val compiler = constructMapGroupCompiler(retrievers)
val partitioner =
compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)
val result =
context
.queryCatalogs(
inLayers
)
.flatMapGroup(
Function.untupled(compiler.compileInFn),
partitioner
)
.mapValuesWithKey(
compiler.compileOutFn
)
.publish(
compiler.outLayers
)
Iterable(result)
}
}
DeltaSet 変換で表される RefTreeCompiler は、次のものに対応します。
- を使用して、サブジェクトパーティションのセットとその参照をグループ化します
resolveReferences
compileIn
すべてのサブジェクトと参照のペアに関数を適用し、を使用して結果をグループ化します flatMapGroup
compileOut
機能を使用して、中間データのグループをペイロードに変換します - 結果を公開しています
以下に完全な例を示し TODO
ます。タグは、中間データ、入力レイヤー、および RefTreeCompiler を定義する場所を示します。
import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.reftree.CompileInFnWithRefs
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, RefTreeCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
object RefTreeMain extends DeltaSimpleSetup {
case class IntermediateData()
def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
???
def constructTestRefTreeCompiler(
retrievers: Map[Catalog.Id, Retriever]): RefTreeCompiler[IntermediateData]
with CompileInFnWithRefs[IntermediateData]
with CompileOut1To1Fn[IntermediateData] =
???
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
val compiler = constructTestRefTreeCompiler(retrievers)
val partitioner =
compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)
Iterable(
context
.queryCatalogs(
compiler.inLayers
)
.resolveReferences(
compiler.refStructure,
Function.untupled(compiler.resolveFn)
)
.flatMapGroup(
{ case (k, (v, refs)) => compiler.compileInFn((k, v), refs) },
partitioner
)
.mapValuesWithKey(
compiler.compileOutFn
)
.publish(
compiler.outLayers
)
)
}
}
DeltaSet 変換で表現されたダイレクト M:N コンパイラーは、次のものに対応します。
compileIn
およびを使用して、各入力キーと値のペアの中間データを計算 mapValuesWithKey
します。 mappingFn
およびを使用 flatMapKeysGroup
して、すべての入力キーと値のペアを対応する出力キーにマッピングします。 HERE では、の逆も mappingFn
指定する必要があります。 compileOut
機能を使用して、中間データのグループをペイロードに変換します - 結果を公開しています
以下に完全な例を示します。 TODO
タグは、中間データ、入力レイヤー、の逆および mappingFn
を定義する場所を示し DirectMToNCompiler
ます。
import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.direct.CompileInFn
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, DirectMToNCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
object DirectMToNMain extends DeltaSimpleSetup {
case class IntermediateData()
def inverseMappingFn: Partition.Key => Iterable[Partition.Key] = ???
def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
???
def constructTestDirectMToNCompiler(retrievers: Map[Catalog.Id, Retriever]): DirectMToNCompiler[
IntermediateData] with CompileInFn[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
???
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
val compiler = constructTestDirectMToNCompiler(retrievers)
val partitioner =
compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)
Iterable(
context
.queryCatalogs(
compiler.inLayers
)
.mapValuesWithKey {
case (k, v) =>
compiler.compileInFn((k, v))
}
.flatMapKeysGroup(
ManyToMany(
compiler.mappingFn,
inverseMappingFn
),
partitioner
)
.mapValuesWithKey(
compiler.compileOutFn
)
.publish(
compiler.outLayers
)
)
}
}
は Direct1toNCompiler
、類似した一連の変換を flatMapKeysGroup
使用してで置き換えることができ flatMapKeys
ます。
Spark RDD ベースのパターン
RDD と同様 に、 DeltaSet は、マシンのクラスタに分散されたデータを表し、filterKeys
またはmapValues
などの一連の機能的な操作を使用して変換できます。 実際、 DeltaSet は内部的に RDD を使用してそのデータを表します。 ただし、 DeltaSets の使用と DDs の直接の使用には、主に次の 3 つの違いがあります。
-
キー値: RDD とは異なり、 DeltaSet にはキーと値のペアのみが含まれ、重複するキーは含まれません。 map
そのため、たとえば DeltaSets には単純な操作はありません。重複するキーが作成される可能性があります。
-
パーティション分割が強い: さらに、 DeltaSet は、常に特定のパーティション作成者に従ってパーティション分割されます。このパーティションでは、各キーと値のペアに 1 つの Spark パーティションが割り当てられ、次にその がクラスタ内の特定のマシンに保存されます。 RDD
一方、は、データのパーティション分割方法について特定のパーティション分割者が定義されていなくても、データを保存できます。
-
増分: DeltaSet で表現されたすべての計算は 、 DepCompiler および IncrementalDepCompiler で行われた依存関係を手動で追跡することなく、増分的に実行できます。
複数のコンパイラーによるタスク
複数のコンパイラーを使用するタスクでは、コンパイラーの結果をカタログレイヤーにアップロードし、そのカタログレイヤーの内容を別のコンパイラーにダウンロードすることで、複数のコンパイラーの効果をチェーン化できます。 DeltaSets では、複数のコンパイラの効果をチェーン化することもできますが、中間結果を複数のコンパイラタスクで行った場合と同じように、出力カタログからパブリッシュして読み取るかどうかを決定できます。 DeltaSets を使用して複数のコンパイラータスクをエミュレートする方法の詳細について readBack
は、変換を参照してください。
ID と設定
DeltaSet 変換の動作を設定ファイルから変更すると便利な場合があります。たとえば、アプリケーションを再コンパイルせずに変換のパフォーマンスパラメータを調整できます。 DeltaSet の各変換には一意の ID が割り当てられ、設定ファイル内のすべての変換を識別するために使用されます。 設定ファイルから変換を設定するに withId
は、関数を呼び出して ID を割り当てます。この関数は、直前の変換の ID を設定します。
val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1.mapValues(_ * 2).withId("doublingMap")
この方法で ID を割り当てなかった変換には、ソースコードでの表示順に基づいて自動的に ID が割り当てられます。 ID を確認する id
には、変換の結果として作成された DeltaSet のメソッドを呼び出します。
ID は DeltaSet
、だけでなく PublishedSet
、にも割り当てられます 両方とも、共通のスーパークラスのクラスを識別および設定する機能を共有 BaseSet
します。
パイプライン内でモジュール性を確立する場合 BaseSet
は、のグループを共通の名前空間にラップすると便利です。 ID はこの名前空間内で一意である必要があり、名前空間は Spark UI のログメッセージおよび名前付き DDs に含まれています。 BaseSet.Namespace.enter
新しい名前空間を入力する場合に使用します。 名前空間はネストできます。
val deltaSet1: DeltaSet[Key, Int] = ???
BaseSet.Namespace.enter("routingModule") {
val doubled: DeltaSet[Key, Int] = deltaSet1
.mapValues(_ * 2)
.withId("doublingMap")
}
DeltaSets は ' パイプラインapplication.conf
にセクションを追加することによって構成でき ます詳細については 'Configuring the ライブラリを参照してください すべて の変換および各変換に適用されるデフォルト値の両方を個別に設定できます。 以下のスニペットは、デフォルトの設定を示し here.platform.data-processing.deltasets.default
、で定義されている各オプションについて説明しています。 デフォルトの設定を変更するに application.conf
は、このスニペットをにコピーし、必要に応じて値を変更します。
here.platform.data-processing.deltasets.default {
intermediateStorageLevel = "MEMORY_AND_DISK"
validationLevel = "SAFETY"
threads = 1
sorting = false
incremental = true
forceStateless = false
}
設定のhere.platform.data-processing.deltasets.id
パスにある ID id
を使用して変換を設定できます。 たとえば、次のスニペットは doublingMap
、変換で使用されるスレッド数を設定します。
here.platform.data-processing.deltasets.doublingMap {
threads = 3
}
設定ファイルは、変換の名前空間に準拠しています。 たとえば、 here.platform.data-processing.deltasets.namespace1
を使用 namespace1
して、名前空間で作成されたすべての変換を設定できます。 ID id1
を持つ変換がnamespace1
内部で作成された場合、 here.platform.data-processing.deltasets.default
here.platform.data-processing.deltasets.namespace1
およびhere.platform.data-processing.deltasets.namespace1.id1
で定義されている設定をこの順序で適用することで、その設定が作成されます。
設定ファイルから読み取られた設定は、アプリケーションコードでプログラムによって上書きすることもできます。 プログラムによる上書きは、設定ファイルで定義されているすべての設定よりも優先されます。
val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1
.mapValues(_ * 2)
.withConfigOverride(
c => c.withIncremental(false)
)