Configure the Library

The Data Processing Library components are configured using the Typesafe Config library, which supports multiple configuration file formats. The recommended format for manual editing is HOCON, which defines a simplified JSON syntax with support for includes, references to environment variables, and so on.

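As a quick illustration of these HOCON features, a hypothetical application.conf could include another file and fall back to an environment variable; the file name and variable name below are examples only:

// application.conf (hypothetical)
include "defaults.conf"                  // merge the contents of another configuration file

here.platform.data-processing.driver {
  appName = "MyCompiler"                 // default value
  appName = ${?APP_NAME}                 // optionally overridden by the APP_NAME environment variable
}
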
You can find the configuration parameters in the reference.conf file in the batch-core resources.

The snippet below shows the contents of this file, which documents the configuration parameters available and their default values where applicable. Some parameters are commented out because they must always be defined at the application level.

// The here.platform.data-processing.driver section defines common parameters used
// across the processing library as well as in compilers.
// Changing these parameters won't trigger a full compilation; therefore, user code must not use them to affect its behaviour.
here.platform.data-processing.driver {

  // The Application name in Spark
  appName = "DataProcessing"

  parallelRetrievers = 4    // how many partitions to retrieve in parallel per task (used by state mgmt)
  parallelUploads = 4       // how many partitions to upload in parallel per task (used by publishing and state mgmt)
  uniquePartitionLimitInBytes = 1024 // partitions whose size in bytes exceeds this limit are assumed to be unique
  numCommitPartitions = 85  // how many multiparts in a DS commit; must be < 99 due to the DS limit and the library's own use

  disableIncremental = false // if true, disables incremental compilation. This does not apply to:
                             // - Incremental publishing.
                             // - Skipped compilations in case no input layer has changed.

  disableCommitIntegrityCheck = false // if true, disables the final integrity check on the committed partitions.
                                      // This option serves only as a countermeasure in extreme cases, where the
                                      // amount of output metadata makes the small overhead introduced by the
                                      // check unsustainable. Should be false in all but the most extreme cases.

  // The storage level used for caching RDDs.
  //
  // See org.apache.spark.storage.StorageLevel and the Spark RDD-persistence guide
  // (http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence) for possible values.
  //
  // Some values, for quick reference only:
  //   DISK_ONLY - Store the RDD partitions only on disk.
  //   MEMORY_ONLY - Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory,
  //                 some partitions will not be cached and will be recomputed on the fly each time they're needed.
  //   MEMORY_ONLY_SER - Store RDD as serialized Java objects (one byte array per partition). This is generally more
  //                     space-efficient than deserialized objects, especially when using a fast serializer,
  //                     but more CPU-intensive to read.
  //   MEMORY_AND_DISK - Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory,
  //                     store the partitions that don't fit on disk, and read them from there when they're needed.
  //   MEMORY_AND_DISK_SER - Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead
  //                         of recomputing them on the fly each time they're needed.
  sparkStorageLevels {
    default = "MEMORY_AND_DISK_SER"           // used for all the RDDs of the Data Processing Library unless otherwise mentioned
    catalogQueries = "MEMORY_AND_DISK_SER"    // metadata queried from DS
    publishedPayloads = "MEMORY_AND_DISK_SER" // metadata of payloads just published
    persistedState = "MEMORY_AND_DISK_SER"    // metadata of state just published
  }

  state {
    // Configuration for the incremental compiler's internal state, stored to a layer in the output catalog
    partitions = 10         // how many partitions for the state, increase if state partition sizes > 10M
    layer = "state"         // the incremental state layer, must exist in the output catalog configuration
  }
}

// The here.platform.data-processing.executors section defines common parameters for compilation pipeline executors
// Except for "partitionKeyFilters", changing this section won't trigger a full compilation;
// therefore, user code must not use these parameters to affect its behaviour.
here.platform.data-processing.executors {

  // These parameters are used by the RefTreeCompilerExecutor
  reftree {
    // number of threads to use within one executor to run the resolve function
    parallelResolves = 10
  }

  // These parameters are used by the DirectCompilerExecutors and the RefTreeCompilerExecutor
  compilein {
    // number of threads to use within one executor to run the compileIn function
    threads = 10
    // sorting will make sure partitions with similar names are compiled together,
    // increasing cache hit ratio in many setups
    sorting = false
  }

  // These parameters are used by the DirectCompilerExecutors and the RefTreeCompilerExecutor
  compileout {
    // number of threads to use within one executor to run the compileOut function
    threads = 10
    // sorting will make sure partitions with similar names are compiled together,
    // increasing cache hit ratio in many setups
    sorting = false
  }

  debug {
    // If set to true, logical errors (e.g. IllegalArgumentException) are not thrown immediately during a stage that
    // invokes user-defined functions, but are collected in a container in the driver and thrown all at once at the end of the stage.
    collectStageErrors = false
  }

  // partitionKeyFilters can be used to limit the compilation to a subset of input partitions.
  // When multiple filters are defined, they are combined for each partition with an OR operation.
  // Filters can be further combined using the "AndFilter", "NotFilter" and "OrFilter".
  partitionKeyFilters = [
  //   {
  //     // This should either be the fully qualified name of a class implementing the
  //     // PartitionKeyFilter interface, or simply the name of a class within the built-in filters package:
  //     // com.here.platform.data.processing.driver.filter.
  //     className = "BoundingBoxFilter"
  //     param = { // The actual type depends on the constructor of the class mentioned above
  //       boundingBox { // This is valid for BoundingBoxFilter
  //         north = 24.8
  //         south = 24.68
  //         east = 121.8
  //         west = 121.7
  //       }
  //       // This tells the geographical filter to also match non-geographical generic partitions,
  //       // avoiding the need to retain them using a second filter. Be aware that other filters OR-ed with
  //       // this filter won't have the desired effect when this is true (the default value).
  //       matchGenericPartitioning = true
  //     }
  //   }
  //   ,
  //   {
  //     // AllowListFilter can alternatively be used to filter by catalog+layer and/or individual partition names.
  //     // An empty set of catalogs or partitions is equivalent to not filtering based on that property.
  //     className = "AllowListFilter"
  //     param.catalogsAndLayers = { "tmob": ["CoreMap-Road"] }
  //     param.partitions = ["389114915", "389114916", "389114917"]
  //   }
  //   ,
  //   {
  //     // Operators can also be used to combine other filters.
  //     // For example, this matches any partition outside the union of the two specified bounding boxes:
  //     className = "NotFilter"
  //     param.operand = {
  //       className = "OrFilter"
  //       param.operands = [
  //         {
  //           className = "BoundingBoxFilter"
  //           param.boundingBox { north = 90, south = -90, east = 121.7835, west = 121.7615 }
  //           param.matchGenericPartitioning = false
  //         }, {
  //           className = "BoundingBoxFilter"
  //           param.boundingBox { north = 24.7083, south = 24.6863, east = 180, west = -180 }
  //           param.matchGenericPartitioning = false
  //         }
  //       ]
  //     }
  //   }
  ]
}

// The optional here.platform.data-processing.executors.byId section defines executor-specific
// parameters for a compilation pipeline executor:
// * a subset of the here.platform.data-processing.executors parameters can be nested under a user-defined executor name
// * any missing values will be taken from the defaults under here.platform.data-processing.executors
// * only the first-level nodes are optional, i.e., if any member of compilein is given, then all of its
//   fields should be given
// example:

//here.platform.data-processing.executors.byId {
//  street-executor {
//    reftree.parallelResolves = 10
//  }
//}

// Spark configuration parameters can be passed to SparkConf using this section as well:
// these settings override Spark settings with the same name that may already be set before
// the compiler runs.
// Parameters added via -D options on the JVM command line or similar mechanisms are therefore
// overridden by these values. To change them, use a custom application.conf with updated values.
here.platform.data-processing.spark {
  // serializer = "org.apache.spark.serializer.JavaSerializer" // mainly for testing, slower than Kryo
  serializer = "org.apache.spark.serializer.KryoSerializer"    // recommended default serializer
  kryo.registrationRequired = false                            // enable to see which classes are not registered
  kryo.registrator = "com.here.platform.data.processing.spark.KryoRegistrator"      // define custom class to register compiler classes
  kryo.referenceTracking = true                                // Track references of a tree to allow cyclical graphs.
                                                               //   However, on large trees, the number of references may
                                                               //   exceed Integer.MAX_VALUE, which is the maximum allowed.
                                                               //   The issue and its resolution are documented here:
                                                               //   https://github.com/EsotericSoftware/kryo/issues/497.
                                                               //
                                                               //   As a workaround, set this parameter to `false`.
  rdd.compress = true // enable compression for serialized RDD partitions. Can save substantial
                      // space at the cost of some extra CPU time.
  ui.showConsoleProgress = true // show the progress bar in the console. When "INFO" log level
                                // is enabled, Spark overrides this setting and always
                                // disables the progress bar.
}


here.platform.data-processing.deltasets {

  // Configures the default settings for DeltaSet operations.
  default {

    // Where to store intermediate results that must be persisted. See
    // https://spark.apache.org/docs/latest/rdd-programming-guide.html#which-storage-level-to-choose
    // Applies to: flatMapReduce, flatMapGroup, mapReduce, mapGroup, mapValues, mapValuesWithKey,
    //             resolve, detectChanges, join.
    intermediateStorageLevel = "MEMORY_AND_DISK_SER"

    // Defines how much extra processing should be done to detect invalid use of DeltaSets and
    // improve debugging.
    //
    // Possible values:
    //
    //   PERFORMANCE: Disables certain potentially expensive checks that help detect incorrect
    //     uses of DeltaSets as early as possible. When this setting is used, it can be harder to find
    //     root causes of a problem. In particular:
    //     - disjointUnion does not validate that the union does not create duplicate keys.
    //     - toDeltaSet does not validate that the given RDD does not contain duplicate keys.
    //
    //   SAFETY: Enable validation checks. Default.
    //
    //   DEBUG: Like "SAFETY", but also enable processing to improve debug output. In particular:
    //     - Logical errors, such as IllegalArgumentException, are not thrown immediately during a
    //       stage that invokes user-defined functions. Instead, they are collected in a container in
    //       the driver, and thrown only once.
    //       Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
    //                   mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
    validationLevel = "SAFETY"

    // Number of threads to use for running user-provided functions, for example mapFn and resolveFn.
    // Increasing this number is useful when you have blocking I/O happening inside the function.
    // Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
    //             mapGroup/flatMapGroup, mapValues/mapValuesWithKey and resolve.
    threads = 1

    // Processes the keys within a partition in sorted order. Sorting ensures that keys with
    // similar names are processed together. This can improve the performance if the map function
    // caches resources, such as downloaded payloads, and the same cache entries are likely to be
    // requested while processing keys that are nearby in the sorting order.
    // Applies to: mapReduce/flatMapReduce (only mapFn parameter, not reduceFn),
    //             mapGroup/flatMapGroup and mapValues/mapValuesWithKey.
    sorting = false

    // Can be used to disable incremental computation of a DeltaSet. This will cause upstream
    // and downstream DeltaSets to also be computed non-incrementally.
    // Applies to: all transformations except publish
    incremental = true

  }

  partitionKeyFilters = [
    // Like executors, DeltaSets can use partition key filters. For the syntax of defining
    // partition key filters, see above. Partition key filters are used in all `query` and
    // `readBack` DeltaSet transformations to filter the input partitions.
  ]

}

// Configure Akka (and DS Clients) to log via the logging framework used in the rest of the project
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

// Any compiler-specific configuration should go in this section.
// Changing any configuration here will trigger a full compilation,
// which means that parameters within this section can rightfully be used
// to alter the output of the compiler.
here.platform.data-processing.compiler {

}

Define compiler-specific defaults in the application.conf file, as part of the compiler's resources. You can override the processing library's driver parameters with values that you specify on the command line.
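
The following sketch shows what a minimal application.conf bundled with a compiler could look like; the overridden values and the compiler-specific parameter are purely illustrative and not part of the library defaults:

here.platform.data-processing.driver {
  appName = "MyCompiler"           // illustrative application name
  state.partitions = 20            // example override of a driver default
}

here.platform.data-processing.executors {
  compilein.threads = 20           // example override of an executor default
}

here.platform.data-processing.compiler {
  outputLevel = 12                 // hypothetical compiler-specific parameter
}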

The Typesafe Config library enables you to override the compiler's configuration in various ways, such as:

  • Use -Dconfig.file or -Dconfig.url parameters to replace the whole application.conf file in the classpath with another file.
  • Use -Dhere.platform.data-processing.xxx=value to modify individual parameters via Java's system properties.
  • Use -Dconfig.trace=loads to debug configuration overrides. This option prints the files parsed and outputs the final configuration to stdout.

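For example, assuming the compiler is started as a plain JVM application (the JAR name below is purely illustrative), individual parameters can be overridden on the command line like this:

java -Dconfig.trace=loads \
     -Dhere.platform.data-processing.executors.compilein.threads=20 \
     -jar my-compiler.jar
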
To define your own configuration parameters, add them to the here.platform.data-processing.compiler section in the application.conf. The values for these parameters can be loaded into an instance of a compiler-specific case class T via the compilerConfig[T] method of the CompleteConfig instance passed to setupDriver. Consider caching the result if you call the method repeatedly.
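
The sketch below shows how such a case class might be loaded; the parameter names and the case class itself are hypothetical, and only the here.platform.data-processing.compiler section and the compilerConfig[T] method come from the library:

// application.conf (hypothetical compiler parameters):
//
//   here.platform.data-processing.compiler {
//     outputLevel = 12
//     roadTypes = ["highway", "residential"]
//   }

// Hypothetical case class whose fields mirror the keys in the compiler section.
case class MyCompilerConfig(outputLevel: Int, roadTypes: Seq[String])

// completeConfig is the CompleteConfig instance passed to setupDriver;
// compilerConfig[T] materializes the compiler section into the case class.
val compilerConf: MyCompilerConfig = completeConfig.compilerConfig[MyCompilerConfig]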

By default the configuration is loaded when a DriverContext is built, and can be accessed from the context. To load the configuration in your application without a DriverContext, or to customize the configuration when the DriverContext is built, use the CompleteConfig factory methods. Additional configuration parameters can be provided as HOCON key=value strings:

Scala

val defaultConfig = CompleteConfig()

val defaultConfigWithOverrides = CompleteConfig(
  Seq("here.platform.data-processing.executors.compilein.threads=20",
      "here.platform.data-processing.executors.compileout.threads=10"))

Java

CompleteConfig defaultConfiguration = CompleteConfig.load();

CompleteConfig defaultConfigurationWithOverrides =
    CompleteConfig.load(
        Arrays.asList(
            "here.platform.data-processing.executors.compilein.threads=20",
            "here.platform.data-processing.executors.compileout.threads=10"));
