Maven の原型を使用したバッチ パイプラインの構築 (Vaden Archetypes) ( Java 英語 )

データ プロセッシング ライブラリを使用してバッチ パイプラインを構築するには、 SDK の Maven 原型を使用してプロジェクトのスケルトンを作成します。 HERE platform ポータル は、資格情報の管理、カタログの作成、およびアクセス権の管理に使用されます。 SDK のアーカイブタイプを使用して、プロジェクトのスケルトンを作成します。

この例では、 HERE Map Content カタログの道路トポロジとジオメトリレイヤーを読み取り、各入力パーティションのすべてのトポロジノードのセグメント参照数 ( カーディナリティ ) を書き込むパイプラインを作成する方法を示します。

資格情報

必要な資格情報には、次の 2 種類があります。

  1. プラットフォーム資格情報は、プラットフォーム API へのアクセスを提供します。 まず 、 https://platform.here.com/profile/access-credentials で新しいアプリケーションを作成します。 アプリケーションが作成されたら 、 [ キーを作成 ] をクリックして資格情報をダウンロードします。 既定 $HOME/.here/credentials.properties では、データ プロセッシング ライブラリはファイル内の資格情報を探します。 資格情報ファイルがこの場所に配置されていることを確認してください。
  2. Repository 資格情報を使用すると、データ プロセッシング ライブラリが存在するリポジトリにアクセスできます。 https://platform.here.com/profile/repository にアクセスし、 Generate 資格情報をクリックします)。 settings.xml ファイルがダウンロードされます。 このファイルを $HOME/.m2/ フォルダにコピーします。

出力カタログを作成します

まず、パイプラインの出力カタログとして機能する新しいカタログを作成します。 カタログには、道路トポロジおよびジオメトリレイヤーの各パーティションに、そのパーティションのトポロジノードの基数を含むパーティションが含まれている 1 つのレイヤーがあります。 stateデータ プロセッシング ライブラリ用に予約されている追加のレイヤーも 1 つ必要です。

HERE platform にログインします。 [ データ ] タブを選択し、次の操作を行います。

  1. [ 新しいカタログを追加] をクリックします。
  2. バッチ処理クイックスタートユーザー名など、カタログのカタログ名カタログ IDを指定します。
  3. 次に 、カタログの概要連絡先の電子メールを追加します。
  4. [ 保存 ] をクリックし、 Data API が新しいカタログを作成するまで待ちます。

次に、次のように、アプリケーションにカタログへの読み取り / 書き込みアクセス権を付与します。

  1. [ データを検索 ] ボックスでカタログの名前を検索して、カタログを選択する。
  2. [ 共有 ] に移動し、 [ 共有の管理 ] でアプリとカタログを共有を選択します。 アプリケーション ID を入力し 、 [ 許可 ] をクリック して 、 [ 読み取り書き込み] をオンにします。
  3. 最後に、 [ 許可 ] をクリックして変更を有効にします。

カタログにレイヤーを追加します。

  1. [ 新しいレイヤーを追加 ] をクリックし、node-cardinality ID としてレイヤーを作成します。 レイヤーの名前としてもnode-cardinalityを使用できます。また、人間が読める別の名前を選択することもできます。
  2. HERE Tile レイヤーが必要です。ズーム レベルは、入力した道路トポロジおよびジオメトリレイヤー 12 と同じである必要があります。 [ レイヤータイプ] で [ バージョン管理 ] を選択します。このオプションは、バッチ パイプラインで処理されるすべてのレイヤーで使用する必要があります。
  3. Protobuf を使用してパーティションをエンコードできるように、 application/x-protobuf のデフォルト Content Type を使用します。 スキーマ フィールドをに設定したままに Noneします。
  4. 次に、 [ 保存 ] をクリックしてレイヤーの作成を完了します。
  5. 2 つ目のレイヤーに進み state、次の表の 2 行目に従って設定します。この表には、カタログ内のすべてのレイヤーの設定が一覧表示されます。
レイヤー ID パーティション分割 ズーム レベル レイヤータイプ コンテンツタイプ スキーマ
ノードカーディナリティ HERE Tile 12 バージョン付き application/x-protobuf なし
都道府県 汎用 該当なし バージョン付き application/octet-stream なし

カタログが完全に設定されました。 プロジェクトの作成に進みます。

プロジェクトを作成する

Data SDK には、新しいバッチパイプラインを作成するプロセスを簡素化するための Maven 原型が含まれています。 Maven の原型を使用して、いくつかのシェルコマンドを使用して完全なプロジェクト構造を構築できます。 原型によって、すべての基本的な依存関係、サンプル設定ファイル、および独自のロジックを実装するために編集できるソースファイルを含む POM ファイルが自動的に生成されます。 少なくとも 3 つのプロジェクトを作成する必要があります。

  • すべてのサブプロジェクトを 1 つの POM ファイルでコンパイルするための、便利なトップレベルのプロジェクト
  • プロトコルバッファスキーマの Java/Scala バインディングを構築するための、ネストされたスキーマプロジェクト
  • 処理ロジックを構築するためのプロセスプロジェクト

次の手順では、 Maven がインストールされ、 mvn 実行可能ファイルが PATH 変数に含まれていることを前提としています。 bash 以下のすべてのコマンドをシェルから実行する必要があります。 tree このコマンドは、フォルダ構造を表示するために使用します。 または ls -R 、を置き換えとして使用することもできます。

まず nodecardinality 、次のコマンドを実行してという名前のトップレベルのプロジェクトを作成し、を押し ENTER て確認します。

$ pwd
~/projects

