productName variable is missing Metrics

The productName variable is missing provides a following set of metrics to monitor the archiving process:

Metric Description
messageCounter number of messages read by operator (should be same as number of messages consumed by source)
getDataSuccessCounter number of messages successfully downloaded from blob-store
getDataFailCounter number of messages failed to download from blob-store
sizeCounter size of messages (byte array size)
getKeysNullCounter number of times getKeys/getMultipleKeys/getSplittedKeys API returns null
getKeysFailCounter number of times getKeys/getMultipleKeys/getSplittedKeys API results in error (null case is included in this metric)
timeWindowMissingCounter number of times the value of timewindow type index attribute was null
timeWindowInvalidCounter number of times the value of timewindow type index attribute was invalid (expects long value)
hereTileInvalidZoomLevelCounter number of times the value of heretile type index attribute is associated with wrong zoom level
hereTileNonParsablelCounter number of times the value of heretile type index attribute was invalid (expects long value)
invalidColumnValueCounter number of times the value of any index attribute had unsupported data type (string, int, long, boolean are supported data types)
parserIgnoreCounter number of messages ignored by the library because of validation error
parserDeadLetterCounter number of messages marked by the library to archive in dead-letter storage because of validation error
keyPartitionCounter number of unique index attribute groups in a time-window
aggregateSuccessMessageCounter number of messages successfully processed by aggregate API
aggregateSuccessSizeCounter size of messages successfully processed by aggregate API
aggregateNullMessageCounter number of messages for which aggregate API returns null
aggregateNullSizeCounter size of messages for which aggregate API returns null
aggregateFailMessageCounter number of messages for which aggregate API results in error (null case is included in this metric)
aggregateFailSizeCounter size of messages for which aggregate API results in error (null case is included in this metric)
aggregateIgnoreCounter number of times the result of aggregate API is ignored
aggregateDeadLetterCounter number of times the messages passed to aggregate API are aggregated for dead-letter storage
windowDeadLetterMessageCounter number of messages setup for archival in dead-letter storage
windowDeadLetterSizeCounter size of messages setup for archival in dead-letter storage
transformDeadletterSuccessCounter number of messages successfully transformed using transformForDeadLetter API for dead-letter storage
transformDeadletterFailCounter number of messages failed to transform using transformForDeadLetter API for dead-letter storage
blobSuccessRequestCounter number of files successfully uploaded to blob-store
blobSizeCounter size of files successfully uploaded to blob-store
blobTimeCounter time taken by the library to upload the files to blob-store
indexInvokeCounter number of index records received by the sink operator (should be same as blobSuccessRequestCounter)
indexUploadCounter number of index records successfully published to index-store
indexValidCounter number of index records received by the sink operator for normal archival storage
indexDeadLetterCounter number of index records received by the sink operator for special dead-letter storage
indexSuccessRequestCounter number of successful requests to index-store (1 request can have N number of index records)
indexTimeCounter time taken by the library to publish the index records to index-store
indexBatchLimitCounter number of times the sink operator reached batch limit for number of index records to be published in 1 request
indexTimeoutCounter number of times the request to publish index records was triggered by timeout

Add User-Defined Metric

While the productName variable is missing provides a comprehensive set of the archiving process metrics, the user has the option to define their own metrics using the standard Flink APIs. To define a new custom metric:

  • Add your custom metric as an instance variable in your UDF implementation
    public class MyExampleImplementation implements UDF {
    private Counter customMetric;
    ...
    }
    
  • Override the open(Configuration parameters, RuntimeContext runtimeContext) API and register the custom metric which you added in the previous step with the MetricGroup of the provided RuntimeContext. For example:
    ...
    @Override
    public void open(Configuration parameters, RuntimeContext runtimeContext) {
      customMetric = runtimeContext.getMetricGroup().counter("myCustomMetric");
    }
    
  • Record the metric in your UDF implementation:
    customMetric.inc();
    
  • The following dependencies should be added with provided scope. Note that this example is for maven build tool.
    <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-core</artifactId>
     <scope>provided</scope>
    </dependency>
    <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-metrics-core</artifactId>
     <scope>provided</scope>
    </dependency>
    

Refer to com/here/platform/data/archive/example/AvroSimpleKeyExample.java in the Data Archive Avro Example for an example implementation.

Monitoring Metrics

Metrics registered with the Flink runtime context will be available in Grafana prefixed with flink_taskmanager_job_task_operator_. For example:

flink_taskmanager_job_task_operator_messageCounter
flink_taskmanager_job_task_operator_myCustomMetric

Refer to the Flink documention to learn more about the scope and naming of metrics.

results matching ""

    No results matching ""