Configure Your Application

The Data Archiving Library is configured via Typesafe Config. Typically, you provide an application.conf file that contains all the application-specific settings that differ from the defaults provided by the Data Archiving Library's reference.conf configuration files.

The Data Archiving Library relies on the Data Client Library to access the HERE platform, so for your convenience you can specify frequently configured Data Client Library settings, such as those described in Set Up a Proxy, Selecting a Connector Type, and Selecting a Discovery Service Env, in the Data Archiving Library's application.conf. You can also add further configuration settings as needed. For more information, see Data Client Configuration.
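
For example, a minimal application.conf that adjusts these Data Client Library settings might look like the following sketch; the proxy host, port, and connector choice are placeholder values, reusing the property names shown in the examples later on this page.

here.platform.data-client {
  # Select the discovery service environment ('here', or 'here-cn' for China).
  endpoint-locator {
    discovery-service-env = here
  }

  # Use the HTTP connector, for example when running behind a proxy.
  stream.connector.consumer = "http-connector"

  # Route requests through a proxy (credentials are optional).
  proxy {
    host = "localhost"
    port = 9999
  }
}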

Note

Some of the Data Client Library's default settings are overridden by the Data Archiving Library. For your reference, these settings are shown below in the tab Data Archiving Library custom default properties for Data Client Library. We recommend that you do not change these settings in application.conf because doing so may reduce the Data Archiving Library's fault tolerance.

Note

The property aggregation.window-seconds is different from the index attribute of type timewindow. The property aggregation.window-seconds determines how frequently data is aggregated and processed by the data processing pipeline. The index attribute of type timewindow specifies the finest time granularity at which the data is indexed and later queried, as defined in the attribute's duration field.
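
For example, the two settings could be combined as shown in the following sketch. The attribute name eventTime is hypothetical, and the example assumes the index layer expresses the duration field in milliseconds.

# In application.conf: aggregate data in memory and archive it every 15 minutes.
aggregation {
  window-seconds = 900
}

# In the index layer definition (not part of application.conf), a timewindow attribute
# such as the following would index the archived data at one-hour granularity:
# { "name": "eventTime", "type": "timewindow", "duration": 3600000 }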

Note

When using http-connector, the number of subscriptions cannot exceed the parallelism allowed by the stream layer. The property source.consumer-id helps identify the number of subscriptions for the consumer group. Therefore, for http-connector, we recommend setting the property source.consumer-id.

The consumer ID must be unique within the consumer group. If you do not provide one, the system generates one. When the consumer needs to recover (in case of failure), the old subscription is reused if the same consumer-group and consumer-id values are used. For more information, see Get Data from a Stream Layer.
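
For example, the consumer group and consumer ID are set together in the source block of application.conf; the values below are illustrative placeholders.

source {
  consumer-group = "my-sensor-data-stream-avro-group"
  consumer-id = "my-unique-consumer-id"
}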

User defined properties
Data Archiving Library default properties
Data Archiving Library custom default properties for Data Client Library
############################################################
## Data Archiving Library Application Config File Example ##
############################################################

# TO USE THE DATA ARCHIVING LIBRARY, YOU MUST PACKAGE THE application.conf FILE IN YOUR APPLICATION JAR.

#############################################
## Required properties in application.conf ##
#############################################

# These settings are for Data Archiving Library's Stream Execution Environment.
env {

  # Fully Qualified Class Name of any User Defined Function interface implementation provided by the user.
  # The class must be public and have a public constructor.
  udf = <UPDATE_ME>    # Eg:- udf = com.here.platform.data.archive.example.AvroSimpleKeyExample
}

# These settings are for Data Archiving Library's source (Stream Layer).
source {

  # HERE Resource Name (HRN) of the catalog that contains the stream layer whose data is to be archived.
  # This property can be passed through the pipeline-config.conf file.
  # If this property is passed in both the application.conf and pipeline-config.conf files,
  # then the value will be taken from the pipeline-config.conf file because it has a higher priority.
  hrn = "<UPDATE_ME>"    # Eg:- hrn = "hrn:here:data::olp-here:sensor-data-stream" or hrn = "hrn:here-cn:data::olp-cn-here:sensor-data-stream" (for China)

  # ID of the stream layer whose data is to be archived.
  layer = "<UPDATE_ME>"    # Eg:- layer = "stream-layer"

  # Any string that uniquely identifies the data archiving pipeline
  consumer-group = "<UPDATE_ME>"    # Eg:- consumer-group = "my-sensor-data-stream-avro-group"
}

# These settings are for Data Archiving Library's sink (Index Layer).
sink {

  # HERE Resource Name (HRN) of the catalog that contains the index layer where data is to be archived.
  # This property can be passed through the pipeline-config.conf file.
  # If this property is passed in both the application.conf and pipeline-config.conf files,
  # then the value will be taken from the pipeline-config.conf file because it has a higher priority.
  hrn = "<UPDATE_ME>"    # Eg:- hrn = "hrn:here:data::olp-here:sensor-data-archive" or hrn = "hrn:here-cn:data::olp-cn-here:sensor-data-archive" (for China)

  # ID of the index layer where data is to be archived.
  layer = "<UPDATE_ME>"    # Eg:- layer = "index-layer"
}

# These settings are for the Data Client Library used in the Data Archiving Library.
here.platform.data-client {

  # Discovery of baseUrls of various Data APIs like publish, metadata, query, etc.
  endpoint-locator {
    # Determines which environment to use for the discovery service's endpoints.
    # Possible values are: 'here', 'here-dev', 'here-cn', 'here-cn-dev', 'custom'.
    # If 'custom' is specified then the 'discovery-service-url' property MUST be set.
    discovery-service-env = <UPDATE_ME>    # Eg:- discovery-service-env = here or discovery-service-env = here-cn (for China)
    # Defines a URL for a custom discovery service endpoint.
    # discovery-service-url = "<custom discovery service URL>"
  }
}

##########################################################################
## Optional properties recommended to be overridden in application.conf ##
##########################################################################

# These settings are for Data Archiving Library's source (Stream Layer).
source {

  # This property is valid only when using http-connector for the stream layer. The number of subscriptions cannot exceed the parallelism allowed by the stream layer.
  # When the consumer needs to recover (in case of failure), the old subscription is reused if the same consumer-group and consumer-id values are used.
  consumer-id = "<UPDATE_ME>"    # Eg:- consumer-id = "my-unique-consumer-id"
}

# These settings are for Data Archiving Library's aggregation logic.
aggregation {

  # The Data Archiving Library splits the stream into "buckets" of a fixed time interval.
  # For all elements in each bucket, the stream is also split into logical keyed streams based on the indexing attributes of each element.
  # This property determines how long the Data Archiving Pipeline aggregates data in memory before archiving it to the Index Layer.
  # Note that if the value is very small, it will impact archiving performance, as smaller files will be archived frequently.
  # However, if the value is very large, more storage (more workers/worker units) is required to hold the data on disk and in memory.
  # For state size considerations, note that Flink creates one copy of each element per window to which it belongs.
  # The default value is 15 minutes. The recommended value range is from 10 minutes to 60 minutes. The allowed value range is from 1 second to 24 hours.
  # The value should also be less than or equal to the stream layer's time-to-live (ttl) retention period, and it is recommended to keep it much smaller than the stream layer ttl.
  # If these two values are too close, there is a risk of data expiring from the stream layer before it is processed by the pipeline.
  window-seconds = <UPDATE_ME>    # Eg:- window-seconds = 1200

  # Having a keyed stream allows the windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest.
  # In each aggregation window, all elements referring to the same key are sent to the same parallel task.
  # Assuming there is no hotspot problem (a hotspot occurs when most, if not all, elements have the same key), higher parallelism improves the data archiving pipeline's performance.
  # The required parallelism can be determined from parameters such as the cardinality and uniqueness of the indexing attributes. Refer to the Best Practices section in the developer guide for details.
  # It is highly recommended to override this property. Note that the value of this property should be less than or equal to the number of workers selected when creating a pipeline.
  parallelism = <UPDATE_ME>    # Eg:- parallelism = 10
}

# These settings allow you to specify what your pipeline will do when an error occurs in your User Defined Function (UDF) implementation.
# The Data Archiving Library will invoke the error handling strategy when:
# - A UDF implementation encounters a non-parsable message
# - A UDF implementation throws an unchecked/runtime exception
# - A UDF implementation returns null
# - An indexing attribute value does not pass the Data Archiving Library's validation rules.
error {
  # You can choose one of the following error handling strategies:
  # - "fail" - The pipeline fails on any error from a User-Defined Function's implementation. The pipeline logs the action.
  # - "ignore" - The pipeline ignores the messages that encounter an error in a User-Defined Function's implementation. The pipeline logs the action and continues processing the next message. This is the default strategy.
  # - "deadletter" - The pipeline archives messages that encounter an error in a User-Defined Function's implementation. The messages are archived in a dead letter index layer, and processing continues. See the following section for more information about using this strategy.
  strategy = <UPDATE_ME>    # Eg:- strategy = "fail"

  # For "deadletter" strategy,
  # Create an index layer for archiving messages that encounter an error.
  # This special index layer must have following settings:
  # - Content Type must be "application/x-avro-binary".
  # - Content Encoding must be "uncompressed".
  # - There must be four indexing attributes with the following names and settings:
  #   - A `timewindow` type attribute with the name "indexUploadTime". You can select the desired `duration`. This attribute stores the timestamp of the index upload, truncated by the duration value.
  #   - A `string` type attribute with the name "realm". This attribute stores the realm of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with the name "catalogId". This attribute stores the `catalogId` of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with the name "layerId". This attribute stores the `layerId` of the index layer where messages that were successfully processed are indexed.
  # - You can select any TTL setting.
  # Add the following two commented parameters to your application.conf (uncomment them and update them with the correct values).
  # deadletter.hrn = "<CATALOG_HRN_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>"    # Eg:- deadletter.hrn = "hrn:here:data::olp-here:sensor-data-deadletter-avro" or deadletter.hrn = "hrn:here-cn:data::olp-cn-here:sensor-data-deadletter-avro" (for China)
  # deadletter.layer = "<INDEX_LAYER_NAME_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>"    # Eg:- deadletter.layer = "index-layer"
}

# These settings are for the Data Client Library used in the Data Archiving Library.
here.platform.data-client {

  # Stream layers are implemented as Kafka clusters.
  # To read from a stream layer, your application must use one of the following connector types:
  # "kafka-connector" - Direct Kafka is the preferred connector type since it directly communicates with the underlying Kafka cluster. It is the default connector.
  # "http-connector" - HTTP Connector is an HTTP wrapper on top of Kafka, and therefore implies a communication overhead. If your application needs to access data in the Marketplace or is running behind a proxy, use the HTTP connector.
  stream.connector.consumer = "<UPDATE_ME>"

  # Define the proxy configuration. The credentials key is optional.
  #
  # proxy {
  #   host = "localhost"
  #   port = 9999
  #
  #   credentials {
  #     username: "user"
  #     password: "pass"
  #   }
  # }
}
##################################################
## Data Archiving Library Reference Config File ##
##################################################

# This is the reference config file that contains default settings for Data Archiving Library.
# Any application-specific settings that differ from the default ones provided here should be set in your application.conf.

# These settings are for Data Archiving Library's Stream Execution Environment.
env {

  # This property helps Flink to define internal data structures of certain state backends.
  # These internal data structures help the state scale with the number of key-groups if the operator parallelism
  # is changed for stateful stream processing. This is Flink's internal implementation mechanism.
  # It is set to 200 because this is the limit on the number of nodes in the Pipeline API.
  max-parallelism = 200

  # By enabling this Flink feature, periodic checkpoints will be persisted externally even after job failure.
  # This way, you will have a checkpoint around to resume from if your job fails. This feature is useful for recovering from cluster failure.
  # Note that this feature is currently not supported by the Pipeline API.
  externalized.checkpoint.enabled = false

  # The archiving process requires stateful stream processing. To make the state fault tolerant, Flink needs to checkpoint the state.
  # Checkpointing allows Flink to recover state and positions in the streams. It also allows the Data Archiving Library to be fault-tolerant.
  checkpoint.enabled = true

  # Start a checkpoint every 5 minutes.
  checkpoint.interval-seconds = 300

  # Checkpoints have to complete within 1 minute, or they are discarded.
  checkpoint.timeout-seconds = 60

  # Make sure 5 minutes of progress happen between checkpoints.
  checkpoint.minimum-pause-seconds = 300

  # This defines how many consecutive checkpoint failures will be tolerated before the whole job is failed over.
  checkpoint.tolerable.failure.number = 3

  # Unaligned checkpoints contain in-flight data (i.e., data stored in buffers) as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers.
  # Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
  # You should use unaligned checkpoints if your checkpointing durations are very high due to backpressure. Then, checkpointing time becomes mostly independent of the end-to-end latency.
  # Be aware that unaligned checkpointing adds I/O to the state backends, so you shouldn't use it when the I/O to the state backend is actually the bottleneck during checkpointing.
  checkpoint.unaligned.enabled = true

  # The Data Archiving Library uses Flink's Failure Rate Restart Strategy.
  # The failure rate restart strategy restarts the job after a failure, but when the failure rate (failures per time interval) is exceeded, the job eventually fails.
  # Between two consecutive restart attempts, the restart strategy waits a fixed amount of time; see restart.delay-seconds.
  restart.enabled = true

  # Maximum number of restarts in given time interval before failing a job.
  restart.failure-rate = 5

  # Time interval for measuring failure rate.
  restart.failure-interval-seconds = 2700

  # Delay between two consecutive restart attempts.
  restart.delay-seconds = 60
}

# These settings are for Data Archiving Library's source (Stream Layer).
source {

  # This property represents the number of consumers (tasks/workers) reading from the Stream Layer.
  # It is recommended to override this property only if there is a network limitation on your cluster.
  # Note that the value of this property should be less than or equal to the number of workers selected when creating a pipeline.
  parallelism = 1

  # If there is no initial offset in the source stream, or the current offset no longer exists on the server,
  # then this property automatically resets the offset to the latest offset (because the default is set to latest).
  # Valid values are [latest, earliest]. It is highly recommended not to override this value.
  # For example, if the value is set to earliest, then sufficient resources should be configured to handle the load, based on the stream's retention period.
  auto.offset.reset = "latest"
}

# These settings are for Data Archiving Library's parser logic.
parser {

  # This property represents the message parsing parallelism.
  # The message parsing logic invokes an API, such as `getKeys`, to retrieve indexing attributes.
  # It is recommended to override this property if your logic to retrieve indexing attributes is compute intensive.
  # Note that the value of this property should be less than or equal to the number of workers selected when creating a pipeline.
  parallelism = 1
}

# These settings are for Data Archiving Library's aggregation logic.
aggregation {

  # The Data Archiving Library splits the stream into "buckets" of a fixed time interval.
  # For all elements in each bucket, the stream is also split into logical keyed streams based on the indexing attributes of each element.
  # This property determines how long the Data Archiving Pipeline aggregates data in memory before archiving it to the Index Layer.
  # Note that if the value is very small, it will impact archiving performance, as smaller files will be archived frequently.
  # However, if the value is very large, more storage (more workers/worker units) is required to hold the data on disk and in memory.
  # For state size considerations, note that Flink creates one copy of each element per window to which it belongs.
  # The default value is 15 minutes. The recommended value range is from 10 minutes to 60 minutes. The allowed value range is from 1 second to 24 hours.
  # The value should also be less than or equal to the stream layer's time-to-live (ttl) retention period, and it is recommended to keep it much smaller than the stream layer ttl.
  # If these two values are too close, there is a risk of data expiring from the stream layer before it is processed by the pipeline.
  window-seconds = 900

  # Having a keyed stream allows the windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest.
  # In each aggregation window, all elements referring to the same key are sent to the same parallel task.
  # Assuming there is no hotspot problem (a hotspot occurs when most, if not all, elements have the same key), higher parallelism improves the data archiving pipeline's performance.
  # The required parallelism can be determined from parameters such as the cardinality and uniqueness of the indexing attributes. Refer to the Best Practices section in the developer guide for details.
  # It is highly recommended to override this property. Note that the value of this property should be less than or equal to the number of workers selected when creating a pipeline.
  parallelism = 1
}

# These settings are for Data Archiving Library's sink (Index Layer).
sink {

  # The Data Archiving Library sink uploads the metadata (indexes) of archived files (each archived file contains messages aggregated by their indexing attributes) to the Index Layer.
  # The Index Layer is updated when either of the following two conditions is true:
  # 1. The maximum number of indexes is received by the sink operator (index.limit property).
  # 2. The timeout expires (index.timeout-seconds property).
  # The recommended maximum value for index.limit is 500, which is also the default value. You can override this value if the source stream data load is low.
  # You can also override index.timeout-seconds based on the data load in the source stream. The maximum allowed value is 3600.
  index.limit = 500
  index.timeout-seconds = 150

  # This property represents the number of tasks/workers uploading metadata to the Index Layer.
  # It is recommended to override this property only if there is a network limitation on your cluster.
  # Note that the value of this property should be less than or equal to the number of workers selected when creating a pipeline.
  parallelism = 1
}

# These settings allow you to specify what your pipeline will do when an error occurs in your User Defined Function (UDF) implementation.
# The Data Archiving Library will invoke the error handling strategy when:
# - A UDF implementation encounters a non-parsable message
# - A UDF implementation throws an unchecked/runtime exception
# - A UDF implementation returns null
# - An indexing attribute value does not pass the Data Archiving Library's validation rules.
error {
  # You can choose one of the following error handling strategies:
  # - "fail" - The pipeline fails on any error from a User-Defined Function's implementation. The pipeline logs the action.
  # - "ignore" - The pipeline ignores the messages that encounter an error in a User-Defined Function's implementation. The pipeline logs the action and continues processing the next message. This is the default strategy.
  # - "deadletter" - The pipeline archives messages that encounter an error in a User-Defined Function's implementation. The messages are archived in a dead letter index layer, and processing continues. See the following section for more information about using this strategy.
  strategy = "ignore"

  # For "deadletter" strategy,
  # Create an index layer for archiving messages that encounter an error.
  # This special index layer must have following settings:
  # - Content Type must be "application/x-avro-binary".
  # - Content Encoding must be "uncompressed".
  # - There must be four indexing attributes with the following names and settings:
  #   - A `timewindow` type attribute with the name "indexUploadTime". You can select the desired `duration`. This attribute stores the timestamp of the index upload, truncated by the duration value.
  #   - A `string` type attribute with the name "realm". This attribute stores the realm of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with the name "catalogId". This attribute stores the `catalogId` of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with the name "layerId". This attribute stores the `layerId` of the index layer where messages that were successfully processed are indexed.
  # - You can select any TTL setting.
  # Add the following two commented parameters to your application.conf (uncomment them and update them with the correct values).
  # deadletter.hrn = "<CATALOG_HRN_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>"        # Eg:- "hrn:here:data::olp-here:sensor-data-deadletter-avro" or "hrn:here-cn:data::olp-cn-here:sensor-data-deadletter-avro" (for China)
  # deadletter.layer = "<INDEX_LAYER_NAME_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>" # Eg:- "index-layer"
}
###############################################################################################
## Data Archiving Library Custom Config File Overriding Data Client Library Default Settings ##
###############################################################################################

# This is the custom reference config file that contains the Data Archiving Library's custom default settings for the Data Client Library.
# Any application-specific settings that differ from the custom default ones provided here should be set in your application.conf.

here.platform.data-client {
  retry-policy {
    type = "best-effort"
  }
}