$ mvn archetype:generate -DarchetypeGroupId=org.codehaus.mojo.archetypes \
                         -DarchetypeArtifactId=pom-root \
                         -DarchetypeVersion=1.1 \
                         -DgroupId=com.example \
                         -DartifactId=nodecardinality \
                         -Dversion=1.0.0 \
                         -Dpackage=com.example.nodecardinality

nodecardinality 現在のディレクトリに次のファイルを含むフォルダが作成されます。

$ pwd
~/projects

$ tree
.
`-- nodecardinality
    `-- pom.xml

1 directory, 1 file

サブプロジェクトはこのフォルダ内から作成されます。 nodecardinality サブプロジェクトを作成するフォルダに移動します。 まず、次のコマンドを実行してモデルプロジェクトを作成し、を押し ENTER て確認します。

$ pwd
~/projects/nodecardinality

$ mvn archetype:generate -DarchetypeGroupId=com.here.platform.schema \
                         -DarchetypeArtifactId=project_archetype \
                         -DarchetypeVersion=X.Y.Z \
                         -DgroupId=com.example.nodecardinality \
                         -DartifactId=schema \
                         -Dversion=1.0.0 \
                         -Dpackage=com.example.nodecardinality.schema \
                         -DmajorVersion=0

SDK に含まれている最新バージョンの原型に関する特定のドキュメントについては、『 Archetypes 開発者ガイド』を参照してください。

nodecardinality/schema これにより、出力カタログのスキーマを構築するプロジェクトが含まれているフォルダにプロジェクトテンプレートが作成されます。

$ pwd
~/projects/nodecardinality

