Build a Batch Pipeline with Maven Archetypes (Java)

To build a Batch Pipeline using the Data Processing Library, you use the SDK Maven Archetypes to create a skeleton for the project, and the HERE platform portal to manage credentials, create a catalog, and manage access rights.

This example demonstrates how to create a pipeline that reads the Road Topology & Geometry layer in the HERE Map Content catalog and then writes the number of segment references (cardinality) for every topology node in each input partition.

Credentials

There are two types of credentials you require:

  1. Platform credentials provide access to the platform API. First, create a new application at https://platform.hereolp.cn/profile/access-credentials. Once the application is created, click Create a key to download the credentials. By default, the Data Processing Library looks for credentials in the $HOME/.here/credentials.properties file. Make sure your credentials file is placed in this location.
  2. Repository credentials enable you to access the repository where the Data Processing Library is hosted. Go to https://platform.hereolp.cn/profile/repository and click Generate Credentials. This downloads the settings.xml file. Copy this file to the $HOME/.m2/ folder.

Create the Output Catalog

First, create a new catalog to serve as the output catalog for the pipeline. The catalog has one layer where, for each partition of the Road Topology & Geometry layer, there is a partition containing the cardinalities of the topology nodes in that partition. You also need one additional layer, state, which is reserved for the Data Processing Library.

Log into the HERE platform. Select the Data tab and do the following:

  1. Click Add new catalog.
  2. Specify a CATALOG NAME, and a CATALOG ID for your catalog, such as batch-processing-quickstart-username.
  3. Next, add a CATALOG SUMMARY and a CONTACT EMAIL.
  4. Click Save and wait for the Data API to create your new catalog.

Then, give your application read/write access to the catalog, as follows:

  1. Select your catalog by searching for its name in the Search for data box.
  2. Go to Sharing, and in Manage Sharing select SHARE CATALOG WITH App. Insert your application ID and check read and write.
  3. Finally, click Grant to enable your changes.

Add layers to the catalog:

  1. Click Add new layer and create a layer with node-cardinality as its ID. You can use node-cardinality as the layer's name too, or choose a different, human readable name.
  2. Set the Partitioning to HERE Tile with a zoom level of 12, the same zoom level as the input Road Topology & Geometry layer. Select Versioned for Layer Type, which you must use for every layer processed by a batch pipeline.
  3. Keep the default Content Type of application/x-protobuf so you can use Protobuf to encode the partitions. Leave the Schema field set to None.
  4. Then, click Save to complete the layer creation.
  5. Proceed with a second layer, state, and configure it according to the second row of the following table, which lists the configuration of all layers in the catalog.
Layer ID          Partitioning  Zoom Level  Layer Type  Content Type              Schema
node-cardinality  HERE Tile     12          Versioned   application/x-protobuf    None
state             Generic       N/A         Versioned   application/octet-stream  None

The catalog is now fully configured. Proceed with creating a project.

Create a Project

The Data SDK includes Maven Archetypes to simplify the process of creating new batch pipelines. Using the Maven Archetypes, you can build a complete project structure using a few shell commands. The archetype automatically generates POM files that include all of the basic dependencies, sample configuration files, and source files you can edit to implement your own logic. You need to create at least three projects:

  • a top-level project, for convenience, to compile all sub-projects with a single POM file
  • a nested Schema project, to build Java/Scala bindings for the Protocol Buffers schema
  • a Process project, to build the processing logic

The following steps assume that Maven is installed and the mvn executable is in your PATH variable. You must run all of the commands below from a bash shell. The tree command is used to show the folder structures. Alternatively, you can use ls -R as a replacement.

First, create a top-level project named nodecardinality by running the following command, and press ENTER to confirm when prompted:

$ pwd
~/projects

$ mvn archetype:generate -DarchetypeGroupId=org.codehaus.mojo.archetypes \
                         -DarchetypeArtifactId=pom-root \
                         -DarchetypeVersion=1.1 \
                         -DgroupId=com.example \
                         -DartifactId=nodecardinality \
                         -Dversion=1.0.0 \
                         -Dpackage=com.example.nodecardinality

This creates a nodecardinality folder in the current directory, containing the following files:

$ pwd
~/projects

$ tree
.
`-- nodecardinality
    `-- pom.xml

1 directory, 1 file

Sub-projects are created from within this folder. Navigate to the nodecardinality folder to create the sub-projects. First, create the Schema project by running the following command, and press ENTER to confirm when prompted:

$ pwd
~/projects/nodecardinality

$ mvn archetype:generate -DarchetypeGroupId=com.here.platform.schema \
                         -DarchetypeArtifactId=project_archetype \
                         -DarchetypeVersion=X.Y.Z \
                         -DgroupId=com.example.nodecardinality \
                         -DartifactId=schema \
                         -Dversion=1.0.0 \
                         -Dpackage=com.example.nodecardinality.schema \
                         -DmajorVersion=0

For specific documentation about the latest version of the archetype included in the SDK, see the Archetypes Developer Guide.

This creates a project template in the nodecardinality/schema folder containing a project to build the schema for the output catalog:

$ pwd
~/projects/nodecardinality

$ tree
.
|-- pom.xml
`-- schema
    |-- ds
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           `-- resources
    |               |-- ResourcesReadMe.txt
    |               `-- renderers
    |                   `-- ReadMe.txt
    |-- java
    |   `-- pom.xml
    |-- pom.xml
    |-- proto
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           |-- proto
    |           |   `-- com
    |           |       `-- example
    |           |           `-- nodecardinality
    |           |               `-- schema
    |           |                   `-- v0
    |           |                       `-- schema.proto
    |           `-- resources
    |               `-- description.md
    |-- scala
    |   `-- pom.xml
    `-- schema.yml

20 directories, 13 files

Finally, still within the nodecardinality folder, run the following command to create a processor template that includes a Direct1ToNCompiler, and press ENTER to confirm when prompted:

$ pwd
~/projects/nodecardinality

$ mvn archetype:generate -DarchetypeGroupId=com.here.platform \
                         -DarchetypeArtifactId=batch-direct1ton-java-archetype \
                         -DarchetypeVersion=X.Y.Z \
                         -DgroupId=com.example.nodecardinality \
                         -DartifactId=processor \
                         -Dversion=1.0.0 \
                         -Dpackage=com.example.nodecardinality.processor

For specific documentation about the latest version of the archetype included in the SDK, see the Archetypes Developer Guide.

An additional processor folder is now added to the nodecardinality project:

$ pwd
~/projects/nodecardinality

$ tree
.
|-- pom.xml
|-- processor
|   |-- config
|   |   |-- pipeline-config.conf
|   |   `-- pipeline-job.conf
|   |-- pom.xml
|   `-- src
|       `-- main
|           |-- java
|           |   `-- com
|           |       `-- example
|           |           `-- nodecardinality
|           |               `-- processor
|           |                   |-- Compiler.java
|           |                   |-- CompilerConfig.java
|           |                   |-- IntermediateData.java
|           |                   `-- Main.java
|           `-- resources
|               |-- application.conf
|               `-- log4j.properties
`-- schema
    |-- ds
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           `-- resources
    |               |-- ResourcesReadMe.txt
    |               `-- renderers
    |                   `-- ReadMe.txt
    |-- java
    |   `-- pom.xml
    |-- pom.xml
    |-- proto
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           |-- proto
    |           |   `-- com
    |           |       `-- example
    |           |           `-- nodecardinality
    |           |               `-- schema
    |           |                   `-- v0
    |           |                       `-- schema.proto
    |           `-- resources
    |               `-- description.md
    |-- scala
    |   `-- pom.xml
    `-- schema.yml

30 directories, 22 files

Schema Sub-project

The nodecardinality/schema folder contains the skeleton of a Maven project that builds Java and Scala libraries (usually referred to as bindings) to de/serialize partitions encoded as Protobuf messages. These bindings are necessary to encode partitions in the output node-cardinality layer as Protobuf and to specify a custom partition schema.

In the project folder, there are the following components:

  • the main POM file, pom.xml, used to compile the project
  • a java folder containing a POM file to build the Java bindings for the protocol buffers
  • a scala folder containing a POM file to build the Scala bindings for the protocol buffers
  • a ds folder containing a sub-project to bundle the resulting bindings and Protobuf definitions in a ZIP file that can be published to the Platform Artifactory repository, enabling the decoding of partitions from the platform portal
  • a proto folder containing the Protobuf definitions. To specify the output schema, you need to customize this sub-project.

To create a custom Protobuf schema, add .proto files to the nodecardinality/schema/proto/src folder. For more information on Protobuf, see the Protocol Buffers Documentation.

The skeleton project you have just created already contains a .proto file you can edit to quickly define the schema of the output partitions.

Open the nodecardinality/schema/proto/src/main/proto/com/example/nodecardinality/schema/v0/schema.proto file and search for the main message definition:

syntax = "proto3";

package com.example.nodecardinality.schema.v0;

// Any dependent resources should be declared in the main pom, and the files referenced here:
//import "com/company/dependentGroupId/filename.proto";

// MainProtobufMessage is a placeholder, must match package/messagename in the mainMessage tag of the layer-manifest-plugin in the schema_ds module.
message MainProtobufMessage {
    int32 lat = 1;
    int32 lon = 2;
}

Change the name of the main message from MainProtobufMessage to NodeCardinalityPartition, remove the sample fields lat and lon, and add a repeated field named node_cardinality of type NodeCardinality.

Then, define an auxiliary message type NodeCardinality with two fields, the ID of the node (id) and the cardinality of the node (cardinality). This new Protobuf definition looks like this:

syntax = "proto3";

package com.example.nodecardinality.schema.v0;

// Any dependent resources should be declared in the main pom, and the files referenced here:
//import "com/company/dependentGroupId/filename.proto";

message NodeCardinalityPartition {
  repeated NodeCardinality node_cardinality = 1;
}

message NodeCardinality {
  string id = 1;
  uint32 cardinality = 2;
}

Since you changed the name of the main message, remember to update the configuration used to build the Schema bundle. Open the POM file of the ds sub-project (nodecardinality/schema/ds/pom.xml) and locate the configuration for the layer-manifest-plugin:

      <!-- build the layer manifest file -->
      <plugin>
        <groupId>com.here.platform.schema.maven_plugins</groupId>
        <artifactId>layer-manifest-plugin</artifactId>
        <version>${here.plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>write-manifest</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- Message class used to de/serialize data held in a DataStore layer.
               NOTE: If changed here, must correspond to a message in Protobuf definitions. -->
          <mainMessage>com.example.nodecardinality.schema.v0.MainProtobufMessage</mainMessage>
          <inputDir>${project.build.directory}/proto</inputDir>
          <writeManifest>true</writeManifest>
        </configuration>
      </plugin>

Change the mainMessage path in the plugin configuration, replacing com.example.nodecardinality.schema.v0.MainProtobufMessage with com.example.nodecardinality.schema.v0.NodeCardinalityPartition:

      <!-- build the layer manifest file -->
      <plugin>
        <groupId>com.here.platform.schema.maven_plugins</groupId>
        <artifactId>layer-manifest-plugin</artifactId>
        <version>${here.plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>write-manifest</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- Message class used to de/serialize data held in a DataStore layer.
               NOTE: If changed here, must correspond to a message in Protobuf definitions. -->
          <mainMessage>com.example.nodecardinality.schema.v0.NodeCardinalityPartition</mainMessage>
          <inputDir>${project.build.directory}/proto</inputDir>
          <writeManifest>true</writeManifest>
        </configuration>
      </plugin>

Now compile the Schema project by running mvn install from the nodecardinality/schema folder. Alternatively, run this command from the top-level project.

This builds two libraries, schema_v0_java and schema_v0_scala_${scala.compat.version}, which provide APIs to create a NodeCardinalityPartition object, serialize it to a byte array, and deserialize it from a byte array.

To write the output partitions, use schema_v0_java in your processing logic. To read the input partitions from the Road Topology & Geometry layer, use the corresponding Java bindings provided by com.here.schema.rib.topology-geometry_v2_java.
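
As an illustration, the following is a minimal sketch of how the generated bindings are typically used, assuming the standard protobuf-java conventions followed by schema_v0_java (the outer class Schema and the builder methods generated for the fields defined above); the node ID value is a placeholder, not a real HERE Map Content identifier:

import com.example.nodecardinality.schema.v0.Schema;
import com.google.protobuf.InvalidProtocolBufferException;

public class BindingsExample {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        // Build an output partition with a single node cardinality entry.
        Schema.NodeCardinalityPartition partition =
            Schema.NodeCardinalityPartition.newBuilder()
                .addNodeCardinality(
                    Schema.NodeCardinality.newBuilder()
                        .setId("node-id-placeholder") // placeholder ID, for illustration only
                        .setCardinality(3)
                        .build())
                .build();

        // Serialize to a byte array, then deserialize it back.
        byte[] bytes = partition.toByteArray();
        Schema.NodeCardinalityPartition decoded =
            Schema.NodeCardinalityPartition.parseFrom(bytes);

        System.out.println(decoded.getNodeCardinalityCount()); // prints 1
    }
}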

In the next section, you add those dependencies to the processor's sub-project.

Processing Logic

The nodecardinality/processor folder contains the skeleton project of a batch processing pipeline. This project builds the final processing application.

The project folder contains the following components:

  • the main POM file pom.xml, used to compile the project;
  • a src folder containing the Java source files implementing the logic;
  • a config folder containing configuration files that can be used to run the pipeline locally, outside the Pipeline API.

Pipeline Configuration

To perform a batch processing job, the Data Processing Library requires the following:

  1. HERE Resource Names (HRNs) for all the input catalogs and the output catalog.
  2. Versions for all the input catalogs for the batch job to process.

For pipelines in SCHEDULED state, this information is automatically provided by the Pipeline API via two configuration files in HOCON format:

  1. pipeline-config.conf, providing the HRN of the input catalogs and output catalog.
  2. pipeline-job.conf, providing the versions of the input catalogs to be processed.

You upload the first file to the HERE Workspace pipeline when the pipeline is deployed; it never changes between compilations. The job configuration, on the other hand, is created on the fly by the Pipeline API whenever a new job needs to run, for example, when a new version of an input catalog exists and the output catalog needs to be updated.

During local development, when a batch pipeline is run locally without the Pipeline API, you must provide these configuration files yourself by setting two Java system properties:

  1. pipeline-config.file, containing the path to the pipeline-config.conf file
  2. pipeline-job.file, containing the path to the pipeline-job.conf file

The nodecardinality/processor/config folder contains templates for both files, which you can edit and use for local development.

The pipeline configuration's template file (nodecardinality/processor/config/pipeline-config.conf) looks like this:


// This configuration file is provided to specify input and
// output catalogs for pipelines during local development.
// In addition, the CLI uses this file when uploading the pipeline
// to the platform to create the relevant configuration.
// For more information on the HERE platform, see the
// platform documentation at https://here-tech.skawa.fun/cn/documentation.
//
// Do not include this file in the pipeline fat JAR you upload to the platform.
// When running your pipeline locally, use the command below.
//
// -Dpipeline-config.file=config/pipeline-config.conf

pipeline.config {

  output-catalog { hrn = "hrn:here-cn:data:::myoutput" } // TODO: Specify the output catalog HRN.

  input-catalogs {
    // The following keys are symbolic IDs for catalogs
    // passed to the pipeline that the platform can use to
    // bind to and identify specific inputs. Use one line
    // for each input catalog in your project. Add and
    // delete lines as necessary.
    input-catalog-1 { hrn = "hrn:here-cn:data:::myinput1" } // TODO: Specify the input catalog HRN.
    input-catalog-2 { hrn = "hrn:here-cn:data:::myinput2" } // TODO: Specify the input catalog HRN.
  }
}

pipeline.config.output-catalog.hrn indicates the output catalog's HRN. To find the HRN of the catalog you just created, open the catalog in the platform portal. If the catalog ID is batch-processing-quickstart, the corresponding HRN is hrn:here-cn:data:::batch-processing-quickstart.

pipeline.config.input-catalogs contains the HRNs of all input catalogs, indexed by arbitrary symbolic identifiers that are used to refer to a specific input catalog in the job configuration and in the processing logic. The template file contains two input catalogs, with identifiers input-catalog-1 and input-catalog-2 and their corresponding sample HRNs. In this project, there is only one input catalog, HERE Map Content, with the HRN hrn:here-cn:data::olp-here-cn:here-map-content-china-2. Delete the two sample input catalogs, add one with the HRN mentioned above, and choose rib as its symbolic identifier:


pipeline.config {

  output-catalog { hrn = "hrn:here-cn:data:::batch-processing-quickstart" }

  input-catalogs {
    rib { hrn = "hrn:here-cn:data::olp-here-cn:here-map-content-china-2" }
  }
}

The nodecardinality/processor/config/pipeline-job.conf file is a template for the job configuration. It contains the following:


// This configuration file is provided to facilitate local development
// of pipelines with the HERE Data SDK and
// for use with pipelines that are not scheduled to run
// in reaction to changes in input catalogs.
// For more information on the HERE platform, see the
// platform documentation at https://here-tech.skawa.fun/cn/documentation.
//
// Do not include this file in the pipeline fat JAR you upload to the platform.
// When running your pipeline locally, use the command below.
//
// -Dpipeline-job.file=config/pipeline-job.conf

pipeline.job.catalog-versions {

  input-catalogs {

    // The following keys are symbolic IDs for catalogs
    // passed to the pipeline that the platform can use to
    // bind to and identify specific inputs.

    // Specify only one processing-type/version pair per input catalog.
    // Add an entry in the appropriate object (for example,
    // input-catalog-1, input-catalog-2, input-catalog-3, ...)
    // for each input catalog in your project.
    // For more information on the processing type options, see
    // Build a Batch Pipeline with Maven Archetypes in the
      // Data Processing Library documentation.

    // Use case: Ignore results of previous compilation and fully process catalogs.
    // TODO: If you want to use incremental processing, comment out these sections.
    input-catalog-1 {
      processing-type = "reprocess"
      version = 0 // TODO: Specify the version of the catalog to be processed.
    }
    input-catalog-2 {
      processing-type = "reprocess"
      version = 2 // TODO: Specify the version of the catalog to be processed.
    }

    // Use case: Process catalogs incrementally. The processing
    // types below are not recommended for local development. They are
    // mostly used by platform pipelines to enable incremental compilation
    // with the Data Processing Library.
    // TODO: Comment out the section above, remove the comments below, and
    // specify the versions.
    //  input-catalog-1 {
    //    processing-type = "no_changes"
    //    version = 0 // TODO: Specify the version of the catalog to be processed.
    //  }
    //  input-catalog-2 {
    //    processing-type = "changes"
    //    since-version = 1 // TODO: Specify the correct starting version of the changeset to be processed.
    //    version = 4 // TODO: Specify the ending version of the changeset to be processed.
    //  }
  }
}

For each input catalog specified in pipeline-config.conf, the pipeline.job.catalog-versions.input-catalogs section contains:

  • the version of the catalog to use for processing
  • the type of processing that the Data Processing Library uses for incremental compilation

The pipeline.job.catalog-versions.input-catalogs.input-catalog-ID.processing-type can have three different values to denote three types of processing:

  1. reprocess: this type indicates that the catalog should be fully processed; results from a previous compilation are not used to reduce the amount of data being processed. This is the simplest type of processing to use when you are dealing with a manually written job configuration. It effectively disables incremental compilation, a feature of the Data Processing Library that reduces the amount of data processed by reusing the results of a previous compilation. With this type of processing, you must provide the version of the catalog to process.
  2. no_changes: this type indicates that you want to reuse the same version of the catalog used when the output catalog was last compiled. This type of processing lets the Data Processing Library skip some compilation steps. You must provide the version of the catalog to process, and a valid processing configuration requires this version to be equal to the version used in the last compilation. The Data Processing Library verifies that this condition holds before processing starts; otherwise, incremental compilation is disabled.
  3. changes: this type indicates that you want to process a new version of the catalog (version) given the version processed in the last compilation (since-version). This type of processing may be used by the Data Processing Library to optimize processing, reducing the amount of data actually reprocessed. The processing configuration is valid as long as the version of the catalog used in the last compilation is indeed since-version; otherwise, incremental compilation is disabled.

It is important to understand that both no_changes and changes are only used to enable optimizations internally in the Data Processing Library. Conceptually, the library always fully processes the given version of every input catalog. For this quick start, and for local development in general, rely exclusively on the reprocess processing type. Once a pipeline is deployed, it is the Pipeline API's duty to provide a valid job configuration that takes maximum advantage of the Data Processing Library's optimization capabilities.

At the time of writing, the latest available version of the HERE Map Content catalog is 1.

Let's configure the rib catalog using reprocess for processing-type and 1 for version.

This is how pipeline-job.conf should look now:

pipeline.job.catalog-versions {

  input-catalogs {

    rib {
      processing-type = "reprocess"
      version = 1
    }
  }
}

Dependencies

The SDK Maven Archetypes provide all basic dependencies in the pom.xml file of the processor sub-project. You must manually add any custom dependencies used by your processing logic there. For this project, two more dependencies are necessary:

  • com.here.schema.rib.topology-geometry_v2_java, to deserialize the input partitions
  • com.example.nodecardinality.schema_v0_java, which you have just created, to serialize the output partitions

Open the nodecardinality/processor/pom.xml file. There is already a placeholder for the Java bindings created by the Schema project. To find it, search for DATA_MODEL_PROJECT_NAME in the file:

    <!--
      List data model artifacts that this compiler depends on,
      for input, for output or for both, as dependency here.
    -->
    <!--
      <dependency>
        <groupId>com.example.nodecardinality</groupId>
        <artifactId>{DATA_MODEL_PROJECT_NAME}_java</artifactId>
        <version>{DATA_MODEL_PROJECT_VERSION}</version>
      </dependency>
    -->

Uncomment that dependency, then fill in the {DATA_MODEL_PROJECT_NAME} and {DATA_MODEL_PROJECT_VERSION} placeholders with schema_v0 and 1.0.0, respectively:

    <!--
      List data model artifacts that this compiler depends on,
      for input, for output or for both, as dependency here.
    -->
    <dependency>
      <groupId>com.example.nodecardinality</groupId>
      <artifactId>schema_v0_java</artifactId>
      <version>1.0.0</version>
    </dependency>

Then, add a dependency on com.here.schema.rib.topology-geometry_v2_java:

    <!--
      HERE Map Content Road Topology & Geometry layer Java bindings
    -->
    <dependency>
      <groupId>com.here.schema.rib</groupId>
      <artifactId>topology-geometry_v2_java</artifactId>
      <version>2.8.0</version>
    </dependency>

Processing Logic

Now implement the actual processing logic by editing the Java source files that the Maven archetypes created. In the processor/src/main/java/com/example/nodecardinality/processor folder there are four source files:

  1. Main.java: contains the main entry point of the processing application, as a subclass of PipelineRunner. The Driver is configured with a single DriverTask containing one Direct1ToNCompiler (implemented in Compiler.java).
  2. CompilerConfig.java: contains the compiler configuration, a class you can use to configure the business logic through the application.conf configuration file. In this example, the business logic does not need to expose any configuration parameters, so the default implementation is sufficient.
  3. IntermediateData.java: defines the IntermediateData class used by the compiler defined in Compiler.java.
  4. Compiler.java: implements the actual processing logic as a Direct1ToNCompiler.

First, decide which compilation pattern and which intermediate data to use for the task at hand. This quick start focuses on the functional patterns: compared to the RDD-based patterns, they give you incremental compilation without additional work, and you do not have to deal with Spark caveats such as partitioning, shuffling, or persistence.

It is still useful to understand the underlying Spark application, because it clarifies how the functional patterns work. All of the patterns implement different flavors of the following scheme (a plain-Java sketch of this flow follows the list):

  1. The input metadata is retrieved and an RDD of Key and Meta pairs is created. Key uniquely identifies a partition, and contains the catalog ID, the layer ID, and the partition name. Meta contains information about the payload of the partition, and can be used together with the corresponding Key to retrieve the content of a partition (payload).
  2. A CompileIn transformation is applied to the input RDD. The purpose of this step is to define the mapping between input and output partitions and to preprocess the input data into an intermediate representation that you define. In most compilation patterns, this step corresponds to a flatMap, where a compileInFn that returns a sequence of (Key, IntermediateData) pairs given a single (Key, Meta) pair is applied to all elements of the RDD. This is then followed by a groupBy transformation to group all intermediate representations with the same output key together.
  3. The resulting RDD of Key and Iterable<IntermediateData> pairs is then processed by applying a CompileOut transformation, where a Payload is produced for each Key from the grouped intermediate representations.
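
As a conceptual illustration only, the following sketch reproduces this flatMap, group-by-key, and compile-out structure with plain Java streams rather than the library's Spark-based implementation; the keys, metadata, and payloads are stand-in strings introduced purely for this example:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CompileFlowSketch {
    public static void main(String[] args) {
        // Stand-in for the input of (Key, Meta) pairs retrieved in step 1.
        Map<String, String> inputMetadata = Map.of("tile-1", "metaA", "tile-2", "metaB");

        // Step 2, CompileIn: flatMap each (inKey, meta) pair to (outKey, intermediate)
        // pairs, then group the intermediate values that share the same output key.
        Map<String, List<String>> grouped = inputMetadata.entrySet().stream()
            .flatMap(e -> Stream.of(Map.entry(e.getKey(), "intermediate(" + e.getValue() + ")")))
            .collect(Collectors.groupingBy(
                Map.Entry::getKey,
                Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

        // Step 3, CompileOut: produce one payload per output key from the grouped intermediates.
        grouped.forEach((outKey, intermediates) ->
            System.out.println(outKey + " -> payload" + intermediates));
    }
}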

In this project, for each input partition of the HERE Map Content's topology-geometry layer, you create an output partition with the same Tile ID in the output catalog's node-cardinality layer. The mapping between input and output does not depend on the content of the input partitions; you just need the Tile ID that is part of the partition's Key. For this reason you can use a direct compiler.

You will implement a direct 1:1 compilation since each input partition is used to produce one output partition. This is a special case of 1:N compilation, and therefore you only need a Direct1ToNCompiler pattern.

You still have to decide what IntermediateData to use between CompileIn and CompileOut. Since you are performing a direct 1:1 compilation, you can implement the processing logic in CompileOut directly. That means you can forward the Key and Meta objects of the input partition from the CompileIn transformation to the CompileOut transformation and process the input data.

Notice that more complex IntermediateData classes containing a processed version of the input partition are usually necessary, especially when an input partition is used to compile multiple output partitions and you want to avoid processing the same data multiple times.
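
As a purely hypothetical example (not used in this quick start), such an intermediate representation could carry the node list decoded once in compileInFn, so that it is not fetched and decoded again for every output partition:

package com.example.nodecardinality.processor;

import java.util.List;

import com.here.schema.rib.v2.TopologyGeometry;

// Hypothetical richer intermediate representation: the input partition is decoded
// once, and only the data that compileOutFn needs is carried along.
public class DecodedIntermediateData {
    private final List<TopologyGeometry.Node> nodes;

    public DecodedIntermediateData(List<TopologyGeometry.Node> nodes) {
        this.nodes = nodes;
    }

    public List<TopologyGeometry.Node> getNodes() {
        return nodes;
    }
}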

This is the IntermediateData class provided by the archetypes:


package com.example.nodecardinality.processor;

/**
 * Intermediate compilation result holder used for data exchange between
 * {@link Compiler#compileInFn(Pair)} and
 * {@link Compiler#compileOutFn(Key, Iterable)} stages.
 */
public class IntermediateData {
    // TODO: Configure your data definition, which is used during processing on the cluster.
    // For more information, see the Data Processing Library documentation
    // at https://here-tech.skawa.fun/cn/documentation.
    private String attribute1;
    private String attribute2;

    public IntermediateData(String attribute1, String attribute2) {
        this.attribute1 = attribute1;
        this.attribute2 = attribute2;
    }

    public String getAttribute1() {
        return attribute1;
    }

    public String getAttribute2() {
        return attribute2;
    }
}

Let's rewrite the implementation to wrap a Key and Meta pair:

package com.example.nodecardinality.processor;

import com.here.platform.data.processing.java.catalog.partition.Key;
import com.here.platform.data.processing.java.catalog.partition.Meta;

public class IntermediateData {
    private Key key;
    private Meta meta;

    IntermediateData(Key key, Meta meta) {
        this.key = key;
        this.meta = meta;
    }

    Key getKey() {
        return key;
    }

    Meta getMeta() {
        return meta;
    }
}

After that, open the processor/src/main/java/com/example/nodecardinality/processor/Compiler.java file.

You need to import the Java bindings for the HERE Map Content topology-geometry layer and for your output layer:

import com.example.nodecardinality.schema.v0.Schema;
import com.here.platform.pipeline.logging.java.ContextAwareLogger;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass;
import com.here.schema.rib.v2.TopologyGeometry;

Now, replace the placeholders for the input and output layers:

    /**
     * Input catalog ID
     * The example code below uses only one input catalog and one input layer.
     * If you need to use more than one input catalog and/or one input layer, then
     * you need to add additional definitions here and handle these definitions in
     * your retriever and Map implementations below.
     * private Map<String, Retriever> catalogNameToRetrieverMap = new HashMap<>();
     * catalogNameToRetrieverMap.put(IN_CATALOG, retriever);
     */
    // TODO: Specify the name of your input catalog, use the same symbolic ID as that
    // specified in the pipeline-config.conf file.
    final private String IN_CATALOG = "YOUR_INPUT_CATALOG_NAME";

    /** Name of input layer */
    // TODO: Specify the name of your input layer, use the same symbolic ID as that
    // used for the layer in the catalog on the platform portal.
    final private String IN_LAYER = "your-input-layer";

    /** Name of output layer */
    // TODO: Specify the name of your output layer, use the same symbolic ID as you
    // used when you created the layer.
    final private String OUT_LAYER = "your-output-layer";

For IN_CATALOG, use the symbolic ID configured in pipeline-config.conf (rib). The IN_LAYER is topology-geometry and the OUT_LAYER is node-cardinality:

/**
 * Input catalog ID The example code below uses only one input catalog and one input layer. If you
 * need to use more than one input catalog and/or one input layer, then you need to add additional
 * definitions here and handle these definitions in your retriever and Map implementations below.
 * private Map<String, Retriever> catalogNameToRetrieverMap = new HashMap<>();
 * catalogNameToRetrieverMap.put(IN_CATALOG, retriever);
 */
// TODO: Specify the name of your input catalog, use the same symbolic ID as that
// specified in the pipeline-config.conf file.
private final String IN_CATALOG = "rib";

/** Name of input layer */
// TODO: Specify the name of your input layer, use the same symbolic ID as that
// used for the layer in the catalog on the platform portal.
private final String IN_LAYER = "topology-geometry";

/** Name of output layer */
// TODO: Specify the name of your output layer, use the same symbolic ID as you
// used when you created the layer.
private final String OUT_LAYER = "node-cardinality";

In a direct compiler, the CompileIn function is split into:

  • a mappingFn function that returns a sequence of output Keys given an input Key
  • a compileInFn function that returns an IntermediateData object given an input Key and Meta

The mapping established by mappingFn is used to send the IntermediateData object to the corresponding output Key, which is then compiled in the compileOutFn function.

Search for the mapping function:

    public java.lang.Iterable<Key> mappingFn(Key inKey) {
        List<Key> outputs = new ArrayList<>();

        // TODO: Add code that defines the mapping. Uncomment the line below to add a 1:1 mapping.
    //        outputs.add(new Key(Default.OutCatalogId(), OUT_LAYER, inKey.partition()));
        throw new RuntimeException("NOT Implemented");
        // return outputs;
    }

Each input Key has to be mapped to an output Key with the same partition name, a catalog ID equal to the output catalog (Default.OutCatalogId()), and a layer ID equal to the output layer (OUT_LAYER).

First, import the Default object:

import com.here.platform.data.processing.java.driver.Default;

Then implement mappingFn as follows:

public Iterable<Key> mappingFn(Key inKey) {
  List<Key> outputs = new ArrayList<>();

  outputs.add(inKey
                .withCatalog(Default.OutCatalogId())
                .withLayer(OUT_LAYER));
  return outputs;
}

In compileInFn, simply return an IntermediateData built out of the Key and Meta of the input partition.

Replace the existing compileInFn method with the following:

public IntermediateData compileInFn(Pair<Key, Meta> in) {
  return new IntermediateData(in.getKey(), in.getValue());
}

Then replace compileOutFn with the following:

public Optional<Payload> compileOutFn(Key outKey, IntermediateData intermediate) {
  Payload payload = retriever.getPayload(intermediate.getKey(), intermediate.getMeta());
  try {
    TopologyGeometryPartitionOuterClass.TopologyGeometryPartition partition =
        TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.parseFrom(
            payload.content());

    Schema.NodeCardinalityPartition.Builder builder =
        Schema.NodeCardinalityPartition.newBuilder();

    // in the output partition node cardinalities will have the same order as the input
    // nodes in HERE Map Content.
    for (TopologyGeometry.Node node : partition.getNodeList()) {
      builder.addNodeCardinality(
          Schema.NodeCardinality.newBuilder()
              .setId(node.getIdentifier())
              .setCardinality(node.getSegmentRefCount())
              .build());
    }

    Schema.NodeCardinalityPartition outputPartition = builder.build();

    return Optional.of(new Payload(outputPartition.toByteArray()));

  } catch (InvalidProtocolBufferException ex) {
    throw new RuntimeException(ex);
  }
}

Here, the return values of mappingFn and compileInFn above are passed to compileOutFn: the Key of the output partition (outKey) and the IntermediateData containing the Key and Meta pair of the corresponding input partition.

You retrieve the payload of the input partition using the retriever object that is initialized when the Compiler object is constructed:

Payload payload = retriever.getPayload(intermediate.getKey(), intermediate.getMeta());

Decode the corresponding topology partition using the Java bindings of the HERE Map Content's topology-geometry layer:

TopologyGeometryPartitionOuterClass.TopologyGeometryPartition partition =
    TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.parseFrom(
        payload.content());

Next, create the corresponding output partition using the Java bindings from the Schema project. For each Protocol Buffer message, in this case Schema.NodeCardinalityPartition, you get a builder (newBuilder) that provides interfaces to set all fields in the message:

Schema.NodeCardinalityPartition.Builder builder =
    Schema.NodeCardinalityPartition.newBuilder();

You only have a repeated field node_cardinality, for which an addNodeCardinality method is automatically generated to add elements to the sequence. For each node in the input partition (partition.getNodeList()), use that method to add a NodeCardinality object with the node's ID (node.getIdentifier()) and its cardinality (node.getSegmentRefCount()). The NodeCardinality object is once again constructed with a builder:

// in the output partition node cardinalities will have the same order as the input
// nodes in RIB.
for (TopologyGeometry.Node node : partition.getNodeList()) {
  builder.addNodeCardinality(
      Schema.NodeCardinality.newBuilder()
          .setId(node.getIdentifier())
          .setCardinality(node.getSegmentRefCount())
          .build());
}

Next, use the build method to create a new Schema.NodeCardinalityPartition:

Schema.NodeCardinalityPartition outputPartition = builder.build();

To publish the data, serialize the output partition to a byte array (outputPartition.toByteArray()), wrap it in a Payload object, and return the optional payload. If you do not want to produce a specific output partition in the output catalog, you can return an empty java.util.Optional object. In this case, however, you want to publish an output partition for each available input partition:

return Optional.of(new Payload(outputPartition.toByteArray()));
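
If, on the other hand, you did not want to publish a given output partition, you would return an empty Optional instead. The following is a minimal sketch, assuming the standard getNodeCount() accessor that protobuf-java generates for the repeated node field:

// Illustrative only: skip output partitions whose input contains no topology nodes.
if (partition.getNodeCount() == 0) {
  return Optional.empty();
}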

Now you can build the entire project from the top-level folder:

$ pwd
~/projects/nodecardinality

$ mvn install

Run The Processor Locally

Processing a global catalog like the HERE Map Content can be time consuming. However, you can limit the number of partitions to process during local development by adding one or more partition filters to the application.conf file.

In this case you use a BoundingBoxFilter to process only the partitions inside a bounding box containing the city of Beijing. Open the processor/src/main/resources/application.conf file and append this partition filter configuration:


here.platform.data-processing.executors.partitionKeyFilters = [
  {
    className = "BoundingBoxFilter"
    param = {
      boundingBox {
        // Beijing
        north = 40.158941
        south = 39.7704
        east = 116.60867
        west = 116.170955
      }
    }
  }
]

Make sure to rerun mvn install after making this change. From the processor module folder, you can then run the compilation job using the configuration files set up above:

$ pwd
~/projects/nodecardinality/processor

$ mvn exec:java -Dexec.mainClass=com.example.nodecardinality.processor.Main \
                -Dpipeline-config.file=config/pipeline-config.conf          \
                -Dpipeline-job.file=config/pipeline-job.conf                \
                -Dexec.args="--master local[*]"

The Maven exec plugin is used with the main class set to com.example.nodecardinality.processor.Main and the configuration files config/pipeline-config.conf and config/pipeline-job.conf. The PipelineRunner main method accepts an optional --master command line argument to set the master URL for the cluster. Use local[*] to run Spark locally.

Inspect The Catalog

To inspect the new catalog, do the following:

  1. Log into the HERE platform. Search for the catalog from the Data tab and select it to switch to the catalog view.
  2. Select the node-cardinality layer and select the Inspect tab.
  3. Set the zoom level to 8 and search the map for Beijing.
  4. The output partitions are highlighted on the map, covering the bounding box of Beijing you specified.

Decode Partitions

To decode the output partitions, you need to configure the layer to use your custom schema. First, upload your schema to the platform. From the nodecardinality/schema/ folder, run mvn deploy:

$ pwd
~/projects/nodecardinality/schema

$ mvn deploy

A schema with the same group ID, artifact ID, and version can only be deployed once. To avoid collisions, ensure that your group ID (com.example.nodecardinality in this example) is unique.

In the Data view of the portal, click Browse schemas to display a list of deployed schemas your user can access, and make sure your schema schema_v0 is there.

From the node-cardinality layer view in the portal, click More and then select Reconfigure layer to open the layer configuration page. Here you can change most of the parameters you set when you created the layer, including the schema configuration. Locate the Schema setting, select schema_v0 from the drop-down menu, and then click Save at the bottom of the page.

From the Inspect tab, you can now select any partition. The decoded partition is displayed on the panel to the right.

Run The Processor as a Pipeline on the HERE Platform

Using the Pipeline API, you can run the batch pipeline you just created on a cluster. You can deploy your pipeline in one of two ways:

  • a scheduled pipeline, which automatically runs every time there is a new version of an input catalog
  • a run-once pipeline, which uses a pipeline-job.conf configuration that you provide

To use the Pipeline API, your App ID must belong to a Group. Your administrator must set up a Group ID in your account and assign your application to the group. Finally, you must grant that Group ID Read and Write access to the output catalog.

To deploy the processor to the HERE Workspace pipeline, you need to package it into a fat JAR. The pom.xml file generated by the Archetypes contains a platform profile for this purpose:

$ pwd
~/projects/nodecardinality/processor

$ mvn -Pplatform package

The above command creates a fat JAR of the processor as processor/target/processor-1.0.0-platform.jar.

You must use this file to create a Pipeline Template.

Refer to the Pipelines Developer's Guide for detailed instructions on deploying and running pipelines from the platform.

Additionally, you can deploy and run a pipeline with the OLP CLI. Refer to the OLP CLI User Guide for details.
