Configure the Application

The Data Archiving Library is configured using Typesafe Config. You typically provide an application.conf file containing all application-specific settings that differ from the defaults provided by the Data Archiving Library's reference.conf configuration file.
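As a minimal sketch of this override mechanism (assuming the defaults listed in the "Data Archiving Library default properties" tab below), application.conf only needs to repeat the settings you want to change, in addition to the required properties (env.udf, source, sink, discovery-service-env) shown in the full example under "User-defined properties":

# Minimal override sketch: only values that differ from reference.conf appear here.
aggregation {
  # Override the 15-minute default (900 seconds) with a 20-minute aggregation window.
  window-seconds = 1200
}
error {
  # Fail the pipeline on UDF errors instead of the default "ignore" strategy.
  strategy = "fail"
}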

The Data Archiving Library uses the Data Client Library to access the HERE platform. The Data Archiving Library's application.conf therefore makes it easy to specify frequently configured Data Client Library settings, such as Set Up a Proxy, Selecting a Connector Type, and Selecting a Discovery Service Env. You can also add more settings as needed. For details, see "Data Client Settings".
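As a sketch, reusing the property names shown in the configuration examples later on this page (the host, port, and environment values are placeholders), such Data Client Library settings go into the here.platform.data-client block of application.conf:

here.platform.data-client {
  # Select the discovery service environment ('here', 'here-dev', 'here-cn', 'here-cn-dev', 'custom').
  endpoint-locator {
    discovery-service-env = here
  }

  # Select the connector type for reading from the stream layer ("kafka-connector" or "http-connector").
  stream.connector.consumer = "http-connector"

  # Optional proxy configuration; the credentials block can be omitted.
  proxy {
    host = "localhost"
    port = 9999
  }
}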

Note

Some of the Data Client Library's default settings are overridden by the Data Archiving Library. For reference, these settings are shown in the Data Archiving Library custom default properties for Data Client Library tab below. We recommend that you do not change these settings in application.conf, because doing so may reduce the fault tolerance of the Data Archiving Library.

Note

The aggregation.window-seconds property is not the same as an index attribute of type timewindow. The aggregation.window-seconds property specifies how often the data archiving pipeline aggregates and processes data. An index attribute of type timewindow holds the finest time granularity at which data is indexed and can later be queried, as specified in the attribute's duration field.
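A rough sketch of the difference (the 1-hour duration is a hypothetical index layer setting, not a value taken from this page):

# application.conf: the pipeline aggregates data in memory and archives it every 20 minutes.
aggregation {
  window-seconds = 1200
}
# By contrast, the `duration` field of a `timewindow` index attribute is configured on the
# index layer itself (for example, 3600000 ms = 1 hour) and defines the finest time
# granularity at which archived data is indexed and later queried; it does not control
# how often the pipeline archives data.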

Note

When using http-connector, the number of subscriptions cannot exceed the parallelism allowed by the stream layer. The source.consumer-id property identifies a subscription within a consumer group, so setting the source.consumer-id property is recommended when using http-connector.

The value must be unique within the consumer group. If you do not provide one, the system generates one. When a consumer needs to recover (in case of failure), using the same consumer-group and consumer-id values allows the old subscription to be reused. For details, see "Get Data from a Stream Layer".
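A sketch combining these settings (the group and id strings are placeholders, reusing the examples from the configuration listing below):

# Use the HTTP connector and pin the subscription to a stable consumer id.
here.platform.data-client {
  stream.connector.consumer = "http-connector"
}
source {
  # Any string that uniquely identifies the data archiving pipeline.
  consumer-group = "my-sensor-data-stream-avro-group"
  # Must be unique within the consumer group; reuse it together with consumer-group on recovery.
  consumer-id = "my-unique-consumer-id"
}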

User-defined properties
Data Archiving Library default properties
Data Archiving Library custom default properties for Data Client Library
############################################################
## Data Archiving Library Application Config File Example ##
############################################################

# FOR USING DATA ARCHIVING LIBRARY, IT IS MANDATORY FOR USERS TO PACKAGE application.conf FILE IN APPLICATION JAR.

#############################################
## Required properties in application.conf ##
#############################################

# These settings are for Data Archiving Library's Stream Execution Environment.
env {

  # Fully Qualified Class Name of any User Defined Function interface implementation provided by the user.
  # The class must be public and have a public constructor.
  udf = <UPDATE_ME>    # Eg:- udf = com.here.platform.data.archive.example.AvroSimpleKeyExample
}

# These settings are for Data Archiving Library's source (Stream Layer).
source {

  # Here Resource Name for Catalog which contains Stream Layer whose data is to be archived.
  hrn = "<UPDATE_ME>"    # Eg:- hrn = "hrn:here:data::olp-here:sensor-data-stream" or hrn = "hrn:here-cn:data::olp-cn-here:sensor-data-stream" (for China)

  # Stream Layer ID whose data is to be archived.
  layer = "<UPDATE_ME>"    # Eg:- layer = "stream-layer"

  # Any string that uniquely identifies the data archiving pipeline
  consumer-group = "<UPDATE_ME>"    # Eg:- consumer-group = "my-sensor-data-stream-avro-group"
}

# These settings are for Data Archiving Library's sink (Index Layer).
sink {

  # Here Resource Name for Catalog which contains Index Layer where data is to be archived.
  hrn = "<UPDATE_ME>"    # Eg:- hrn = "hrn:here:data::olp-here:sensor-data-archive" or hrn = "hrn:here-cn:data::olp-cn-here:sensor-data-archive" (for China)

  # Index Layer ID where data is to be archived.
  layer = "<UPDATE_ME>"    # Eg:- layer = "index-layer"
}

# These settings are for the Data Client Library used in the Data Archiving Library.
here.platform.data-client {

  # Discovery of baseUrls of various Data APIs like publish, metadata, query, etc.
  endpoint-locator {
    # Determines which environment to use for the discovery service's endpoints.
    # Possible values are: 'here', 'here-dev', 'here-cn', 'here-cn-dev', 'custom'.
    # If 'custom' is specified then the 'discovery-service-url' property MUST be set.
    discovery-service-env = <UPDATE_ME>    # Eg:- discovery-service-env = here or discovery-service-env = here-cn (for China)
    # Defines a URL for a custom discovery service endpoint.
    # discovery-service-url = "<custom discovery service URL>"
  }
}

##########################################################################
## Optional properties recommended to be overridden in application.conf ##
##########################################################################

# These settings are for Data Archiving Library's source (Stream Layer).
source {

  # This property is valid only when using http-connector for stream layer. The number of subscriptions cannot exceed the parallelism allowed by stream layer.
  # When consumer needs to recover (in case of failure), if same consumer-group and consumer-id values are used, then old subscription will be re-used.
  consumer-id = "<UPDATE_ME>"    # Eg:- consumer-id = "my-unique-consumer-id"
}

# These settings are for Data Archiving Library's aggregation logic.
aggregation {

  # The Data Archiving Library splits the stream into "buckets" of a fixed time interval.
  # For all elements in each bucket, the stream is also split into logical keyed streams based on indexing attributes of each element.
  # This property decides how long the user wants the Data Archiving Pipeline to aggregate the data in memory before archiving to Index Layer.
  # Note that if the value is very small, it will impact the performance of archiving as smaller files will be archived frequently.
  # However, if the value is very big, it requires higher storage requirement (more workers/worker units) to hold the data in disk and memory.
  # For state size consideration, note that Flink creates one copy of each element per window to which it belongs.
  # Default value is set to 15 minutes. The recommended value range is from 10 minutes to 60 minutes. Allowed value range is from 1 second to 24 hours.
  # The value should also be less than or equal to stream layer's time-to-live (ttl) retention period value. The recommendation for the value is to be much smaller than the stream layer ttl.
  # If these two values are too close, there is risk of data expiring from stream layer before it is processed by the pipeline.
  window-seconds = <UPDATE_ME>    # Eg:- window-seconds = 1200

  # Having a keyed stream allows the windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest.
  # In each aggregation window, all elements referring to the same key will be sent to the same parallel task.
  # Assuming there is no hotspot problem (most if not all elements have the same key), having higher parallelism will improve the data archiving pipeline performance.
  # Required parallelism can be determined by different parameters like indexing attribute cardinality, uniqueness, etc. Please refer to the Best Practices section in the developer guide for details.
  # It is highly recommended to override this property. Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = <UPDATE_ME>    # Eg:- parallelism = 10
}

# These settings allow you to specify what your pipeline will do when an error occurs in your User Defined Function (UDF) implementation.
# The Data Archiving Library will invoke the error handling strategy when:
# - A UDF implementation encounters a non-parsable message
# - A UDF implementation throws an unchecked/runtime exception
# - A UDF implementation returns null
# - An indexing attribute value does not pass the Data Archiving Library's validation rules.
error {
  # You can choose one of the following error handling strategies:
  # - "fail" - The pipeline fails on any error from a User-Defined Function's implementation. The pipeline logs the action.
  # - "ignore" - The pipeline ignores the messages that encounter an error in a User-Defined Function's implementation. The pipeline logs the action and continues processing the next message. This is the default strategy.
  # - "deadletter" - The pipeline archives messages that encounter an error in a User-Defined Function's implementation. The messages are archived in a dead letter index layer, and processing continues. See the following section for more information about using this strategy.
  strategy = <UPDATE_ME>    # Eg:- strategy = "fail"

  # For "deadletter" strategy,
  # Create an index layer for archiving messages that encounter an error.
  # This special index layer must have following settings:
  # - Content Type must be "application/x-avro-binary".
  # - Content Encoding must be "uncompressed".
  # - There must be four indexing attributes with following names and settings:
  #   - A `timewindow` type attribute with the name "indexUploadTime". You can select the desired `duration`. This attribute stores the timestamp of the index upload, truncated by the duration value.
  #   - A `string` type attribute with the name "realm". This attribute stores the realm of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with the name "catalogId". This attribute stores the `catalogId` of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with name "layerId". This attribute stores the `layerId` of the index layer where messages that were successfully processed are indexed.
  # - You can select any TTL setting.
  # Add following two commented parameters in your application.conf (uncomment and update with correct values).
  # deadletter.hrn = "<CATALOG_HRN_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>"    # Eg:- deadletter.hrn = "hrn:here:data::olp-here:sensor-data-deadletter-avro" or deadletter.hrn = "hrn:here-cn:data::olp-cn-here:sensor-data-deadletter-avro" (for China)
  # deadletter.layer = "<INDEX_LAYER_NAME_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>"    # Eg:- deadletter.layer = "index-layer"
}

# These settings are for the Data Client Library used in the Data Archiving Library.
here.platform.data-client {

  # Stream layers are implemented as Kafka clusters.
  # To read from a stream layer, your application must use one of the following connector types:
  # "kafka-connector" - Direct Kafka is the preferred connector type since it directly communicates with the underlying Kafka cluster. It is the default connector.
  # "http-connector" - HTTP Connector is an HTTP wrapper on top of Kafka, and therefore implies a communication overhead. If your application needs to access data in the Marketplace or is running behind a proxy, use the HTTP connector.
  stream.connector.consumer = "<UPDATE_ME>"

  # Define the proxy configuration. The credentials key is optional.
  #
  # proxy {
  #   host = "localhost"
  #   port = 9999
  #
  #   credentials {
  #     username: "user"
  #     password: "pass"
  #   }
  # }
}
##################################################
## Data Archiving Library Reference Config File ##
##################################################

# This is the reference config file that contains default settings for Data Archiving Library.
# Any application-specific settings that differ from the default ones provided here should be set in your application.conf.

# These settings are for Data Archiving Library's Stream Execution Environment.
env {

  # This property helps Flink to define internal data structures of certain state backends.
  # These internal data structures help the state scale with the number of key-groups if the operator parallelism
  # is changed for stateful stream processing. This is Flink's internal implementation mechanism.
  # It is set to 200 because this is the limitation on number of nodes in the Pipeline API.
  max-parallelism = 200

  # By enabling this Flink's feature, periodic checkpoints will be persisted externally even after job failure.
  # This way, you will have a checkpoint around to resume from if your job fails. This feature is useful for recovering from cluster failure.
  # Note that this feature is currently not supported by the Pipeline API.
  externalized.checkpoint.enabled = false

  # Archiving process requires stateful stream processing. In order to make the state fault tolerant, Flink needs to checkpoint the state.
  # Checkpoint allows Flink to recover state and positions in the streams. It also allows Data Archiving Library to be fault-tolerant.
  checkpoint.enabled = true

  # Start a checkpoint every 5 minutes.
  checkpoint.interval-seconds = 300

  # Checkpoints have to complete within 1 minute, or are discarded.
  checkpoint.timeout-seconds = 60

  # Make sure 5 minutes of progress happen between checkpoints.
  checkpoint.minimum-pause-seconds = 300

  # This defines how many consecutive checkpoint failures will be tolerated, before the whole job is failed over.
  checkpoint.tolerable.failure.number = 3

  # Unaligned checkpoints contain in-flight data (i.e., data stored in buffers) as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers.
  # Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
  # You should use unaligned checkpoints if your checkpointing durations are very high due to backpressure. Then, checkpointing time becomes mostly independent of the end-to-end latency.
  # Be aware that unaligned checkpointing adds I/O to the state backends, so you should not use it when the I/O to the state backend is actually the bottleneck during checkpointing.
  checkpoint.unaligned.enabled = true

  # Data Archiving Library uses Flink's Failure Rate Restart Strategy.
  # The failure rate restart strategy restarts the job after failure, but when failure rate (failures per time interval) is exceeded, the job eventually fails.
  # In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time, see restart.delay-seconds.
  restart.enabled = true

  # Maximum number of restarts in given time interval before failing a job.
  restart.failure-rate = 5

  # Time interval for measuring failure rate.
  restart.failure-interval-seconds = 2700

  # Delay between two consecutive restart attempts.
  restart.delay-seconds = 60
}

# These settings are for Data Archiving Library's source (Stream Layer).
source {

  # This property represents number of consumers (tasks/workers) reading from the Stream Layer.
  # It is recommended to override this property only if there is any network limitation on your cluster.
  # Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = 1

  # If there is no initial offset in source stream or the current offset does not exist anymore on the server,
  # then this property will automatically reset the offset to the latest offset (because default is set to latest).
  # Valid values are [latest, earliest]. It is highly recommended not to override this value.
  # For eg:- If the value is set to earliest, then based on stream's retention period, sufficient resources should be configured to handle the load.
  auto.offset.reset = "latest"
}

# These settings are for Data Archiving Library's parser logic.
parser {

  # This property represents message parsing parallelism.
  # Message parsing logic invokes an API, such as `getKeys`, to retrieve indexing attributes.
  # It is recommended to override this property if your logic to retrieve indexing attributes is compute intensive.
  # Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = 1
}

# These settings are for Data Archiving Library's aggregation logic.
aggregation {

  # The Data Archiving Library splits the stream into "buckets" of a fixed time interval.
  # For all elements in each bucket, the stream is also split into logical keyed streams based on indexing attributes of each element.
  # This property decides how long the user wants the Data Archiving Pipeline to aggregate the data in memory before archiving to Index Layer.
  # Note that if the value is very small, it will impact the performance of archiving as smaller files will be archived frequently.
  # However, if the value is very big, it requires higher storage requirement (more workers/worker units) to hold the data in disk and memory.
  # For state size consideration, note that Flink creates one copy of each element per window to which it belongs.
  # Default value is set to 15 minutes. The recommended value range is from 10 minutes to 60 minutes. Allowed value range is from 1 second to 24 hours.
  # The value should also be less than or equal to stream layer's time-to-live (ttl) retention period value. The recommendation for the value is to be much smaller than the stream layer ttl.
  # If these two values are too close, there is risk of data expiring from stream layer before it is processed by the pipeline.
  window-seconds = 900

  # Having a keyed stream allows the windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest.
  # In each aggregation window, all elements referring to the same key will be sent to the same parallel task.
  # Assuming there is no hotspot problem (most if not all elements have the same key), having higher parallelism will improve the data archiving pipeline performance.
  # Required parallelism can be determined by different parameters like indexing attribute cardinality, uniqueness, etc. Please refer to the Best Practices section in the developer guide for details.
  # It is highly recommended to override this property. Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = 1
}

# These settings are for Data Archiving Library's sink (Index Layer).
sink {

  # The Data Archiving Library Sink uploads metadata (indexes) of archived files (each archived file has aggregated messages based on their indexing attributes) to Index Layer.
  # The Index Layer is updated when any of the following two conditions is true:
  # 1. Maximum number of indexes is received by sink operator (index.limit property).
  # 2. Timeout (index.timeout-seconds property).
  # The recommended maximum value for index.limit is 500, which is also the default value. Users can override this value if the source stream data load is low.
  # Users can also override index.timeout-seconds based on data load in the source stream. Maximum allowed value is 3600.
  index.limit = 500
  index.timeout-seconds = 150

  # This property represents number of tasks/workers uploading metadata to the Index Layer.
  # It is recommended to override this property only if there is any network limitation on your cluster.
  # Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = 1
}

# These settings allow you to specify what your pipeline will do when an error occurs in your User Defined Function (UDF) implementation.
# The Data Archiving Library will invoke the error handling strategy when:
# - A UDF implementation encounters a non-parsable message
# - A UDF implementation throws an unchecked/runtime exception
# - A UDF implementation returns null
# - An indexing attribute value does not pass the Data Archiving Library's validation rules.
error {
  # You can choose one of the following error handling strategies:
  # - "fail" - The pipeline fails on any error from a User-Defined Function's implementation. The pipeline logs the action.
  # - "ignore" - The pipeline ignores the messages that encounter an error in a User-Defined Function's implementation. The pipeline logs the action and continues processing the next message. This is the default strategy.
  # - "deadletter" - The pipeline archives messages that encounter an error in a User-Defined Function's implementation. The messages are archived in a dead letter index layer, and processing continues. See the following section for more information about using this strategy.
  strategy = "ignore"

  # For "deadletter" strategy,
  # Create an index layer for archiving messages that encounter an error.
  # This special index layer must have following settings:
  # - Content Type must be "application/x-avro-binary".
  # - Content Encoding must be "uncompressed".
  # - There must be four indexing attributes with following names and settings:
  #   - A `timewindow` type attribute with the name "indexUploadTime". You can select the desired `duration`. This attribute stores the timestamp of the index upload, truncated by the duration value.
  #   - A `string` type attribute with the name "realm". This attribute stores the realm of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with the name "catalogId". This attribute stores the `catalogId` of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with name "layerId". This attribute stores the `layerId` of the index layer where messages that were successfully processed are indexed.
  # - You can select any TTL setting.
  # Add following two commented parameters in your application.conf (uncomment and update with correct values).
  # deadletter.hrn = "<CATALOG_HRN_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>"        # Eg:- "hrn:here:data::olp-here:sensor-data-deadletter-avro" or "hrn:here-cn:data::olp-cn-here:sensor-data-deadletter-avro" (for China)
  # deadletter.layer = "<INDEX_LAYER_NAME_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>" # Eg:- "index-layer"
}
###############################################################################################
## Data Archiving Library Custom Config File Overriding Data Client Library Default Settings ##
###############################################################################################

# This is the custom reference config file that contains custom default settings of Data Client Library for Data Archiving Library.
# Any application-specific settings that differ from the custom default ones provided here should be set in your application.conf.

here.platform.data-client {
  retry-policy {
    type = "best-effort"
  }
}
