Configure the library

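Data Processing Library components are configured with the Typesafe Config library, which supports multiple configuration file formats. The recommended format for manual editing is HOCON, a simplified JSON syntax that adds includes, references to environment variables, and other conveniences.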
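
As a rough illustration of the HOCON features mentioned above, the following sketch shows an include and an optional environment variable reference. The included file name `common-settings.conf` and the environment variable `APP_NAME` are hypothetical and chosen only for this example; `appName` is taken from the driver section of reference.conf.

```hocon
// application.conf: a minimal sketch, not a recommended setup.
// "common-settings.conf" is a hypothetical file name.
include "common-settings.conf"

here.platform.data-processing.driver {
  // Default value, same as in reference.conf
  appName = "DataProcessing"

  // Optional substitution: replaces the value above only when the
  // (hypothetical) APP_NAME environment variable is set.
  appName = ${?APP_NAME}
}
```

The `${?...}` form leaves the earlier value untouched when the variable is undefined, which makes it suitable for optional per-environment overrides.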

The configuration parameters are defined in the reference.conf file in the batch-core resources.

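The snippet below shows the contents of this file, which lists the available configuration parameters and, where applicable, their default values. Some parameters are commented out because they must always be defined at the application level.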

// The here.platform.data-processing.driver section defines common parameters used
// across the processing library as well as in compilers.
// Changing this won't trigger a full compilation, thus the user code must not use these parameters to affect its behaviour.
here.platform.data-processing.driver {

  // The Application name in Spark
  appName = "DataProcessing"

  parallelRetrievers = 4    // how many partitions to retrieve in parallel per task (used by state mgmt)
  parallelUploads = 4       // how many partitions to upload in parallel per task (used by publishing and state mgmt)
  uniquePartitionLimitInBytes = 1024 // partitions which have size in bytes bigger than limit are assumed to be unique
  numCommitPartitions = 85  // how many multiparts in DS commit, must be <99 due to DS limit and the library use

  disableIncremental = false // if true, disables incremental compilation. This does not apply to:
                             // - Incremental publishing.
                             // - Skipped compilations in case no input layer has changed.

  disableCommitIntegrityCheck = false // if true, disables the final integrity check on the committed partitions.
                                      // This option serves only as a countermeasure in extreme cases, where the
                                      // amount of output metadata makes the small overhead introduced by the
                                      // check unsustainable. Should be false in all but the most extreme cases.

  // The storage level used for caching RDDs.
  //
  // See org.apache.spark.storage.StorageLevel and the Spark RDD-persistence guide
  // (http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) for possible values.
  //
  // Some values, for quick reference only:
  //   DISK_ONLY - Store the RDD partitions only on disk.
  //   MEMORY_ONLY - Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory,
  //                 some partitions will not be cached and will be recomputed on the fly each time they're needed.
  //   MEMORY_ONLY_SER - Store RDD as serialized Java objects (one byte array per partition). This is generally more
  //                     space-efficient than deserialized objects, especially when using a fast serializer,
  //                     but more CPU-intensive to read.
  //   MEMORY_AND_DISK - Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory,
  //                     store the partitions that don't fit on disk, and read them from there when they're needed.
  //   MEMORY_AND_DISK_SER - Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead
  //                         of recomputing them on the fly each time they're needed.
  sparkStorageLevels {
    default = "MEMORY_AND_DISK_SER"           // used for all the RDDs of the Data Processing Library unless otherwise mentioned
    catalogQueries = "MEMORY_AND_DISK_SER"    // metadata queried from DS
    publishedPayloads = "MEMORY_AND_DISK_SER" // metadata of payloads just published
    persistedState = "MEMORY_AND_DISK_SER"    // metadata of state just published
  }

  state {
    // Configuration for the incremental compiler's internal state, stored to a layer in the output catalog
    partitions = 10         // how many partitions for the state, increase if state partition sizes > 10M
    layer = "state"         // the incremental state layer, must exist in the output catalog configuration
  }
}

// The here.platform.data-processing.executors section defines common parameters for compilation pipeline executors
// Except for "partitionKeyFilters", changing this section won't trigger a full compilation,
// thus the user code must not use these parameters to affect its behaviour.
here.platform.data-processing.executors {

  // These parameters are used by the RefTreeCompilerExecutor
  reftree {
    // number of threads to use within one executor to run the resolve function
    parallelResolves = 10
  }

  // These parameters are used by the DirectCompilerExecutors and the RefTreeCompilerExecutor
  compilein {
    // number of threads to use within one executor to run the compileIn function
    threads = 10
    // sorting will make sure partitions with similar names are compiled together,
    // increasing cache hit ratio in many setups
    sorting = false
  }

  // These parameters are used by the DirectCompilerExecutors and the RefTreeCompilerExecutor
  compileout {
    // number of threads to use within one executor to run the compileOut function
    threads = 10
    // sorting will make sure partitions with similar names are compiled together,
    // increasing cache hit ratio in many setups
    sorting = false
  }

  debug {
    // If set to true, logical errors (i.e. IllegalArgumentException) are not thrown immediately during a stage that
    // invokes user-defined functions, but collected to the container in the driver, and thrown at once at the end of the stage.
    collectStageErrors = false
  }

  // partitionKeyFilters can be used to limit the compilation to a subset of input partitions.
  // When multiple filters are defined, they are combined for each partition with an OR operation.
  // Filters can be further combined using the "AndFilter", "NotFilter" and "OrFilter".
  partitionKeyFilters = [
  //   {
  //     // This should either be the fully qualified name of a class implementing the
  //     // PartitionKeyFilter interface, or simply the name of a class within the built-in filters package:
  //     // com.here.platform.data.processing.driver.filter.
  //     className = "BoundingBoxFilter"
  //     param = { // The actual type is function of the constructor of the class mentioned above
  //       boundingBox { // This is valid for BoundingBoxFilter
  //         north = 24.8
  //         south = 24.68
  //         east = 121.8
  //         west = 121.7
  //       }
  //       // This tells this geographical filter to match non-geographical generic partitions and
  //       // avoid having to retain them using a second filter. Be aware that other filters OR-ed with
  //       // this filter won't have the desired effect when this is true (the default value).
  //       matchGenericPartitioning = true
  //     }
  //   }
  //   ,
  //   {
  //     // AllowListFilter can alternatively be used to filter by catalog+layer and/or individual partition names.
  //     // An empty set of catalogs or partitions is equivalent to not filtering based on that property.
  //     className = "AllowListFilter"
  //     param.catalogsAndLayers = { "tmob": ["CoreMap-Road"] }
  //     param.partitions = ["389114915", "389114916", "389114917"]
  //   }
  //   ,
  //   {
  //     // Operator can also be used to combine other filters.
  //     // e.g. This matches any partition except within the union of the two specified bounding boxes:
  //     className = "NotFilter"
  //     param.operand = {
  //       className = "OrFilter"
  //       param.operands = [
  //         {
  //           className = "BoundingBoxFilter"
  //           param.boundingBox { north = 90, south = -90, east = 121.7835, west = 121.7615 }
  //           param.matchGenericPartitioning = false
  //         }, {
  //           className = "BoundingBoxFilter"
  //           param.boundingBox { north = 24.7083, south = 24.6863, east = 180, west = -180 }
  //           param.matchGenericPartitioning = false
  //         }
  //       ]
  //     }
  //   }
  ]
}

// The optional here.platform.data-processing.executors.byId section defines executor specific
// parameters for a compilation pipeline executor
// * a sub-set of here.platform.data-processing.executors parameters can be nested under a user defined executor name
// * any missing values will be taken from the defaults under here.platform.data-processing.executors
// * only the first level nodes are optional, i.e., if any member of compilein is given then all of its
//   fields should be given
// example:

//here.platform.data-processing.executors.byId {
//  street-executor {
//    reftree.parallelResolves = 10
//  }
//}

// Spark configuration parameters can be passed to SparkConf using this section as well:
// these settings override Spark settings with the same name that may be already set before
// the compiler runs.
// Parameters added via -D parameters on the JVM command line or similar mechanisms are therefore
// overridden by these. To change them, use a custom application.conf with updated values.
here.platform.data-processing.spark {
  // serializer = "org.apache.spark.serializer.JavaSerializer" // mainly for testing, slower than Kryo
  serializer = "org.apache.spark.serializer.KryoSerializer"    // recommended default serializer
  kryo.registrationRequired = false                            // enable to see which classes are not registered
  kryo.registrator = "com.here.platform.data.processing.spark.KryoRegistrator"      // define custom class to register compiler classes
  kryo.referenceTracking = true                                // Track references of a tree to allow cyclical graphs.
                                                               //   However on large trees, the number of references may
                                                               //   exceed the Integer.MAX_VALUE that is allowed.
                                                               //   The issue and resolution is documented here:
                                                               //   https://github.com/EsotericSoftware/kryo/issues/497.
                                                               //
                                                               //   As a workaround, set this parameter to `false`.
  rdd.compress = true // enable compression for serialized RDD partitions. Can save substantial
                      // space at the cost of some extra CPU time.
  ui.showConsoleProgress = true // show the progress bar in the console. When "INFO" log level
                                // is enabled, Spark overrides this setting and always
                                // disables the progress bar.
}


here.platform.data-processing.deltasets {

  // Configures the default settings for DeltaSet operations.
  default {

    // Where to store intermediate results that must be persisted. See
    // https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose
    // Applies to: flatMapReduce, flatMapGroup, mapReduce, mapGroup, mapValues, mapValuesWithKey,
    //             resolve, detectChanges, join.
    intermediateStorageLevel = "MEMORY_AND_DISK_SER"

    // Defines how much extra processing should be done to detect invalid use of DeltaSets and
    // improve debugging.
    //
    // Possible values:
    //
    //   PERFORMANCE: Disables certain potentially expensive checks that help detecting incorrect
    //     uses of DeltaSets as early as possible. When this setting is used it can be harder to find
    //     root causes of a problem. In particular:
    //     - disjointUnion does not validate that the union does not create duplicate keys.
    //     - toDeltaSet does not validate that the given RDD does not contain duplicate keys.
    //
    //   SAFETY: Enable validation checks. Default.
    //
    //   DEBUG: Like "SAFETY", but also enable processing to improve debug output. In particular:
    //     - Logical errors such as IllegalArgumentException, are not thrown immediately during a
    //       stage that invokes user-defined functions. Instead, they are collected in a container in
    //       the driver, and thrown only once.
    //       Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
    //                   mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
    validationLevel = "SAFETY"

    // Number of threads to use for running user-provided functions, for example mapFn and resolveFn.
    // Increasing this number is useful when you have blocking I/O happening inside the function.
    // Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
    //             mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
    threads = 1

    // Processes the keys within a partition in sorted order. Sorting ensures that keys with
    // similar names are processed together. This can improve the performance if the map function
    // caches resources, such as downloaded payloads, and the same cache entries are likely to be
    // requested while processing keys that are nearby in the sorting order.
    // Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
    //             mapGroup/flatMapGroup and mapValues/mapValuesWithKey.
    sorting = false

    // Can be used to disable incremental computation of a DeltaSet. This will cause upstream
    // and downstream DeltaSets to also be computed non-incrementally.
    // Applies to: all transformations except publish
    incremental = true

  }

  partitionKeyFilters = [
    // Like executors, DeltaSets can use partition key filters. For the syntax of defining
    // partition key filters, see above. Partition key filters are used in all `query` and
    // `readBack` DeltaSet transformations to filter the input partitions.
  ]

}

// Configure Akka (and DS Clients) to log via the logging framework used in the rest of the project
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

// Any compiler specific configuration should go to this section.
// Changing any configuration here will trigger a full compilation,
// this means that parameters within this can rightfully be used
// to alter the output of the compiler.
here.platform.data-processing.compiler {

}
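
To make the commented-out examples in the listing above more concrete, here is a sketch of an application.conf that activates one partition key filter and one per-executor override. The values are illustrative only; the bounding box and the executor name `street-executor` are copied from the commented examples in reference.conf, and the thread count is an arbitrary choice.

```hocon
// application.conf: illustrative overrides only, not recommended values
here.platform.data-processing.executors {

  // Limit the compilation to partitions inside one bounding box.
  partitionKeyFilters = [
    {
      className = "BoundingBoxFilter"
      param = {
        boundingBox { north = 24.8, south = 24.68, east = 121.8, west = 121.7 }
        matchGenericPartitioning = true
      }
    }
  ]

  // Per-executor override. All fields of compilein are given, because
  // only first-level nodes may be omitted under byId.
  byId {
    street-executor {
      compilein {
        threads = 20
        sorting = true
      }
    }
  }
}
```

According to the comments in the listing, partitionKeyFilters is the only parameter in the executors section whose change is expected to trigger a full compilation; values missing under byId fall back to the defaults in here.platform.data-processing.executors.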

For compiler-specific default values, define them in the application.conf file that is part of the compiler's resources. Driver parameters of the processing library can be overridden by values specified on the command line.

The Typesafe Config library lets you override the compiler's configuration in several ways:

  • Use the -Dconfig.file or -Dconfig.url parameter to replace the entire application.conf file on the classpath with a different file.
  • Use -Dhere.platform.data-processing.xxx=value Java system properties to change individual parameters.
  • Use -Dconfig.trace=loads to debug configuration overrides. With this option, the parsed files are printed and the final configuration is printed to stdout.
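
As a sketch of the first option, a replacement file passed with -Dconfig.file might look like the following. The file name `custom.conf` is hypothetical, and the values are arbitrary; the keys come from reference.conf.

```hocon
// custom.conf (hypothetical file name), used in place of the
// application.conf on the classpath via -Dconfig.file=custom.conf.
// Defaults from reference.conf still apply to anything not set here.
here.platform.data-processing.executors.compilein.threads = 20
here.platform.data-processing.driver.disableIncremental = true
```

A single value such as the thread count could instead be changed with a system property of the form -Dhere.platform.data-processing.executors.compilein.threads=20, without replacing the whole file.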

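To define your own configuration parameters, add them to the here.platform.data-processing.compiler section of application.conf. You can load the values of these parameters into an instance of a compiler-specific case class T through the compilerConfig[T] method of the CompleteConfig instance passed to setupDriver. If you call the method repeatedly, consider caching the result.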
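
A compiler-specific section could look like the sketch below. Both parameter names are hypothetical and exist only for this example. It assumes a matching case class such as `case class MyCompilerConfig(outputLevel: Int, includeFerries: Boolean)` loaded with `compilerConfig[MyCompilerConfig]`; the exact mapping between keys and case class fields is an assumption here, not something this page specifies.

```hocon
// application.conf: compiler-specific parameters (hypothetical names)
here.platform.data-processing.compiler {
  outputLevel = 12        // hypothetical integer parameter
  includeFerries = false  // hypothetical boolean parameter
}
```

As noted in the comments of reference.conf, changing any value in this section triggers a full compilation, so these parameters may legitimately affect the compiler's output.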

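By default, the configuration is loaded when the DriverContext is built and can be accessed from the context. To load the configuration in your application without a DriverContext, or to customize the configuration when the DriverContext is built, use the CompleteConfig factory methods. Additional configuration parameters can be specified as HOCON key=value strings.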

Scala

val defaultConfig = CompleteConfig()

val defaultConfigWithOverrides = CompleteConfig(
  Seq("here.platform.data-processing.executors.compilein.threads=20",
      "here.platform.data-processing.executors.compileout.threads=10"))

Java

CompleteConfig defaultConfiguration = CompleteConfig.load();

CompleteConfig defaultConfigurationWithOverrides =
    CompleteConfig.load(
        Arrays.asList(
            "here.platform.data-processing.executors.compilein.threads=20",
            "here.platform.data-processing.executors.compileout.threads=10"));
