推定アルゴリズムでは、estimateFn が関連する各入力パーティションについて評価されます。これらは両方のタイプのパーティション(通常は HERE Tile)にすることができますが、汎用パーティションを概算入力として使用することもできます。また、estimateFn にはメタデータも提供されます。
estimateFn は、HERE Tile ID と重みのコレクションを返して、データの密度を記述します。重みは、その地理的な場所におけるデータの密度の概算値を表します。estimateFn の複数の呼び出しが同じ HERE Tile ID を返す場合、それらの重みは合計されます。概算機能によって作成されたタイル ID には、任意のレベルを指定できます。
/** Example estimate function used by the AdaptivePatternEstimator.
  *
  * The assumption made in this example is that the average density of data
  * is a function of what is present in 3 layers of 2 input catalogs and is
  * somehow proportional to the sizes of the payloads of those layers: the
  * road catalog (road junctions and geometries) and an extra catalog with
  * additional tiled content. The weight of a road junction is 5 times that
  * of a geometry, while the weight of the extra content is twice that of a
  * geometry. The storage level of the 3 input layers is assumed to be the
  * same.
  */
class MyEstimateFn extends AdaptivePatternEstimateFn {

  /** These are the catalogs and layers that contribute to the estimation. */
  override def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
    Map(
      Ids.RoadCatalog -> Set(Ids.RoadJunctions, Ids.RoadGeometries),
      Ids.ExtraCatalog -> Set(Ids.ExtraContent))

  /** There is in general no need to dictate which Spark partitioner the
    * AdaptivePatternEstimator has to use for its internal operations, so we
    * return None here, but this possibility is not prevented.
    */
  override def inPartitioner(parallelism: Int): Option[Partitioner[InKey]] =
    None

  /** Returns HERE tile IDs paired with weights describing data density.
    *
    * Weights for the same tile ID returned by multiple calls are summed by
    * the estimator.
    */
  override def estimateFn(src: (InKey, InMeta)): Iterable[(HereTile, Long)] =
    src match {
      case (InKey(Ids.RoadCatalog, Ids.RoadJunctions, tile: HereTile),
            InMeta(_, _, _, Some(size))) =>
        List(tile -> size * 5)
      case (InKey(Ids.RoadCatalog, Ids.RoadGeometries, tile: HereTile),
            InMeta(_, _, _, Some(size))) =>
        List(tile -> size)
      case (InKey(Ids.ExtraCatalog, Ids.ExtraContent, tile: HereTile),
            InMeta(_, _, _, Some(size))) =>
        List(tile -> size * 2)
      // Fix: the original match was non-exhaustive and threw a MatchError
      // for any partition whose metadata does not carry a payload size
      // (the Some(size) patterns above fail on None). Such partitions
      // simply contribute no weight to the estimation.
      case _ =>
        Nil
    }
}
/** Configures the Driver: runs the estimate function as a preliminary step
  * to calculate an adaptive pattern, obtains a partitioner from it, and
  * registers a compiler task that uses that partitioner.
  */
def configureCompiler(completeConfig: CompleteConfig,
                      context: DriverContext,
                      builder: DriverBuilder): builder.type = {
  // We define our own estimate function and run it as a preliminary step
  // to calculate an adaptive pattern and use it to obtain a partitioner.
  val myEstimateFn = new MyEstimateFn
  val estimator = new AdaptivePatternEstimator(myEstimateFn, context)
  // Partitions that are not aggregated fall back to a NameHashPartitioner.
  val fallbackPartitioner = NameHashPartitioner(context.spark.defaultParallelism)
  // The threshold is in the same units returned by the estimate function,
  // which are related to the size in bytes, so we ask the estimator to
  // define a pattern with Spark partitions of a max size of 100 MB.
  val partitioner = estimator.partitioner(100 * 1024 * 1024, Some(fallbackPartitioner))
  // The partitioner is passed to the compiler, which uses it to implement
  // the inPartitioner and outPartitioner methods, ignoring the static
  // parallelism.
  val myCompiler = new MyCompilerWithPartitioner(partitioner)
  // Add the compiler to the Driver as a new DriverTask.
  val task = builder.newTaskBuilder("compiler").withMapGroupCompiler(myCompiler).build()
  builder.addTask(task)
}
パターンは、Driver を設定するときに計算され、サンプルのコンパイラーに渡されます。
// Example compiler that receives the adaptively-computed partitioner via its
// constructor; presumably it exposes it through inPartitioner/outPartitioner
// overrides (the class body is shown separately in this document).
class MyCompilerWithPartitioner(partitioner: AdaptiveLevelingPartitioner)extends MapGroupCompiler[Intermediate]with CompileOut1To1Fn[Intermediate]
コンパイラーは、このパーティショナーを使用して、パーティショナーを定義するメソッドを実装します。
/** Input partitioner: the adaptive partitioner received in the constructor.
  *
  * `parallelism` is provided for convenience. In this case the partitioner
  * has already been initialized: for an adaptive partitioner, the number of
  * partitions is a function of the weights and the threshold plus the
  * parallelism of the fallback partitioner (if specified).
  */
