Configure the Application

The Data Archiving Library is configured using Typesafe Config. Typically, you provide an application.conf file that contains all application-specific settings that differ from the default settings provided by the Data Archiving Library's reference.conf configuration file.
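
For example, you only need to override the settings you want to change; everything else falls back to reference.conf. A minimal sketch, using the property and values shown in the configuration examples later in this section:

# application.conf -- minimal sketch: override only the settings that differ from reference.conf
aggregation {
  # reference.conf defaults this to 900 seconds (15 minutes); here it is raised to 20 minutes.
  window-seconds = 1200
}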

The Data Archiving Library uses the Data Client Library to access the HERE platform. Therefore, the Data Archiving Library's application.conf makes it easy to specify frequently configured Data Client Library settings, such as Set Up a Proxy, Selecting a Connector Type, and Selecting a Discovery Service Env. You can also add further settings as needed. For more information, see the Data Client Library configuration documentation.
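
For instance, the connector type and an optional proxy for the Data Client Library can be set in the same application.conf. The sketch below mirrors the commented example in the "User-defined properties" tab; the host, port, and connector choice are placeholders for your environment:

here.platform.data-client {
  # "kafka-connector" (default) or "http-connector" (use behind a proxy or for Marketplace data).
  stream.connector.consumer = "http-connector"

  proxy {
    host = "localhost"   # placeholder
    port = 9999          # placeholder
  }
}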

Some of the Data Client Library's default settings are overridden by the Data Archiving Library. For reference, these settings are shown in the Data Archiving Library custom default properties for Data Client Library tab below. We recommend that you do not change these settings in application.conf, because doing so may reduce the fault tolerance of the Data Archiving Library.

The aggregation.window-seconds property is different from an index attribute of type timewindow. The aggregation.window-seconds property specifies how often data is aggregated and processed by the data archiving pipeline. An index attribute of type timewindow holds the finest time granularity at which data is indexed and can later be queried, as specified in its duration field.
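
As a short sketch of where each value lives: aggregation.window-seconds belongs in this application.conf, while duration is part of the timewindow attribute definition on the index layer itself and is not set in this file:

aggregation {
  # The pipeline aggregates data in memory and archives it to the index layer every 15 minutes.
  window-seconds = 900
}
# The index layer's `timewindow` attribute `duration` (configured on the layer, not in this file)
# defines the finest time granularity at which the archived data is indexed and queried.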

When using the http-connector, the number of subscriptions cannot exceed the parallelism allowed by the stream layer. The source.consumer-id property identifies your subscription within a consumer group. Therefore, when using the http-connector, we recommend setting the source.consumer-id property.

The consumer-id must be unique within the consumer group. If you do not provide one, the system generates it. When the consumer needs to recover (in case of failure), the old subscription is reused if the same consumer-group and consumer-id values are used. For more information, see Get Data from a Stream Layer.
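
A short sketch of the corresponding source settings when using the http-connector; the identifiers below are placeholders, matching the commented examples in the "User-defined properties" tab:

source {
  # Any string that uniquely identifies the data archiving pipeline.
  consumer-group = "my-sensor-data-stream-avro-group"   # placeholder
  # Must be unique within the consumer group; if omitted, the system generates one.
  consumer-id = "my-unique-consumer-id"                 # placeholder
}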

User-defined properties
Data Archiving Library default properties
Data Archiving Library custom default properties for Data Client Library
############################################################
## Data Archiving Library Application Config File Example ##
############################################################

# FOR USING DATA ARCHIVING LIBRARY, IT IS MANDATORY FOR USERS TO PACKAGE application.conf FILE IN APPLICATION JAR.

#############################################
## Required properties in application.conf ##
#############################################

# These settings are for Data Archiving Library's Stream Execution Environment.
env {

  # Fully Qualified Class Name of any User Defined Function interface implementation provided by the user.
  # The class must be public and have a public constructor.
  udf = <UPDATE_ME>    # Eg:- udf = com.here.platform.data.archive.example.AvroSimpleKeyExample
}

# These settings are for Data Archiving Library's source (Stream Layer).
source {

  # Here Resource Name for Catalog which contains Stream Layer whose data is to be archived.
  # This property can be passed through the pipeline-config.conf file.
  # If this property is passed in both the application.conf and pipeline-config.conf files,
  # then the value will be taken from the pipeline-config.conf file because it has a higher priority.
  hrn = "<UPDATE_ME>"    # Eg:- hrn = "hrn:here:data::olp-here:sensor-data-stream" or hrn = "hrn:here-cn:data::olp-cn-here:sensor-data-stream" (for China)

  # Stream Layer ID whose data is to be archived.
  layer = "<UPDATE_ME>"    # Eg:- layer = "stream-layer"

  # Any string that uniquely identifies the data archiving pipeline
  consumer-group = "<UPDATE_ME>"    # Eg:- consumer-group = "my-sensor-data-stream-avro-group"
}

# These settings are for Data Archiving Library's sink (Index Layer).
sink {

  # Here Resource Name for Catalog which contains Index Layer where data is to be archived.
  # This property can be passed through the pipeline-config.conf file.
  # If this property is passed in both the application.conf and pipeline-config.conf files,
  # then the value will be taken from the pipeline-config.conf file because it has a higher priority.
  hrn = "<UPDATE_ME>"    # Eg:- hrn = "hrn:here:data::olp-here:sensor-data-archive" or hrn = "hrn:here-cn:data::olp-cn-here:sensor-data-archive" (for China)

  # Index Layer ID where data is to be archived.
  layer = "<UPDATE_ME>"    # Eg:- layer = "index-layer"
}

# These settings are for the Data Client Library used in the Data Archiving Library.
here.platform.data-client {

  # Discovery of baseUrls of various Data APIs like publish, metadata, query, etc.
  endpoint-locator {
    # Determines which environment to use for the discovery service's endpoints.
    # Possible values are: 'here', 'here-dev', 'here-cn', 'here-cn-dev', 'custom'.
    # If 'custom' is specified then the 'discovery-service-url' property MUST be set.
    discovery-service-env = <UPDATE_ME>    # Eg:- discovery-service-env = here or discovery-service-env = here-cn (for China)
    # Defines a URL for a custom discovery service endpoint.
    # discovery-service-url = "<custom discovery service URL>"
  }
}

##########################################################################
## Optional properties recommended to be overridden in application.conf ##
##########################################################################

# These settings are for Data Archiving Library's source (Stream Layer).
source {

  # This property is valid only when using http-connector for stream layer. The number of subscriptions cannot exceed the parallelism allowed by stream layer.
  # When consumer needs to recover (in case of failure), if same consumer-group and consumer-id values are used, then old subscription will be re-used.
  consumer-id = "<UPDATE_ME>"    # Eg:- consumer-id = "my-unique-consumer-id"
}

# These settings are for Data Archiving Library's aggregation logic.
aggregation {

  # The Data Archiving Library splits the stream into "buckets" of time interval.
  # For all elements in each bucket, the stream is also split into logical keyed streams based on indexing attributes of each element.
  # This property decides how long the user wants the Data Archiving Pipeline to aggregate the data in memory before archiving to Index Layer.
  # Note that if the value is very small, it will impact the performance of archiving as smaller files will be archived frequently.
  # However, if the value is very big, it requires higher storage requirement (more workers/worker units) to hold the data in disk and memory.
  # For state size consideration, note that Flink creates one copy of each element per window to which it belongs.
  # Default value is set to 15 minutes. The recommended value range is from 10 minutes to 60 minutes. Allowed value range is from 1 second to 24 hours.
  # The value should also be less than or equal to stream layer's time-to-live (ttl) retention period value. The recommendation for the value is to be much smaller than the stream layer ttl.
  # If these two values are too close, there is risk of data expiring from stream layer before it is processed by the pipeline.
  window-seconds = <UPDATE_ME>    # Eg:- window-seconds = 1200

  # Having a keyed stream allows the windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest.
  # In each aggregation window, all elements referring to the same key will be sent to the same parallel task.
  # Assuming there is no hotspot problem (most if not all elements have the same key), having higher parallelism will improve the data archiving pipeline performance.
  # Required parallelism can be determined by different parameters like indexing attribute cardinality, uniqueness, etc. Please refer to the Best Practices section in the developer guide for details.
  # It is highly recommended to override this property. Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = <UPDATE_ME>    # Eg:- parallelism = 10
}

# These settings allow you to specify what your pipeline will do when an error occurs in your User Defined Function (UDF) implementation.
# The Data Archiving Library will invoke the error handling strategy when:
# - A UDF implementation throws a non-parsable message
# - A UDF implementation throws an unchecked/runtime exception
# - A UDF implementation returns null
# - An indexing attribute value does not pass the Data Archiving Library's validation rules.
error {
  # You can choose one of the following error handling strategies:
  # - "fail" - The pipeline fails on any error from a User-Defined Function's implementation. The pipeline logs the action.
  # - "ignore" - The pipeline ignores the messages that encounter an error in a User-Defined Function's implementation. The pipeline logs the action and continues processing the next message. This is the default strategy.
  # - "deadletter" - The pipeline archives messages that encounter an error in a User-Defined Function's implementation. The messages are archived in a dead letter index layer, and processing continues. See the following section for more information about using this strategy.
  strategy = <UPDATE_ME>    # Eg:- strategy = "fail"

  # For "deadletter" strategy,
  # Create an index layer for archiving messages that encounter an error.
  # This special index layer must have following settings:
  # - Content Type must be "application/x-avro-binary".
  # - Content Encoding must be "uncompressed".
  # - There must be four indexing attributes with following names and settings:
  #   - A `timewindow` type attribute with the name "indexUploadTime". You can select the desired `duration`. This attribute stores the timestamp of the index upload, truncated by the duration value.
  #   - A `string` type attribute with the name "realm". This attribute stores the realm of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with the name "catalogId". This attribute stores the `catalogId` of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with name "layerId". This attribute stores the `layerId` of the index layer where messages that were successfully processed are indexed.
  # - You can select any TTL setting.
  # Add following two commented parameters in your application.conf (uncomment and update with correct values).
  # deadletter.hrn = "<CATALOG_HRN_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>"    # Eg:- deadletter.hrn = "hrn:here:data::olp-here:sensor-data-deadletter-avro" or deadletter.hrn = "hrn:here-cn:data::olp-cn-here:sensor-data-deadletter-avro" (for China)
  # deadletter.layer = "<INDEX_LAYER_NAME_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>"    # Eg:- deadletter.layer = "index-layer"
}

# These settings are for the Data Client Library used in the Data Archiving Library.
here.platform.data-client {

  # Stream layers are implemented as Kafka clusters.
  # To read from a stream layer, your application must use one of the following connector types:
  # "kafka-connector" - Direct Kafka is the preferred connector type since it directly communicates with the underlying Kafka cluster. It is the default connector.
  # "http-connector" - HTTP Connector is an HTTP wrapper on top of Kafka, and therefore implies a communication overhead. If your application needs to access data in the Marketplace or is running behind a proxy, use the HTTP connector.
  stream.connector.consumer = "<UPDATE_ME>"

  # Define the proxy configuration. The credentials key is optional.
  #
  # proxy {
  #   host = "localhost"
  #   port = 9999
  #
  #   credentials {
  #     username: "user"
  #     password: "pass"
  #   }
  # }
}
##################################################
## Data Archiving Library Reference Config File ##
##################################################

# This is the reference config file that contains default settings for Data Archiving Library.
# Any application-specific settings that differ from the default ones provided here should be set in your application.conf.

# These settings are for Data Archiving Library's Stream Execution Environment.
env {

  # This property helps Flink to define internal data structures of certain state backends.
  # These internal data structures help the state scale with the number of key-groups if the operator parallelism
  # is changed for stateful stream processing. This is Flink's internal implementation mechanism.
  # It is set to 200 because this is the limitation on number of nodes in the Pipeline API.
  max-parallelism = 200

  # By enabling this Flink's feature, periodic checkpoints will be persisted externally even after job failure.
  # This way, you will have a checkpoint around to resume from if your job fails. This feature is useful for recovering from cluster failure.
  # Note that this feature is currently not supported by the Pipeline API.
  externalized.checkpoint.enabled = false

  # Archiving process requires stateful stream processing. In order to make the state fault tolerant, Flink needs to checkpoint the state.
  # Checkpoint allows Flink to recover state and positions in the streams. It also allows Data Archiving Library to be fault-tolerant.
  checkpoint.enabled = true

  # Start a checkpoint every 5 minutes.
  checkpoint.interval-seconds = 300

  # Checkpoints have to complete within 1 minute, or are discarded.
  checkpoint.timeout-seconds = 60

  # Make sure 5 minutes of progress happen between checkpoints.
  checkpoint.minimum-pause-seconds = 300

  # This defines how many consecutive checkpoint failures will be tolerated, before the whole job is failed over.
  checkpoint.tolerable.failure.number = 3

  # Unaligned checkpoints contain in-flight data (i.e., data stored in buffers) as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers.
  # Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore.
  # You should use unaligned checkpoints if your checkpointing durations are very high due to backpressure. Then, checkpointing time becomes mostly independent of the end-to-end latency.
  # Be aware unaligned checkpointing adds to I/O to the state backends, so you shouldn’t use it when the I/O to the state backend is actually the bottleneck during checkpointing.
  checkpoint.unaligned.enabled = true

  # Data Archiving Library uses Flink's Failure Rate Restart Strategy.
  # The failure rate restart strategy restarts the job after failure, but when failure rate (failures per time interval) is exceeded, the job eventually fails.
  # In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time, see restart.delay-seconds.
  restart.enabled = true

  # Maximum number of restarts in given time interval before failing a job.
  restart.failure-rate = 5

  # Time interval for measuring failure rate.
  restart.failure-interval-seconds = 2700

  # Delay between two consecutive restart attempts.
  restart.delay-seconds = 60
}

# These settings are for Data Archiving Library's source (Stream Layer).
source {

  # This property represents number of consumers (tasks/workers) reading from the Stream Layer.
  # It is recommended to override this property only if there is any network limitation on your cluster.
  # Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = 1

  # If there is no initial offset in source stream or the current offset does not exist anymore on the server,
  # then this property will automatically reset the offset to the latest offset (because default is set to latest).
  # Valid values are [latest, earliest]. It is highly recommended not to override this value.
  # For eg:- If the value is set to earliest, then based on stream's retention period, sufficient resources should be configured to handle the load.
  auto.offset.reset = "latest"
}

# These settings are for Data Archiving Library's parser logic.
parser {

  # This property represents message parsing parallelism.
  # Message parsing logic invokes an api, such as `getKeys`, to retrieve indexing attributes.
  # It is recommended to override this property if your logic to retrieve indexing attributes is compute intensive.
  # Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = 1
}

# These settings are for Data Archiving Library's aggregation logic.
aggregation {

  # The Data Archiving Library splits the stream into "buckets" of time interval.
  # For all elements in each bucket, the stream is also split into logical keyed streams based on indexing attributes of each element.
  # This property decides how long the user wants the Data Archiving Pipeline to aggregate the data in memory before archiving to Index Layer.
  # Note that if the value is very small, it will impact the performance of archiving as smaller files will be archived frequently.
  # However, if the value is very big, it requires higher storage requirement (more workers/worker units) to hold the data in disk and memory.
  # For state size consideration, note that Flink creates one copy of each element per window to which it belongs.
  # Default value is set to 15 minutes. The recommended value range is from 10 minutes to 60 minutes. Allowed value range is from 1 second to 24 hours.
  # The value should also be less than or equal to stream layer's time-to-live (ttl) retention period value. The recommendation for the value is to be much smaller than the stream layer ttl.
  # If these two values are too close, there is risk of data expiring from stream layer before it is processed by the pipeline.
  window-seconds = 900

  # Having a keyed stream allows the windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest.
  # In each aggregation window, all elements referring to the same key will be sent to the same parallel task.
  # Assuming there is no hotspot problem (most if not all elements have the same key), having higher parallelism will improve the data archiving pipeline performance.
  # Required parallelism can be determined by different parameters like indexing attribute cardinality, uniqueness, etc. Please refer to the Best Practices section in the developer guide for details.
  # It is highly recommended to override this property. Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = 1
}

# These settings are for Data Archiving Library's sink (Index Layer).
sink {

  # The Data Archiving Library Sink uploads metadata (indexes) of archived files (each archived file has aggregated messages based on their indexing attributes) to Index Layer.
  # The Index Layer is updated when any of the following two conditions is true:
  # 1. Maximum number of indexes is received by sink operator (index.limit property).
  # 2. Timeout (index.timeout-seconds property).
  # The recommended maximum value for index.limit is 500 which is also the default value. User can override this value if the source stream data load is low.
  # User can also override index.timeout-seconds based on data load in the source stream. Maximum allowed value is 3600.
  index.limit = 500
  index.timeout-seconds = 150

  # This property represents number of tasks/workers uploading metadata to the Index Layer.
  # It is recommended to override this property only if there is any network limitation on your cluster.
  # Note that the value of this property should be less than or equal to number of workers selected when creating a pipeline.
  parallelism = 1
}

# These settings allow you to specify what your pipeline will do when an error occurs in your User Defined Function (UDF) implementation.
# The Data Archiving Library will invoke the error handling strategy when:
# - A UDF implementation throws a non-parsable message
# - A UDF implementation throws an unchecked/runtime exception
# - A UDF implementation returns null
# - An indexing attribute value does not pass the Data Archiving Library's validation rules.
error {
  # You can choose one of the following error handling strategies:
  # - "fail" - The pipeline fails on any error from a User-Defined Function's implementation. The pipeline logs the action.
  # - "ignore" - The pipeline ignores the messages that encounter an error in a User-Defined Function's implementation. The pipeline logs the action and continues processing the next message. This is the default strategy.
  # - "deadletter" - The pipeline archives messages that encounter an error in a User-Defined Function's implementation. The messages are archived in a dead letter index layer, and processing continues. See the following section for more information about using this strategy.
  strategy = "ignore"

  # For "deadletter" strategy,
  # Create an index layer for archiving messages that encounter an error.
  # This special index layer must have following settings:
  # - Content Type must be "application/x-avro-binary".
  # - Content Encoding must be "uncompressed".
  # - There must be four indexing attributes with following names and settings:
  #   - A `timewindow` type attribute with the name "indexUploadTime". You can select the desired `duration`. This attribute stores the timestamp of the index upload, truncated by the duration value.
  #   - A `string` type attribute with the name "realm". This attribute stores the realm of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with the name "catalogId". This attribute stores the `catalogId` of the index layer where messages that were successfully processed are indexed.
  #   - A `string` type attribute with name "layerId". This attribute stores the `layerId` of the index layer where messages that were successfully processed are indexed.
  # - You can select any TTL setting.
  # Add following two commented parameters in your application.conf (uncomment and update with correct values).
  # deadletter.hrn = "<CATALOG_HRN_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>"        # Eg:- "hrn:here:data::olp-here:sensor-data-deadletter-avro" or "hrn:here-cn:data::olp-cn-here:sensor-data-deadletter-avro" (for China)
  # deadletter.layer = "<INDEX_LAYER_NAME_WHERE_YOU_WANT_TO_STORE_MESSAGES_IN_CASE_OF_ERRORS>" # Eg:- "index-layer"
}
###############################################################################################
## Data Archiving Library Custom Config File Overriding Data Client Library Default Settings ##
###############################################################################################

# This is the custom reference config file that contains custom default settings of Data Client Library for Data Archiving Library.
# Any application-specific settings that differ from the custom default ones provided here should be set in your application.conf.

here.platform.data-client {
  retry-policy {
    type = "best-effort"
  }
}