$ tree
.
|-- pom.xml
`-- schema
    |-- ds
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           `-- resources
    |               |-- ResourcesReadMe.txt
    |               `-- renderers
    |                   `-- ReadMe.txt
    |-- java
    |   `-- pom.xml
    |-- pom.xml
    |-- proto
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           |-- proto
    |           |   `-- com
    |           |       `-- example
    |           |           `-- nodecardinality
    |           |               `-- schema
    |           |                   `-- v0
    |           |                       `-- schema.proto
    |           `-- resources
    |               `-- description.md
    |-- scala
    |   `-- pom.xml
    `-- schema.yml

20 directories, 13 files

最後に nodecardinality 、フォルダ内で次のコマンドを実行して、を含むプロセッサテンプレートを作成 Direct1ToNCompilerします。を押し ENTER て確認します。

$ pwd
~/projects/nodecardinality

$ mvn archetype:generate -DarchetypeGroupId=com.here.platform \
                         -DarchetypeArtifactId=batch-direct1ton-java-archetype \
                         -DarchetypeVersion=X.Y.Z \
                         -DgroupId=com.example.nodecardinality \
                         -DartifactId=processor \
                         -Dversion=1.0.0 \
                         -Dpackage=com.example.nodecardinality.processor

SDK に含まれている最新バージョンの原型に関する特定のドキュメントについては、『 Archetypes 開発者ガイド』を参照してください。

これで、 nodecardinality プロジェクトに追加の processor フォルダーが追加されました。

$ pwd
~/projects/nodecardinality

$ tree
.
|-- pom.xml
|-- processor
|   |-- config
|   |   |-- pipeline-config.conf
|   |   `-- pipeline-job.conf
|   |-- pom.xml
|   `-- src
|       `-- main
|           |-- java
|           |   `-- com
|           |       `-- example
|           |           `-- nodecardinality
|           |               `-- processor
|           |                   |-- Compiler.java
|           |                   |-- CompilerConfig.java
|           |                   |-- IntermediateData.java
|           |                   `-- Main.java
|           `-- resources
|               |-- application.conf
|               `-- log4j.properties
`-- schema
    |-- ds
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           `-- resources
    |               |-- ResourcesReadMe.txt
    |               `-- renderers
    |                   `-- ReadMe.txt
    |-- java
    |   `-- pom.xml
    |-- pom.xml
    |-- proto
    |   |-- pom.xml
    |   `-- src
    |       |-- assembly
    |       |   `-- proto.xml
    |       `-- main
    |           |-- proto
    |           |   `-- com
    |           |       `-- example
    |           |           `-- nodecardinality
    |           |               `-- schema
    |           |                   `-- v0
    |           |                       `-- schema.proto
    |           `-- resources
    |               `-- description.md
    |-- scala
    |   `-- pom.xml
    `-- schema.yml

30 directories, 22 files

スキーマサブプロジェクト

このnodecardinality/schemaフォルダには、 Scala および Java のライブラリ ( 通常はバインディングと呼ばれます ) をビルドし、 Protos としてエンコードされたパーティションをデシリアライズ / シリアライズする Maven プロジェクトのスケルトンが含まれています。 これは、出力レイヤーのパーティションを node-cardinality Protobuf としてエンコードし、カスタムパーティションスキーマを指定するために必要です。

プロジェクトフォルダには、次のコンポーネントがあります。

  • pom.xmlプロジェクトのコンパイルに使用されるメインの POM ファイルです
  • java プロトコルバッファの Java バインディングを構築するための POM ファイルを含むフォルダ
  • scala プロトコルバッファの Scala バインディングを構築するための POM ファイルを含むフォルダ
  • ds 結果のバインディングおよび Protobuf 定義を ZIP ファイルにバンドルするサブプロジェクトを含むフォルダ。このフォルダをプラットフォーム Artifactory リポジトリに公開して、プラットフォームポータルからパーティションをデコードできます
  • proto Protobuf 定義を含むフォルダ。 出力スキーマを指定するには、このサブプロジェクトをカスタマイズする必要があります。

カスタム Protobuf スキーマを作成するには、 nodecardinality/schema/proto/src フォルダに .proto ファイルを追加します。 Protobuf の詳細については、プロトコルバッファのドキュメントを参照してください。

作成したスケルトンプロジェクトには、 .proto 出力パーティションのスキーマをすばやく定義するために編集できるファイルがすでに含まれています。

nodecardinality/schema/proto/src/main/proto/com/example/nodecardinality/schema/v0/schema.proto ファイルを開き、メインメッセージ定義を検索します。

syntax = "proto3";

package com.example.nodecardinality.schema.v0;

// Any dependent resources should be declared in the main pom, and the files referenced here:
//import "com/company/dependentGroupId/filename.proto";

// MainProtobufMessage is a placeholder, must match package/messagename in the mainMessage tag of the layer-manifest-plugin in the schema_ds module.
message MainProtobufMessage {
    int32 lat = 1;
    int32 lon = 2;
}

メインメッセージの名前を MainProtobufMessage から NodeCardinalityPartition に変更し、サンプルフィールド lat lon を削除して、 node_cardinality type NodeCardinality という名前の repeated フィールドを追加します。

次に、ノード NodeCardinality の ID (id)とノードのカーディナリティ(cardinality)の 2 つのフィールドを使用して、補助メッセージタイプを定義します。 この新しい Protobuf 定義は次のようになります。

syntax = "proto3";

package com.example.nodecardinality.schema.v0;

// Any dependent resources should be declared in the main pom, and the files referenced here:
//import "com/company/dependentGroupId/filename.proto";

message NodeCardinalityPartition {
  repeated NodeCardinality node_cardinality = 1;
}

message NodeCardinality {
  string id = 1;
  uint32 cardinality = 2;
}

メインメッセージの名前を変更したため、スキーマバンドルの作成に使用した設定を更新してください。 ds サブプロジェクトの POM ファイル () を開きnodecardinality/schema/ds/pom.xml、次の設定を探します layer-manifest-plugin

      <!-- build the layer manifest file -->
      <plugin>
        <groupId>com.here.platform.schema.maven_plugins</groupId>
        <artifactId>layer-manifest-plugin</artifactId>
        <version>${here.plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>write-manifest</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- Message class used to de/serialize data held in a DataStore layer.
               NOTE: If changed here, must correspond to a message in Protobuf definitions. -->
          <mainMessage>com.example.nodecardinality.schema.v0.MainProtobufMessage</mainMessage>
          <inputDir>${project.build.directory}/proto</inputDir>
          <writeManifest>true</writeManifest>
        </configuration>
      </plugin>

mainMessage プラグイン設定でパスを変更します。置き換え com.example.nodecardinality.schema.v0.MainProtobufMessage られるのは com.example.nodecardinality.schema.v0.NodeCardinalityPartition次のとおりです。

      <!-- build the layer manifest file -->
      <plugin>
        <groupId>com.here.platform.schema.maven_plugins</groupId>
        <artifactId>layer-manifest-plugin</artifactId>
        <version>${here.plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>write-manifest</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <!-- Message class used to de/serialize data held in a DataStore layer.
               NOTE: If changed here, must correspond to a message in Protobuf definitions. -->
          <mainMessage>com.example.nodecardinality.schema.v0.NodeCardinalityPartition</mainMessage>
          <inputDir>${project.build.directory}/proto</inputDir>
          <writeManifest>true</writeManifest>
        </configuration>
      </plugin>

次に、スキーマプロジェクトを nodecardinality/schema フォルダーから mvn install 実行してコンパイルします。 または、トップレベルのプロジェクトからこのコマンドを実行します。

schema_v0_javaschema_v0_scala_${scala.compat.version} の 2 つのライブラリが構築されています。このライブラリは、 NodeCardinalityPartition オブジェクトの作成、 ByteArray へのシリアル化、 ByteArray からの逆シリアル化を行うための API を提供します。

出力パーティションを書き込むに schema_v0_java は、処理ロジックでを使用します。 ロードトポロジおよびジオメトリ層から入力パーティションを読み取るに com.here.schema.rib.topology-geometry_v2_java は、で提供されている対応する Java バインディングを使用します。

次のセクションでは、これらの依存関係をプロセッサのサブプロジェクトに追加します。

処理ロジック

nodecardinality/processor このフォルダには、バッチ処理パイプラインのスケルトンプロジェクトが含まれています。 このプロジェクトでは、最終的な処理アプリケーションを構築します。

プロジェクトフォルダには、次のコンポーネントが含まれています。

  • pom.xmlプロジェクトのコンパイルに使用されるメインの POM ファイル。
  • src ロジックを実装する Java ソースファイルを含むフォルダ。
  • config パイプライン API の外部でパイプラインをローカルに実行するために使用できる設定ファイルが含まれているフォルダ。

パイプラインの設定

バッチ処理ジョブを実行するには、データ プロセッシング ライブラリで次の条件を満たす必要があります。

  1. すべての入力カタログおよび出力カタログの HERE リソース名 (HRNS) 。
  2. バッチジョブで処理するすべての入力カタログのバージョン。

SCHEDULED状態のパイプラインの場合 ' この情報は 'HOCON 形式の 2 つのコンフィギュレーションファイルを介してパイプライン API によって自動的に提供されます

  1. pipeline-config.conf入力カタログおよび出力カタログの HERE リソースネーム を提供します。
  2. pipeline-job.conf処理する入力カタログのバージョンを提供します。

パイプラインがデプロイされると、 HERE Workspace パイプラインに最初のファイルがアップロードされます。コンパイルごとに変更されることはありません。 一方、ジョブ設定は、新しいジョブが実行されたと見なされたときに、パイプライン API によってオンザフライで作成されます。 たとえば、入力カタログの新しいバージョンが存在し、出力カタログを更新する必要があります。

ローカル開発中にパイプライン API を使用せずにバッチ パイプラインをローカルで実行する場合は、次の 2 つの Java システムプロパティを設定して、これらの設定ファイルを自分自身で提供する必要があります。

  1. pipeline-config.fileファイルへのパスが含まれています pipeline-config.conf
  2. pipeline-job.fileファイルへのパスが含まれています pipeline-job.conf

nodecardinality/processor/config このフォルダには、両方のファイルのテンプレートが含まれています。このテンプレートは編集してローカル開発に使用できます。

パイプライン設定のテンプレートファイル ()nodecardinality/processor/config/pipeline-config.confは次のようになります。


// This configuration file is provided to specify input and
// output catalogs for pipelines during local development.
// In addition, the CLI uses this file when uploading the pipeline
// to the platform to create the relevant configuration.
// For more information on the HERE platform, see the
// platform documentation at https://here-tech.skawa.fun/documentation.
//
// Do not include this file in the pipeline fat JAR you upload to the platform.
// When running your pipeline locally, use the command below.
//
// -Dpipeline-config.file=config/pipeline-config.conf

pipeline.config {

  output-catalog { hrn = "hrn:here:data:::myoutput" } // TODO: Specify the output catalog HRN.

  input-catalogs {
    // The following keys are symbolic IDs for catalogs
    // passed to the pipeline that the platform can use to
    // bind to and identify specific inputs. Use one line
    // for each input catalog in your project. Add and
    // delete lines as necessary.
    input-catalog-1 { hrn = "hrn:here:data:::myinput1" } // TODO: Specify the input catalog HRN.
    input-catalog-2 { hrn = "hrn:here:data:::myinput2" } // TODO: Specify the input catalog HRN.
  }
}

pipeline.config.output-catalog.hrn 出力カタログの HERE リソースネーム を示します。 作成したカタログの HERE リソースネーム を読み取るには、プラットフォームポータルでカタログを開きます。 カタログ ID が batch-processing-quickstart の場合、対応する HERE リソースネームは hrn:here:data:::batch-processing-quickstart です。

pipeline.config.input-catalogs 任意のシンボリック識別子によってインデックス化されたすべての入力カタログの HERE リソースネーム が含まれます。この ID は、ジョブ設定および処理ロジックで特定の入力カタログを識別するために使用されます。 テンプレートファイルには、識別子 input-catalog-1input-catalog-2 対応するサンプルの HRNS を含む 2 つの入力カタログが含まれています。 このプロジェクトでは、 HERE リソースネーム を使用する入力カタログ、 HERE Map Content は 1 hrn:here:data::olp-here:rib-2つだけです。 これらの 2 つのサンプル入力カタログを削除し、前述の HERE リソースネーム で追加して rib から、カタログ ID としてを選択します。


pipeline.config {

  output-catalog { hrn = "hrn:here:data:::batch-processing-quickstart" }

  input-catalogs {
    rib { hrn = "hrn:here:data::olp-here:rib-2" }
  }
}

nodecardinality/processor/config/pipeline-job.conf このファイルは、ジョブ設定のテンプレートです。 次のものが含まれています。


// This configuration file is provided to facilitate local development
// of pipelines with the HERE Data SDK and
// for use with pipelines that are not scheduled to run
// in reaction to changes in input catalogs.
// For more information on the HERE platform, see the
// platform documentation at https://here-tech.skawa.fun/documentation.
//
// Do not include this file in the pipeline fat JAR you upload to the platform.
// When running your pipeline locally, use the command below.
//
// -Dpipeline-job.file=config/pipeline-job.conf

pipeline.job.catalog-versions {

  input-catalogs {

    // The following keys are symbolic IDs for catalogs
    // passed to the pipeline that the platform can use to
    // bind to and identify specific inputs.

    // Specify only one processing-type/version pair per input catalog.
    // Add an entry in the appropriate object (for example,
    // input-catalog-1, input catalog-2, input-catalog-3, ...)
    // for each input catalog in your project.
    // For more information on the processing type options, see
    // Build a Batch Pipeline with Maven Archetypes in the
      // Data Processing Library documentation.

    // Use case: Ignore results of previous compilation and fully process catalogs.
    // TODO: If you want to use incremental processing, comment out these sections.
    input-catalog-1 {
      processing-type = "reprocess"
      version = 0 // TODO: Specify the version of the catalog to be processed.
    }
    input-catalog-2 {
      processing-type = "reprocess"
      version = 2 // TODO: Specify the version of the catalog to be processed.
    }

    // Use case: Process catalogs incrementally. The processing
    // types below are not recommended for local development. They are
    // mostly used by platform pipelines to enable incremental compilation
    // with the Data Processing Library.
    // TODO: Comment out the section above, remove the comments below, and
    // specify the versions.
    //  input-catalog-1 {
    //    processing-type = "no_changes"
    //    version = 0 // TODO: Specify the version of the catalog to be processed.
    //  }
    //  input-catalog-2 {
    //    processing-type = "changes"
    //    since-version = 1 // TODO: Specify the correct starting version of the changeset to be processed.
    //    version = 4 // TODO: Specify the ending version of the changeset to be processed.
    //  }
  }
}

で指定された入力カタログごと pipeline-config.confpipeline.job.catalog-versions.input-catalogs 、には次のものが含まれます。

pipeline.job.catalog-versions.input-catalogs.input-catalog-ID.processing-type 、 3 つの処理タイプを示す 3 つの異なる値を持つことができます。

  1. 再処理: このタイプは、カタログを完全に処理する必要があることを示します。前のコンパイルの結果は、処理されるデータ量の削減には使用されません。 手動で記述されたジョブ設定を処理する場合に使用する処理の最も簡単なタイプです。 これは、データ プロセッシング ライブラリの機能であるインクリメンタル・コンパイルを効果的に無効にします。この機能を使用すると、前のコンパイルの結果を使用して処理されるデータ量を削減できます。 このタイプの処理で version は、処理するカタログのを指定する必要があります。
  2. no_changes:このタイプは、出力カタログが最後にコンパイルされたときに使用されたカタログの同じバージョンを再利用することを示します。 このタイプの処理では、データ プロセッシング ライブラリはいくつかのコンパイルステップをスキップできます。 version 処理するカタログのを指定する必要があります。 有効な処理コンフィグレーションでは、バージョンが最後のコンパイルで使用されたバージョンと同じであることが必要です。 データ プロセッシング ライブラリは、処理が開始される前にこの条件が true であることを確認 します。true でない場合は、インクリメンタル・コンパイルが無効になります。
  3. 変更: このタイプはversion、最後のコンパイルで処理されたバージョン () に基づいて、カタログ () の新しいバージョンを処理することを示しsince-versionます。 このタイプの処理は、データ プロセッシング ライブラリによって処理の最適化に使用され、実際に再処理されるデータ量を削減できます。 処理設定は、最後のコンパイルで使用されたカタログのバージョンが実際に使用されている限り有効です since-version。バージョンが実際でない場合 、インクリメンタル コンパイルは無効になります。

no_changeschanges の両方 が、データ プロセッシング ライブラリで内部的に最適化を有効にするためにのみ使用されることを理解しておくことが重要です。 概念的には、任意の入力カタログについて、処理中のライブラリは常に特定のバージョンを完全に処理します。 このクイックスタートおよびローカル開発では 、再処理 の処理タイプのみに依存する必要があります。 パイプラインを展開した後、データ処理ライブラリの最適化機能を最大限に活用する有効なジョブ設定を提供することは、パイプライン API の義務です。

執筆時点 1で利用可能な HERE Map Content の最新バージョンはです。

processing-type1再処理version を使用して、 rib カタログを構成しましょう。

pipeline-job.conf ここでは、次の内容を確認します。

pipeline.job.catalog-versions {

  input-catalogs {

    rib {
      processing-type = "reprocess"
      version = 1
    }
  }
}

依存関係

SDK のアーカイブタイプは、 processor サブプロジェクトの pom.xml ファイルにあるすべての基本的な依存関係を提供します。 処理ロジック HERE で使用されるカスタムの依存関係を手動で追加する必要があります。 このプロジェクトでは、さらに 2 つの依存関係が必要です。

  • com.here.schema.rib.topology-geometry_v2_javaをクリックして、入力パーティションをデシリアライズします
  • com.example.nodecardinality.schema_v0_java出力パーティションをシリアル化するために作成した

nodecardinality/processor/pom.xml ファイルを開きます。 スキーマプロジェクトによって作成された Java バインディングのプレースホルダがすでに存在します。 検索するには DATA_MODEL_PROJECT_NAME 、ファイルでを検索します。

    <!--
      List data model artifacts that this compiler depends on,
      for input, for output or for both, as dependency here.
    -->
    <!--
      <dependency>
        <groupId>com.example.nodecardinality</groupId>
        <artifactId>{DATA_MODEL_PROJECT_NAME}_java</artifactId>
        <version>{DATA_MODEL_PROJECT_VERSION}</version>
      </dependency>
    -->

その依存関係をコメント解除し、 {DATA_MODEL_PROJECT_NAME} プレースホルダーと {DATA_MODEL_PROJECT_VERSION} プレースホルダーにそれぞれ schema_v0 AND 1.0.0 を入力します。

    <!--
      List data model artifacts that this compiler depends on,
      for input, for output or for both, as dependency here.
    -->
    <dependency>
      <groupId>com.example.nodecardinality</groupId>
      <artifactId>schema_v0_java</artifactId>
      <version>1.0.0</version>
    </dependency>

次に、次の項目に依存関係を追加します com.here.schema.rib.topology-geometry_v2_java

    <!--
      HERE Content Map Road Topology & Geometry layer Java bindings
    -->
    <dependency>
      <groupId>com.here.schema.rib</groupId>
      <artifactId>topology-geometry_v2_java</artifactId>
      <version>2.8.0</version>
    </dependency>

処理ロジック

ここで 'Maven の原型が作成した Java ソースファイルを編集して ' 実際の処理ロジックを実装します processor/src/main/java/com/example/nodecardinality/processor このフォルダには、 4 つのソースファイルがあります。

  1. Main.java: のサブクラスとして、処理アプリケーションのメインエントリポイントを含み PipelineRunnerます。 Driver は、 1 つ Direct1ToNCompiler を含む 1 つの DriverTask を使用して設定されます( Compiler.java で実装)。
  2. CompilerConfig.java: コンパイラーの構成が含ま application.conf れています。このクラスは、構成ファイルを使用してビジネスロジックを構成するために定義できます。 ただし、ビジネスロジックで設定パラメータを公開する必要はありません。そのため、既定の実装で十分です。
  3. IntermediateData.java: IntermediateData で定義されているコンパイラーが使用するクラスを定義 Compiler.javaします。
  4. Compiler.java: 実際の処理ロジックをとして実装 Direct1ToNCompilerします。

まず、タスクに使用するコンパイルパターンと中間データを決定します。 このクイックスタートでは、ファンクショナルパターンを中心に説明します。 RDD ベースのパターンと比較すると、介入なしでインクリメンタルコンパイルを実行でき、パーティション分割、シャフリング、持続性などのSpark警告に対処する必要はありません。

機能 Spark 的なパターンの仕組みをより的確に把握するために、基盤となるアプリケーションが関心を持っています。 すべてのパターンで、次のスキームの異なるフレーバーが実装されます。

  1. 入力メタデータが取得され、 KeyMeta のペアの RDD が作成されます。 Key パーティションを一意に識別し、カタログ ID 、レイヤー ID 、およびパーティション名を含みます。 Meta に、パーティションのペイロードに関する情報を示します。 およびを、対応する Key とともに使用して、パーティション(ペイロード)のコンテンツを取得できます。
  2. CompileIn 変換が入力RDDに適用されます。 このステップの目的は、入力パーティションと出力パーティションの間のマッピングを定義し、入力データを定義した中間リプレゼンテーションにプリプロセスすることです。 ほとんどのコンパイルパターンでは、このステップは flatMap に対応しています。ここ compileInFn では、 1 つの (Key, Meta) ペアを返す (Key, IntermediateData)RDD のすべての要素に適用されます。 その後 groupBy 、同じ出力キーを持つすべての中間リプレゼンテーションをグループ化する変換が行われます。
  3. KeyAND Iterable<IntermediateData> ペアの結果RDDは 、 CompileOut 変換を適用して処理 されます。この変換では、Payloadがグループ化された中間リプレゼンテーションからそれぞれKeyに作成されます。

このプロジェクトでは、 HERE マップコンテンツの topology-geometry レイヤーの各入力パーティションについて、出力カタログの node-cardinality レイヤーに同じタイル ID を持つ出力パーティションを作成します。 入力と出力の間のマッピングは、入力パーティションの内容には依存 しません。パーティションのKey一部であるタイル ID のみが必要です。 このため 、直接コンパイラーを使用できます。

各入力パーティションが 1 つの出力パーティションを生成するために使用されるため、直接1:1コンパイルを実装します。 1:N これはコンパイルの特殊なケースで Direct1ToNCompiler あるため、必要なのはパターンのみです。

CompileInCompileOut の間で使用するIntermediateDataを決定する必要があります。 直接 1:1 コンパイルを実行するの 、 CompileOut に処理ロジックを直接実装できます。 つまり、入力パーティションの Key オブジェクトおよび Meta オブジェクトを CompileIn 変換から CompileOut 変換に転送し、入力データを処理できます。

IntermediateData 処理済みバージョンの入力パーティションを含むより複雑なクラスが必要になることに注意してください。特に、入力パーティションを使用して複数の出力パーティションをコンパイルし、同じデータを複数回処理しないようにする場合に注意してください。

IntermediateData 次のクラスは、原型によって提供されています。


package com.example.nodecardinality.processor;

/**
 * Intermediate compilation result holder used for data exchange between
 * {@link Compiler#compileInFn(Pair)} and
 * {@link Compiler#compileOutFn(Key, Iterable)} stages.
 */
public class IntermediateData {
    // TODO: Configure your data definition, which is used during processing on the cluster.
    // For more information, see the Data Processing Library documentation
    // at https://here-tech.skawa.fun/documentation.
    private String attribute1;
    private String attribute2;

    public IntermediateData(String attribute1, String attribute2) {
        this.attribute1 = attribute1;
        this.attribute2 = attribute2;
    }

    public String getAttribute1() {
        return attribute1;
    }

    public String getAttribute2() {
        return attribute2;
    }
}

KeyMeta のペアをラップするように実装を書き換えてみましょう。

package com.example.nodecardinality.processor;

import com.here.platform.data.processing.java.catalog.partition.Key;
import com.here.platform.data.processing.java.catalog.partition.Meta;

public class IntermediateData {
    private Key key;
    private Meta meta;

    IntermediateData(Key key, Meta meta) {
        this.key = key;
        this.meta = meta;
    }

    Key getKey() {
        return key;
    }

    Meta getMeta() {
        return meta;
    }
}

その後 processor/src/main/java/com/example/nodecardinality/processor/Compiler.java 、ファイルを開きます。

HERE Map Content topology-geometry レイヤーおよび出力レイヤーの Java バインディングをインポートする必要があります。

import com.example.nodecardinality.schema.v0.Schema;
import com.here.platform.pipeline.logging.java.ContextAwareLogger;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass;
import com.here.schema.rib.v2.TopologyGeometry;

ここで、入力レイヤーおよび出力レイヤーのプレースホルダーを置き換えます。

    /**
     * Input catalog ID
     * The example code below uses only one input catalog and one input layer.
     * If you need to use more than one input catalog and/or one input layer, then
     * you need to add additional definitions here and handle these definitions in
     * your retriever and Map implementations below.
     * private Map<String, Retriever> catalogNameToRetrieverMap = new HashMap<>();
     * catalogNameToRetrieverMap.put(IN_CATALOG, retriever);
     */
    // TODO: Specify the name of your input catalog, use the same symbolic ID as that
    // specified in the pipeline-config.conf file.
    final private String IN_CATALOG = "YOUR_INPUT_CATALOG_NAME";

    /** Name of input layer */
    // TODO: Specify the name of your input layer, use the same symbolic ID as that
    // used for the layer in the catalog on the platform portal.
    final private String IN_LAYER = "your-input-layer";

    /** Name of output layer */
    // TODO: Specify the name of your output layer, use the same symbolic ID as you
    // used when you created the layer.
    final private String OUT_LAYER = "your-output-layer";

IN_CATALOGpipeline-config.conf 、 () で設定したシンボリック ID を使用ribします。 IN_LAYER IS topology-geometry および OUT_LAYER IS : node-cardinality

/**
 * Input catalog ID The example code below uses only one input catalog and one input layer. If you
 * need to use more than one input catalog and/or one input layer, then you need to add additional
 * definitions here and handle these definitions in your retriever and Map implementations below.
 * private Map<String, Retriever> catalogNameToRetrieverMap = new HashMap<>();
 * catalogNameToRetrieverMap.put(IN_CATALOG, retriever);
 */
// TODO: Specify the name of your input catalog, use the same symbolic ID as that
// specified in the pipeline-config.conf file.
private final String IN_CATALOG = "rib";

/** Name of input layer */
// TODO: Specify the name of your input layer, use the same symbolic ID as that
// used for the layer in the catalog on the platform portal.
private final String IN_LAYER = "topology-geometry";

/** Name of output layer */
// TODO: Specify the name of your output layer, use the same symbolic ID as you
// used when you created the layer.
private final String OUT_LAYER = "node-cardinality";

直接コンパイラーでは、 CompileIn 関数は次のように分割されます。

  • 入力 Key された出力 Key のシーケンスを返す関数 mappingFn です
  • 入力 Key および Meta を指定して IntermediateData オブジェクトを返す関数 compileInFn

mappingFnによって確立されたマッピング は 、対応する出力KeyIntermediateDataオブジェクトを送信するために使用 され、その後compileOutFn関数でコンパイルされます。

マッピング機能を検索します。

    public java.lang.Iterable<Key> mappingFn(Key inKey) {
        List<Key> outputs = new ArrayList<>();

        // TODO: Add code that defines the mapping. Uncomment the line below to add a 1:1 mapping.
    //        outputs.add(new Key(Default.OutCatalogId(), OUT_LAYER, inKey.partition()));
        throw new RuntimeException("NOT Implemented");
        // return outputs;
    }

各入力 KeyKey 、同じパーティション名、出力カタログ () と同じカタログ IDDefault.OutCatalogId()、および出力レイヤー () と同じレイヤー ID を持つ出力にマップする必要OUT_LAYERがあります。

まず Default 、オブジェクトをインポートします。

import com.here.platform.data.processing.java.driver.Default;

次に mappingFn 、次のように実装します。

public Iterable<Key> mappingFn(Key inKey) {
  List<Key> outputs = new ArrayList<>();

  outputs.add(inKey
                .withCatalog(Default.OutCatalogId())
                .withLayer(OUT_LAYER));
  return outputs;
}

compileInFn では、入力パーティションの Key および Meta からビルドされた IntermediateData を戻すだけです。

既存 compileInFn のメソッドを次のメソッドに置き換えます。

public IntermediateData compileInFn(Pair<Key, Meta> in) {
  return new IntermediateData(in.getKey(), in.getValue());
}

次に compileOutFn 、を次のように置き換えます。

public Optional<Payload> compileOutFn(Key outKey, IntermediateData intermediate) {
  Payload payload = retriever.getPayload(intermediate.getKey(), intermediate.getMeta());
  try {
    TopologyGeometryPartitionOuterClass.TopologyGeometryPartition partition =
        TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.parseFrom(
            payload.content());

    Schema.NodeCardinalityPartition.Builder builder =
        Schema.NodeCardinalityPartition.newBuilder();

    // in the output partition node cardinalities will have the same order as the input
    // nodes in HERE Map Content.
    for (TopologyGeometry.Node node : partition.getNodeList()) {
      builder.addNodeCardinality(
          Schema.NodeCardinality.newBuilder()
              .setId(node.getIdentifier())
              .setCardinality(node.getSegmentRefCount())
              .build());
    }

    Schema.NodeCardinalityPartition outputPartition = builder.build();

    return Optional.of(new Payload(outputPartition.toByteArray()));

  } catch (InvalidProtocolBufferException ex) {
    throw new RuntimeException(ex);
  }
}

HERE では、上記の mappingFn の戻り値とその compileInFn 戻り値が出力パーティションの OutKey として compileOutFn 渡され、対応する入力パーティションの InKey AND InMeta ペアが IntermediateData 含まれます。

入力パーティションのペイロードは、 Compiler オブジェクトの作成時に初期化された retriever オブジェクトを使用して取得します。

Payload payload = retriever.getPayload(intermediate.getKey(), intermediate.getMeta());

HERE マップコンテンツ topology-geometry のレイヤーの Java バインディングを使用して、対応するトポロジパーティションをデコードします。

TopologyGeometryPartitionOuterClass.TopologyGeometryPartition partition =
    TopologyGeometryPartitionOuterClass.TopologyGeometryPartition.parseFrom(
        payload.content());

次に Schema 、プロジェクトの Java バインディングを使用して、対応する出力パーティションを作成します。 各プロトコルバッファメッセージについて、この Schema.NodeCardinalityPartition 場合は、メッセージ内のすべてのフィールドを設定するインターフェイスを提供する Builder(newBuilder) を取得します。

Schema.NodeCardinalityPartition.Builder builder =
    Schema.NodeCardinalityPartition.newBuilder();

繰り返しフィールド node_cardinality のみがあります。このフィールドには、要素をシーケンスに追加するための addNodeCardinality メソッドが自動的に生成されます。 入力パーティション (partition.getNodeList()) の各ノードについて、このメソッドを使用してノードの ID(node.getIdentifier()) とカーディナリティ (node.getSegmentRefCount()) を持つ NodeCardinality オブジェクトを追加します。 NodeCardinality このオブジェクトは、再度ビルダーで作成されます。

// in the output partition node cardinalities will have the same order as the input
// nodes in RIB.
for (TopologyGeometry.Node node : partition.getNodeList()) {
  builder.addNodeCardinality(
      Schema.NodeCardinality.newBuilder()
          .setId(node.getIdentifier())
          .setCardinality(node.getSegmentRefCount())
          .build());
}

次に build 、メソッドを使用して新しい Schema.NodeCardinalityPartitionを作成します。

Schema.NodeCardinalityPartition outputPartition = builder.build();

データを公開するには、出力パーティションをバイト配列 (outputPartition.toByteArray()) にシリアライズし、そこから Payload オブジェクトを解析して、オプションのペイロードを返します。 出力カタログで特定の出力パーティションを使用しない場合 Java.util.Optional は、空のオブジェクトを使用できます。 ただし、この場合は、利用可能な各入力パーティションの出力パーティションを公開する必要があります。

return Optional.of(new Payload(outputPartition.toByteArray()));

トップレベルのフォルダーからプロジェクト全体をビルドできるようになりました。

$ pwd
~/projects/nodecardinality

$ mvn install

プロセッサをローカルで実行します

HERE Map Content のようなグローバルカタログの処理には時間がかかることがあります。 ただし、 1 つ以上 のパーティションフィルターをapplication.conf ファイルに追加することで、ローカル開発中に処理するパーティションの数を制限できます。

この場合、を使用 BoundingBoxFilter して、ベルリンの都市を含むバウンディング ボックス内のパーティションのみを処理します。 processor/src/main/resources/application.conf ファイルを開き、次のパーティションフィルタ設定を追加します。


here.platform.data-processing.executors.partitionKeyFilters = [
  {
    className = "BoundingBoxFilter"
    param = {
      boundingBox {
        // Berlin
        north = 52.67551
        south = 52.338261
        east = 13.76116
        west = 13.08835
      }
    }
  }
]

mvn install この変更を行った後、必ず再実行してください。 プロセッサモジュールフォルダから、上記で設定したコンフィギュレーションファイルを使用してコンパイルジョブを実行できます。

$ pwd
~/projects/nodecardinality/processor

$ mvn exec:java -Dexec.mainClass=com.example.nodecardinality.processor.Main \
                -Dpipeline-config.file=config/pipeline-config.conf          \
                -Dpipeline-job.file=config/pipeline-job.conf                \
                -Dexec.args="--master local[*]"

Maven exec プラグインは、および com.example.nodecardinality.processor.Main 設定ファイル config/pipeline-config.conf およびに設定されたメインクラスで使用 config/pipeline-job.confされます。 PipelineRunnermain このメソッドは、オプション --masterspark-master のコマンド ライン引数を受け入れて、クラスタのマスター URL を設定します。 local[*] Spark をローカルで実行するために使用します。

カタログを検査します

新しいカタログを検査するには、次の操作を行います。

  1. HERE platform にログインします。 [ データ ] タブでカタログを検索し、選択してカタログビューに切り替えます。
  2. node-cardinalityレイヤーを選択し 、「検査」 タブを選択します。
  3. ズーム レベルを 8 に設定し、地図でベルリンを検索します。
  4. 出力パーティションがマップ上で強調表示され、指定したベルリンのバウンディング ボックスがカバーされます。

パーティションをデコードします

出力パーティションをデコードするには、カスタムスキーマを使用するようにレイヤーを設定する必要があります。 まず、スキーマをプラットフォームにアップロードします。 nodecardinality/schema/ フォルダから mvn deploy、次のコマンドを実行します。

$ pwd
~/projects/nodecardinality/schema

$ mvn deploy

同じグループ ID 、アーティファクト ID 、およびバージョンのスキーマは、一度だけ展開できます。 衝突を避けるcom.example.nodecardinality には、グループ ID (この例では)が一意であることを確認してください。

ポータルのデータビューで、 [ スキーマの参照 ] をクリックして、ユーザーがアクセスできる展開済みのスキーマのリストを表示し、スキーマschema_v0が存在することを確認します。

ポータルのnode-cardinalityレイヤービューで、詳細をクリック し、 レイヤーを再設定を選択 してレイヤー設定ページにアクセスします。 レイヤーの作成時に設定したほとんどのパラメーター(スキーマ設定を含む)を変更できるようになりました。 スキーマ設定を探し 、上部のメニューからschema_v0を選択して、ページ下部の [ 保存 ] をクリックします。

[ 検査 ] タブで任意のパーティションを選択できるようになりました。 デコードされたパーティションがパネルの右側に表示されます。

HERE platform でプロセッサをパイプラインとして実行します

パイプライン API を使用して、クラスタで作成したバッチ パイプラインを実行できます。 パイプラインは、次の 2 つの方法のいずれかで展開できます。

  • パイプラインの新しいバージョンが存在するたびに自動的に実行される Scheduled 入力カタログ
  • ユーザーが指定した pipeline-job.conf 設定を使用する Run-Once パイプライン。

パイプライン API を使用するには、アプリ IDグループに属している必要があります。 管理者がアカウントでグループ ID を設定し、そのグループにアプリケーションを割り当てる必要があります。 最後に、そのグループ ID に出力カタログへの読み取りおよび書き込みアクセス権を付与する必要があります。

HERE Workspace パイプラインにプロセッサを展開するには、プロセッサをファット JAR にパッケージ化する必要があります。 pom.xml Archetypes によって生成されたファイルに platform は、この目的のためのプロファイルが含まれています。

$ pwd
~/projects/nodecardinality/processor

$ mvn -Pplatform package

上記のコマンド processor/target/processor-1.0-SNAPSHOT-platform.jarを実行すると、プロセッサ AS のファット JAR が作成されます。

パイプライン テンプレートを作成するには、このファイルを使用する必要があります。

プラットフォームからパイプラインを展開して実行する方法の詳細については、『 Pipelines 開発者ガイド』を参照してください。

さらに、 OLP CLI を使用してパイプラインを展開および実行できます。詳細については、『 OLP CLI ユーザー ガイド』を参照してください。

」に一致する結果は 件です

    」に一致する結果はありません