def inPartitioner(parallelism: Int): Option[Partitioner[InKey]] =
  Some(partitioner)

/** Output partitioner. It does not have to be the same as the input
  * partitioner, but in this example it is.
  */
def outPartitioner(parallelism: Int): Option[Partitioner[OutKey]] =
  Some(partitioner)
/** Configures the Driver: runs the estimate function as a preliminary step
  * to calculate an adaptive pattern, and registers a compiler task that
  * uses that pattern to model the output layer.
  */
def configureCompiler(completeConfig: CompleteConfig,
                      context: DriverContext,
                      builder: DriverBuilder): builder.type = {
  // We define our own estimate function and run it as a preliminary step
  // to calculate an adaptive pattern and use it to model the output layer.
  val myEstimateFn = new MyEstimateFn
  val estimator = new AdaptivePatternEstimator(myEstimateFn, context)
  // The threshold is in the same units returned by the estimate function,
  // which are related to the size in bytes, so we ask the estimator to
  // define a pattern with Spark partitions of a max size of 100 MB.
  val pattern = estimator.adaptivePattern(100 * 1024 * 1024)
  // The pattern is passed to the compiler, which uses it to implement its
  // compileIn function. Other examples may use it differently. Note the
  // pattern is in a broadcast variable registered in fingerprints.
  val myCompiler = new MyCompilerWithPattern(pattern)
  // Add the compiler to the Driver as a new DriverTask.
  val task = builder.newTaskBuilder("compiler").withMapGroupCompiler(myCompiler).build()
  builder.addTask(task)
}
/** Maps input tiles from the road catalog to variable-level output keys,
  * using the pattern passed in the constructor.
  *
  * Objects from the geometries layer must be decoded into roads and
  * provided to both the rendering and routing output layers. Objects from
  * the junction layer must be decoded and provided only to the routing
  * output layer (just an example).
  */
def compileInFn(in: (InKey, InMeta)): Iterable[(OutKey, Intermediate)] =
  in match {
    case (InKey(Ids.RoadCatalog, Ids.RoadGeometries, tile: HereTile), meta) =>
      // The leveling target should be defined for every input.
      val outTile = pattern.value.levelingTargetFor(tile).get
      val payload: Payload = ??? /* retrieve and decode the payload */
      val roads: Set[Road] = ??? /* prepare the roads for the output routing and rendering layers */
      val intermediate = Intermediate(roads, Set.empty)
      // outTile is used for both output layers, but that is not a hard
      // requirement.
      List(
        (OutKey(Default.OutCatalogId, Ids.OutRouting, outTile), intermediate),
        (OutKey(Default.OutCatalogId, Ids.OutRendering, outTile), intermediate))
    case (InKey(Ids.RoadCatalog, Ids.RoadJunctions, tile: HereTile), meta) =>
      // The leveling target should be defined for every input.
      val outTile = pattern.value.levelingTargetFor(tile).get
      val payload: Payload = ??? /* retrieve and decode the payload */
      val junctions: Set[Junction] = ??? /* prepare the junctions for the output routing layer */
      val intermediate = Intermediate(Set.empty, junctions)
      List((OutKey(Default.OutCatalogId, Ids.OutRouting, outTile), intermediate))
    case _ =>
      sys.error("Unexpected input key")
  }
/** Encodes one output partition from the intermediate values grouped under
  * its key: roads for the rendering layer, junctions for the routing layer.
  */
def compileOutFn(outKey: OutKey, intermediate: Iterable[Intermediate]): Option[Payload] =
  outKey match {
    case OutKey(Default.OutCatalogId, Ids.OutRendering, _) =>
      // Fix: `collection.breakOut` was removed in Scala 2.13. Building the
      // Set through an iterator is behaviorally identical on 2.12 and
      // portable to 2.13, and likewise avoids materializing an
      // intermediate collection.
      val roads: Set[Road] = intermediate.iterator.flatMap(_.roads).toSet
      val payload: Payload = ??? /* process the roads and encode the result */
      Some(payload)
    case OutKey(Default.OutCatalogId, Ids.OutRouting, _) =>
      val junctions: Set[Junction] = intermediate.iterator.flatMap(_.junctions).toSet
      val payload: Payload = ??? /* process the junctions and encode the result */
      Some(payload)
    case _ =>
      sys.error("Unexpected output key")
  }