Maven の原型を使用したバッチ パイプラインの構築 (Vaden Archetypes) ( Scala 英語 )
データ プロセッシング ライブラリを使用してバッチ パイプラインを構築するには、 SDK の Maven 原型を使用してプロジェクトのスケルトンを作成します。 HERE platform ポータル は、資格情報の管理、カタログの作成、およびアクセス権の管理に使用されます。 SDK のアーカイブタイプを使用して、プロジェクトのスケルトンを作成します。
この例では、 HERE Map Content カタログの道路トポロジとジオメトリレイヤーを読み取り、各入力パーティションのすべてのトポロジノードのセグメント参照数 ( カーディナリティ ) を書き込むパイプラインを作成する方法を示します。
資格情報
必要な資格情報には、次の 2 種類があります。
- プラットフォーム資格情報 - これらの資格情報は、プラットフォーム API へのアクセスを提供します。 まず 、 https://platform.here.com/profile/access-credentials で新しいアプリケーションを作成します。 アプリケーションが作成されたら 、 [ キーを作成 ] をクリックして資格情報をダウンロードします。 既定
$HOME/.here/credentials.properties
では、データ プロセッシング ライブラリはファイル内の資格情報を探します。 資格情報ファイルがこの場所に配置されていることを確認してください。 - Repository 資格情報 - これらの資格情報を使用すると、データ プロセッシング ライブラリが存在するリポジトリにアクセスできます。 https://platform.here.com/profile/repository にアクセスし、 Generate 資格情報をクリックします)。
settings.xml
ファイルがダウンロードされます。 このファイルを $HOME/.m2/
フォルダにコピーします。
出力カタログを作成します
まず、パイプラインの出力カタログとして機能する新しいカタログを作成します。 カタログには、道路トポロジおよびジオメトリレイヤーの各パーティションに、そのパーティションのトポロジノードの基数を含むパーティションが含まれている 1 つのレイヤーがあります。 state
データ プロセッシング ライブラリ用に予約されている追加のレイヤーも 1 つ必要です。
HERE platform にログインします。 [ データ ] タブを選択し、次の操作を行います。
- [ 新しいカタログを追加] をクリックします。
- バッチ処理クイックスタートユーザー名など、カタログのカタログ名とカタログ IDを指定します。
- 次に 、カタログの概要 と 連絡先の電子メールを追加します。
- [ 保存 ] をクリックし、 Data API が新しいカタログを作成するまで待ちます。
次に、次のように、アプリケーションにカタログへの読み取り / 書き込みアクセス権を付与します。
- [ データを検索 ] ボックスでカタログの名前を検索して、カタログを選択する。
- [ 共有 ] に移動し、 [ 共有の管理 ] でアプリとカタログを共有を選択します。 アプリケーション ID を入力し 、 [ 許可 ] をクリック して 、 [ 読み取り と 書き込み] をオンにします。
- 最後に、 [ 許可 ] をクリックして変更を有効にします。
カタログにレイヤーを追加します。
- [ 新しいレイヤーを追加 ] をクリックし、
node-cardinality
ID としてレイヤーを作成します。 レイヤーの名前としてもnode-cardinality
を使用できます。また、人間が読める別の名前を選択することもできます。 - HERE Tile レイヤーが必要です。ズーム レベルは、入力した道路トポロジおよびジオメトリレイヤー 12 と同じである必要があります。 [ レイヤータイプ] で [ バージョン管理 ] を選択します。このオプションは、バッチ パイプラインで処理されるすべてのレイヤーで使用する必要があります。
- Protocol Buffers を使用してパーティションをエンコードできるように、
application/x-protobuf
のデフォルト Content Type
を使用します。 スキーマ フィールドは None のままにしておきます。 - 次に、 [ 保存 ] をクリックしてレイヤーの作成を完了します。
- 2 つ目のレイヤーに進み
state
、次の表の 2 行目に従って設定します。この表には、カタログ内のすべてのレイヤーの設定が一覧表示されます。
レイヤー ID | パーティション分割 | ズーム レベル | レイヤータイプ | コンテンツタイプ | スキーマ |
ノードカーディナリティ | HERE Tile | 12 | バージョン付き | application/x-protobuf | なし |
都道府県 | 汎用 | 該当なし | バージョン付き | application/octet-stream | なし |
カタログが完全に設定されました。 プロジェクトの作成に進みます。
プロジェクトを作成する
Data SDK には、新しいバッチパイプラインを作成するプロセスを簡素化するための Maven 原型が含まれています。 Maven の原型を使用して、いくつかのシェルコマンドを使用して完全なプロジェクト構造を構築できます。 原型によって、すべての基本的な依存関係、サンプル設定ファイル、および独自のロジックを実装するために編集できるソースファイルを含む POM ファイルが自動的に生成されます。 少なくとも 3 つのプロジェクトを作成する必要があります。
- すべてのサブプロジェクトを 1 つの POM ファイルでコンパイルするための、便利なトップレベルのプロジェクト
- プロトコルバッファスキーマの Java/Scala バインディングを構築するための、ネストされたスキーマプロジェクト
- 処理ロジックを構築するためのプロセスプロジェクト
次の手順では、 Maven がインストールされ、 mvn
実行可能ファイルが PATH
変数に含まれていることを前提としています。 bash
以下のすべてのコマンドをシェルから実行する必要があります。 tree
このコマンドは、フォルダ構造を表示するために使用します。 または ls -R
、を置き換えとして使用することもできます。
まず nodecardinality
、次のコマンドを実行してという名前のトップレベルのプロジェクトを作成し、を押し ENTER
て確認します。
$ pwd
~/projects
$ mvn archetype:generate -DarchetypeGroupId=org.codehaus.mojo.archetypes \
-DarchetypeArtifactId=pom-root \
-DarchetypeVersion=1.1 \
-DgroupId=com.example \
-DartifactId=nodecardinality \
-Dversion=1.0.0 \
-Dpackage=com.example.nodecardinality
nodecardinality
現在のディレクトリに次のファイルを含むフォルダが作成されます。
$ pwd
~/projects
$ tree
.
`-- nodecardinality
`-- pom.xml
1 directory, 1 file
サブプロジェクトはこのフォルダ内から作成されます。 nodecardinality
サブプロジェクトを作成するフォルダに移動します。 まず、次のコマンドを実行してモデルプロジェクトを作成し、を押し ENTER
て確認します。
$ pwd
~/projects/nodecardinality
$ mvn archetype:generate -DarchetypeGroupId=com.here.platform.schema \
-DarchetypeArtifactId=project_archetype \
-DarchetypeVersion=X.Y.Z \
-DgroupId=com.example.nodecardinality \
-DartifactId=schema \
-Dversion=1.0.0 \
-Dpackage=com.example.nodecardinality.schema \
-DmajorVersion=0
SDK に含まれている最新バージョンの原型に関する特定のドキュメントについて は、『 Archetypes 開発者ガイド』を参照してください。 。
nodecardinality/schema
これにより、出力カタログのスキーマを構築するプロジェクトが含まれているフォルダにプロジェクトテンプレートが作成されます。
$ pwd
~/projects/nodecardinality
$ tree
.
|-- pom.xml
`-- schema
|-- ds
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| `-- resources
| |-- ResourcesReadMe.txt
| `-- renderers
| `-- ReadMe.txt
|-- java
| `-- pom.xml
|-- pom.xml
|-- proto
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| |-- proto
| | `-- com
| | `-- example
| | `-- nodecardinality
| | `-- schema
| | `-- v0
| | `-- schema.proto
| `-- resources
| `-- description.md
|-- scala
| `-- pom.xml
`-- schema.yml
20 directories, 13 files
最後に nodecardinality
、フォルダ内で次のコマンドを実行して、を含むプロセッサテンプレートを作成 Direct1ToNCompiler
します。を押し ENTER
て確認します。
$ pwd
~/projects/nodecardinality
$ mvn archetype:generate -DarchetypeGroupId=com.here.platform \
-DarchetypeArtifactId=batch-direct1ton-scala-archetype \
-DarchetypeVersion=X.Y.Z \
-DgroupId=com.example.nodecardinality \
-DartifactId=processor \
-Dversion=1.0.0 \
-Dpackage=com.example.nodecardinality.processor
SDK に含まれている最新バージョンの原型に関する特定のドキュメントについては、 Archetypes 開発者ガイドを参照してください。
これで、 nodecardinality
プロジェクトに追加の processor
フォルダーが追加されました。
$ pwd
~/projects/nodecardinality
$ tree
.
|-- pom.xml
|-- processor
| |-- config
| | |-- pipeline-config.conf
| | `-- pipeline-job.conf
| |-- pom.xml
| `-- src
| `-- main
| |-- resources
| | |-- application.conf
| | `-- log4j.properties
| `-- scala
| `-- com
| `-- example
| `-- nodecardinality
| `-- processor
| |-- Compiler.scala
| |-- CompilerConfig.scala
| |-- IntermediateData.scala
| |-- LayersDef.scala
| `-- Main.scala
`-- schema
|-- ds
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| `-- resources
| |-- ResourcesReadMe.txt
| `-- renderers
| `-- ReadMe.txt
|-- java
| `-- pom.xml
|-- pom.xml
|-- proto
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| |-- proto
| | `-- com
| | `-- example
| | `-- nodecardinality
| | `-- schema
| | `-- v0
| | `-- schema.proto
| `-- resources
| `-- description.md
|-- scala
| `-- pom.xml
`-- schema.yml
30 directories, 23 files
スキーマサブプロジェクト
このnodecardinality/schema
フォルダには、 プロトコルバッファとしてエンコードされたパーティションをデシリアライズ / シリアライズするための Java および Scala ライブラリ ( 通常はバインディングと呼ばれます ) を構築する Maven プロジェクトのスケルトンが含まれています。 これは、出力レイヤーのパーティションを node-cardinality
プロトコルバッファとしてエンコードする場合に必要で、カスタムパーティションスキーマを指定する場合には + が必要です。
プロジェクトのフォルダーには、次のものがあります。
pom.xml
プロジェクトのコンパイルに使用されるメインの POM ファイルです java
プロトコルバッファの Java バインディングを構築するための POM ファイルを含むフォルダ scala
プロトコルバッファの Scala バインディングを構築するための POM ファイルを含むフォルダ ds
結果のバインディングおよびプロトコルバッファ定義を ZIP ファイルにバンドルするサブプロジェクトを含むフォルダ。このフォルダをプラットフォーム Artifactory リポジトリに公開して、プラットフォームポータルからパーティションをデコードできます proto
プロトコルバッファ定義を含むフォルダ。 これは、出力スキーマを指定するためにカスタマイズするサブプロジェクトです。
カスタムプロトコルバッファスキーマを作成するには、 nodecardinality/schema/proto/src
フォルダに .proto
ファイルを追加する必要があります。 プロトコルバッファの詳細については、プロトコルバッファのドキュメントを参照してください。
作成したスケルトンプロジェクトには、 .proto
出力パーティションのスキーマをすばやく定義するために編集できるファイルがすでに含まれています。
nodecardinality/schema/proto/src/main/proto/com/example/nodecardinality/schema/v0/schema.proto
ファイルを開き、メインメッセージ定義を検索します。
syntax = "proto3";
package com.example.nodecardinality.schema.v0;
message MainProtobufMessage {
int32 lat = 1;
int32 lon = 2;
}
メインメッセージの名前を MainProtobufMessage
から NodeCardinalityPartition
に変更し、サンプルフィールド lat
lon
を削除して、 node_cardinality
type NodeCardinality
という名前の repeated
フィールドを追加します。
次に、ノード NodeCardinality
の ID (id
)とノードのカーディナリティ(cardinality
)の 2 つのフィールドを使用して、補助メッセージタイプを定義します。 この新しい Protobuf 定義は次のようになります。
syntax = "proto3";
package com.example.nodecardinality.schema.v0;
message NodeCardinalityPartition {
repeated NodeCardinality node_cardinality = 1;
}
message NodeCardinality {
string id = 1;
uint32 cardinality = 2;
}
メインメッセージの名前を変更したため、スキーマバンドルの作成に使用した設定を更新してください。 ds
サブプロジェクトの POM ファイル () を開きnodecardinality/schema/ds/pom.xml
、次の設定を探します layer-manifest-plugin
。
<plugin>
<groupId>com.here.platform.schema.maven_plugins</groupId>
<artifactId>layer-manifest-plugin</artifactId>
<version>${here.plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>write-manifest</goal>
</goals>
</execution>
</executions>
<configuration>
<mainMessage>com.example.nodecardinality.schema.v0.MainProtobufMessage</mainMessage>
<inputDir>${project.build.directory}/proto</inputDir>
<writeManifest>true</writeManifest>
</configuration>
</plugin>
mainMessage
プラグイン設定でパスを変更します。置き換え com.example.nodecardinality.schema.v0.MainProtobufMessage
られるのは com.example.nodecardinality.schema.v0.NodeCardinalityPartition
次のとおりです。
<plugin>
<groupId>com.here.platform.schema.maven_plugins</groupId>
<artifactId>layer-manifest-plugin</artifactId>
<version>${here.plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>write-manifest</goal>
</goals>
</execution>
</executions>
<configuration>
<mainMessage>com.example.nodecardinality.schema.v0.NodeCardinalityPartition</mainMessage>
<inputDir>${project.build.directory}/proto</inputDir>
<writeManifest>true</writeManifest>
</configuration>
</plugin>
次に、スキーマプロジェクトを nodecardinality/schema
フォルダーから mvn install
実行してコンパイルします。 または、トップレベルのプロジェクトからこのコマンドを実行することもできます。
schema_v0_java
と schema_v0_scala_${scala.compat.version}
の 2 つのライブラリが構築されています。このライブラリは、 NodeCardinalityPartition
オブジェクトの作成、 ByteArray
へのシリアル化、 ByteArray
からの逆シリアル化を行うための API を提供します。
出力パーティションを書き込むに schema_v0_scala_${scala.compat.version}
は、処理ロジックでを使用します。 ロードトポロジおよびジオメトリ層から入力パーティションを読み取るに com.here.schema.rib.topology-geometry_v2_scala
は、で提供されている対応する Java バインディングを使用します。
次のセクションでは、これらの依存関係をプロセッサのサブプロジェクトに追加します。
処理ロジック
nodecardinality/processor
このフォルダには、バッチ処理パイプラインのスケルトンプロジェクトが含まれています。 このプロジェクトでは、最終的な処理アプリケーションを構築します。
プロジェクトフォルダには、次のコンポーネントが含まれています。
pom.xml
プロジェクトのコンパイルに使用されるメインの POM ファイル。 src
ロジックを実装する Scala ソースファイルを含むフォルダ。 config
パイプライン API の外部でパイプラインをローカルに実行するために使用できる設定ファイルが含まれているフォルダ。
パイプラインの設定
バッチ処理ジョブを実行するには、データ プロセッシング ライブラリで次の条件を満たす必要があります。
- すべての入力カタログおよび出力カタログの HERE リソース名 (HRNS) 。
- バッチジョブで処理するすべての入力カタログのバージョン。
SCHEDULED
状態のパイプラインの場合 ' この情報は 'HOCON 形式の 2 つのコンフィギュレーションファイルを介してパイプライン API によって自動的に提供されます
-
pipeline-config.conf
入力カタログおよび出力カタログの HERE リソースネーム を提供します。 -
pipeline-job.conf
処理する入力カタログのバージョンを提供します。
パイプラインがデプロイされると、 HERE Workspace パイプラインに最初のファイルがアップロードされます。コンパイルごとに変更されることはありません。 一方、ジョブ設定は、新しいジョブが実行されたと見なされたときに、パイプライン API によってオンザフライで作成されます。 たとえば、入力カタログの新しいバージョンが存在し、出力カタログを更新する必要があります。
ローカル開発中にパイプライン API を使用せずにバッチ パイプラインをローカルで実行する場合は、次の 2 つの Java システムプロパティを設定して、これらの設定ファイルを自分自身で提供する必要があります。
-
pipeline-config.file
ファイルへのパスが含まれています pipeline-config.conf
-
pipeline-job.file
ファイルへのパスが含まれています pipeline-job.conf
nodecardinality/processor/config
このフォルダには、両方のファイルのテンプレートが含まれています。このテンプレートは編集してローカル開発に使用できます。
パイプライン設定のテンプレートファイル ()nodecardinality/processor/config/pipeline-config.conf
は次のようになります。
pipeline.config {
output-catalog { hrn = "hrn:here:data:::myoutput" }
input-catalogs {
input-catalog-1 { hrn = "hrn:here:data:::myinput1" }
input-catalog-2 { hrn = "hrn:here:data:::myinput2" }
}
}
pipeline.config.output-catalog.hrn
出力カタログの HERE リソースネーム を示します。 作成したカタログの HERE リソースネーム を読み取るには、プラットフォームポータルでカタログを開きます。 カタログ ID が batch-processing-quickstart
の場合、対応する HERE リソースネームは hrn:here:data:::batch-processing-quickstart
です。
pipeline.config.input-catalogs
任意のシンボリック識別子によってインデックス化されたすべての入力カタログの HERE リソースネーム が含まれます。この ID は、ジョブ設定および処理ロジックで特定の入力カタログを識別するために使用されます。 テンプレートファイルには、識別子 input-catalog-1
と input-catalog-2
対応するサンプルの HRNS を含む 2 つの入力カタログが含まれています。 このプロジェクトでは、 HERE リソースネーム を使用する入力カタログ、 HERE Map Content は 1 hrn:here:data::olp-here:rib-2
つだけです。 これらの 2 つのサンプル入力カタログを削除し、前述の HERE リソースネーム で追加して rib
から、カタログ ID としてを選択します。
pipeline.config {
output-catalog { hrn = "hrn:here:data:::batch-processing-quickstart" }
input-catalogs {
rib { hrn = "hrn:here:data::olp-here:rib-2" }
}
}
nodecardinality/processor/config/pipeline-job.conf
このファイルは、ジョブ設定のテンプレートです。 次のものが含まれています。
pipeline.job.catalog-versions {
input-catalogs {
input-catalog-1 {
processing-type = "reprocess"
version = 0
}
input-catalog-2 {
processing-type = "reprocess"
version = 2
}
}
}
で指定された入力カタログごと pipeline-config.conf
に pipeline.job.catalog-versions.input-catalogs
、には次のものが含まれます。
は pipeline.job.catalog-versions.input-catalogs.
input-catalog-ID
.processing-type
、 3 つの処理タイプを示す 3 つの異なる値を持つことができます。
- 再処理: このタイプは、カタログを完全に処理する必要があることを示します。前のコンパイルの結果は、処理されるデータ量の削減には使用されません。 手動で記述されたジョブ設定を処理する場合に使用する処理の最も簡単なタイプです。 これは、データ プロセッシング ライブラリの機能であるインクリメンタル・コンパイルを効果的に無効にします。この機能を使用すると、前のコンパイルの結果を使用して処理されるデータ量を削減できます。 このタイプの処理で
version
は、処理するカタログのを指定する必要があります。 - no_changes:このタイプは、出力カタログが最後にコンパイルされたときに使用されたカタログの同じバージョンを再利用することを示します。 このタイプの処理では、データ プロセッシング ライブラリはいくつかのコンパイルステップをスキップできます。
version
処理するカタログのを指定する必要があります。 有効な処理コンフィグレーションでは、バージョンが最後のコンパイルで使用されたバージョンと同じであることが必要です。 データ プロセッシング ライブラリは、処理が開始される前にこの条件が true であることを確認 します。true でない場合は、インクリメンタル・コンパイルが無効になります。 - 変更: このタイプは
version
、最後のコンパイルで処理されたバージョン () に基づいて、カタログ () の新しいバージョンを処理することを示しsince-version
ます。 このタイプの処理は、データ プロセッシング ライブラリによって処理の最適化に使用され、実際に再処理されるデータ量を削減できます。 処理設定は、最後のコンパイルで使用されたカタログのバージョンが実際に使用されている限り有効です since-version
。バージョンが実際でない場合 、インクリメンタル コンパイルは無効になります。
no_changes とchanges の両方 が、データ プロセッシング ライブラリで内部的に最適化を有効にするためにのみ使用されることを理解しておくことが重要です。 概念的には、任意の入力カタログについて、処理中のライブラリは常に特定のバージョンを完全に処理します。 このクイックスタートおよびローカル開発では 、再処理 の処理タイプのみに依存する必要があります。 パイプラインを展開した後、データ処理ライブラリの最適化機能を最大限に活用する有効なジョブ設定を提供することは、パイプライン API の義務です。
執筆時点 1
で利用可能な HERE Map Content の最新バージョンはです。
processing-type
と 1
の再処理version
を使用して、 rib
カタログを構成しましょう。
pipeline-job.conf
ここでは、次の内容を確認します。
pipeline.job.catalog-versions {
input-catalogs {
rib {
processing-type = "reprocess"
version = 1
}
}
}
依存関係
SDK のアーカイブタイプは、 processor
サブプロジェクトの pom.xml
ファイルにあるすべての基本的な依存関係を提供します。 処理ロジック HERE で使用されるカスタムの依存関係を手動で追加する必要があります。 このプロジェクトでは、さらに 2 つの依存関係が必要です。
-
com.here.schema.rib.topology-geometry_v2_scala
をクリックして、入力パーティションをデシリアライズします -
com.example.nodecardinality.schema_v0_scala_${scala.compat.version}
出力パーティションをシリアル化するために作成した
nodecardinality/processor/pom.xml
ファイルを開きます。 スキーマプロジェクトによって作成された Java バインディングのプレースホルダがすでに存在します。 検索するには DATA_MODEL_PROJECT_NAME
、ファイルでを検索します。
その依存関係をコメント解除し、 {DATA_MODEL_PROJECT_NAME}
プレースホルダーと {DATA_MODEL_PROJECT_VERSION}
プレースホルダーにそれぞれ schema_v0
AND 1.0.0
を入力します。
<dependency>
<groupId>com.example.nodecardinality</groupId>
<artifactId>schema_v0_scala_${scala.compat.version}</artifactId>
<version>1.0.0</version>
</dependency>
次に、次の項目に依存関係を追加します com.here.schema.rib.topology-geometry_v2_scala
。
<dependency>
<groupId>com.here.schema.rib</groupId>
<artifactId>topology-geometry_v2_scala</artifactId>
<version>2.8.0</version>
</dependency>
処理ロジック
ここで 'Maven の原型が作成した Scala ソースファイルを編集して ' 実際の処理ロジックを実装します processor/src/main/scala/com/example/nodecardinality/processor
このフォルダには 5 つのソースファイルがあります。
-
Main.scala
: のサブクラスとして、処理アプリケーションのメインエントリポイントを含み PipelineRunner
ます。 Driver
は、 1 つ Direct1ToNCompiler
を含む 1 つの DriverTask
を使用して設定されます( Compiler.scala
で実装)。 -
CompilerConfig.scala
: コンパイラーの構成が含ま application.conf
れています。このクラスは、構成ファイルを使用してビジネスロジックを構成するために定義できます。 ただし、ビジネスロジックで設定パラメータを公開する必要はありません。そのため、既定の実装で十分です。 -
IntermediateData.scala
: IntermediateData
で定義されているコンパイラで使用されるクラスを定義 Compiler.scala
します。 -
Compiler.scala
: 実際の処理ロジックをとして実装 Direct1ToNCompiler
します。 -
LayersDef.scala
: 入力および出力レイヤーを定義します。
まず、タスクに使用するコンパイルパターンと中間データを決定します。 このクイックスタートでは、ファンクショナルパターンを中心に説明します。 RDD ベースのパターンと比較すると、介入なしでインクリメンタルコンパイルを実行でき、パーティション分割、シャフリング、持続性などのSpark
警告に対処する必要はありません。
機能 Spark
的なパターンの仕組みをより的確に把握するために、基盤となるアプリケーションが関心を持っています。 すべてのパターンで、次のスキームの異なるフレーバーが実装されます。
- 入力メタデータが取得され、
Key
と Meta
のペアの RDD
が作成されます。 Key
パーティションを一意に識別し、カタログ ID 、レイヤー ID 、およびパーティション名を含みます。 Meta
に、パーティションのペイロードに関する情報を示します。 およびを、対応する Key
とともに使用して、パーティション(ペイロード)のコンテンツを取得できます。 - CompileIn 変換が入力
RDD
に適用されます。 このステップの目的は、入力パーティションと出力パーティションの間のマッピングを定義し、入力データを定義した中間リプレゼンテーションにプリプロセスすることです。 ほとんどのコンパイルパターンでは、このステップは flatMap
に対応しています。ここ compileInFn
では、 1 つの (Key, Meta)
ペアを返す (Key, IntermediateData)
が RDD
のすべての要素に適用されます。 その後 groupBy
、同じ出力キーを持つすべての中間リプレゼンテーションをグループ化する変換が行われます。 Key
AND Iterable<IntermediateData>
ペアの結果RDD
は 、 CompileOut 変換を適用して処理 されます。この変換では、Payload
がグループ化された中間リプレゼンテーションからそれぞれKey
に作成されます。
このプロジェクトでは、 HERE マップコンテンツの topology-geometry
レイヤーの各入力パーティションについて、出力カタログの node-cardinality
レイヤーに同じタイル ID を持つ出力パーティションを作成します。 入力と出力の間のマッピングは、入力パーティションの内容には依存 Key
しません。パーティションの一部であるタイル ID のみが必要です。 このため 、直接コンパイラーを使用できます。
各入力パーティションが 1 つの出力パーティションを生成するために使用されるため、直接1:1
コンパイルを実装します。 1:N
これはコンパイルの特殊なケースで Direct1ToNCompiler
あるため、必要なのはパターンのみです。
CompileIn とCompileOut の間で使用するIntermediateData
を決定する必要があります。 直接 1:1
コンパイルを実行するの 、 CompileOut に処理ロジックを直接実装できます。 つまり、入力パーティションの Key
オブジェクトおよび Meta
オブジェクトを CompileIn 変換から CompileOut 変換に転送し、入力データを処理できます。
IntermediateData
処理済みバージョンの入力パーティションを含むより複雑なクラスが必要になることに注意してください。特に、入力パーティションを使用して複数の出力パーティションをコンパイルし、同じデータを複数回処理しないようにする場合に注意してください。
Maven の原型が提供するデフォルトの IntermediateData
クラスは単に Key
と Meta
クラスをラップするだけなので、この場合は変更せずに使用できます。
IntermediateData
次のクラスは、原型によって提供されています。
case class IntermediateData(attribute1: String, attribute2: String)
Key
と Meta
のペアをラップするように実装を書き換えてみましょう。
import com.here.platform.data.processing.compiler.{InKey, InMeta}
case class IntermediateData(key: InKey, meta: InMeta)
そのファイルは変更せずに開き processor/src/main/scala/com/example/nodecardinality/processor/LayersDef.scala
ます。 HERE で、入力および出力レイヤーのプレースホルダーを置き換えます。
object In {
val CatalogId = Catalog.Id("input-catalog-1")
val LayerId = Layer.Id("input-layer-1")
}
object Out {
val LayerId = Layer.Id("output-layer-1")
}
input-catalog-1
の場合は、 pipeline-config.conf
(rib
) で設定したシンボリック ID を使用する必要があります。 input-layer-1
IS topology
および output-layer-1
IS : node-cardinality
object In {
val CatalogId = Catalog.Id("rib")
val LayerId = Layer.Id("topology-geometry")
}
object Out {
val LayerId = Layer.Id("node-cardinality")
}
processor/src/main/scala/com/example/nodecardinality/processor/Compiler.scala
ファイルを開きます。
HERE Map Content topology-geometry
レイヤーおよび出力レイヤーの Scala バインディングをインポートする必要があります。
import com.example.nodecardinality.schema.v0.schema._
import com.here.schema.rib.v2.topology_geometry_partition._
直接コンパイラーでは、 CompileIn 関数は次のように分割されます。
- 入力
Key
された出力 Key
のシーケンスを返す関数 mappingFn
です - 入力
Key
および Meta
を指定して IntermediateData
オブジェクトを返す関数 compileInFn
mappingFn
によって確立されたマッピング は 、対応する出力Key
にIntermediateData
オブジェクトを送信するために使用 され、その後compileOutFn
関数でコンパイルされます。
マッピング機能を検索します。
override def mappingFn(inKey: InKey): Iterable[OutKey] = ???
各入力 Key
は Key
、同じパーティション名、出力カタログ () と同じカタログ IDDefault.OutCatalogId()
、および出力レイヤー () と同じレイヤー ID を持つ出力にマップする必要OUT_LAYER
があります。
まず Default
、オブジェクトをインポートします。
import com.here.platform.data.processing.driver.Default
次に mappingFn
、次のように実装します。
override def mappingFn(inKey: InKey): Iterable[OutKey] =
Iterable(OutKey(Default.OutCatalogId, Out.LayerId, inKey.partition))
compileInFn
では、入力パーティション Key
および Meta
の IntermediateData
ビルトインを返却するだけです。
既存 compileInFn
のメソッドを次のメソッドに置き換えます。
override def compileInFn(in: (InKey, InMeta)): IntermediateData =
IntermediateData(in.key, in.meta)
次に compileOutFn
、を次のように置き換えます。
override def compileOutFn(outKey: OutKey, intermediate: IntermediateData): Option[Payload] = {
val payload = retriever.getPayload(intermediate.key, intermediate.meta)
val partition = TopologyGeometryPartition.parseFrom(payload.content)
val outputPartition =
NodeCardinalityPartition(
partition.node.map(node => NodeCardinality(node.identifier, node.segmentRef.size)))
Some(Payload(outputPartition.toByteArray))
}
HERE では、上記の mappingFn
の戻り値とその compileInFn
戻り値が出力パーティションの OutKey
として compileOutFn
渡され、対応する入力パーティションの InKey
AND InMeta
ペアが IntermediateData
含まれます。
Compiler
オブジェクトの作成時に初期化された retriever
オブジェクトを使用して、入力パーティションのペイロードを取得します。
val payload = retriever.getPayload(intermediate.key, intermediate.meta)
HERE マップコンテンツ topology-geometry
のレイヤーの Scala バインディングを使用して、対応するトポロジパーティションをデコードします。
val partition = TopologyGeometryPartition.parseFrom(payload.content)
次に Schema
、プロジェクトの Scala バインディングを使用して、対応する出力パーティションを作成します。 プロトコルバッファメッセージごと NodeCardinalityPartition
に、 case クラスのインスタンスを作成します。 これには、 node_cardinality
スキーマで定義されているように、繰り返しフィールドを表す単一のコンストラクタ引数があります。 この引数の値は、トポロジパーティション内のノードのリストを反復処理して、そのような各ノードに NodeCardinality
ノードの ID()node.identifier
とカーディナリティ () を持つオブジェクトを作成することによって作成node.segmentRef.size
されます。
val outputPartition =
NodeCardinalityPartition(
partition.node.map(node => NodeCardinality(node.identifier, node.segmentRef.size)))
データを公開するに は、出力パーティションをバイト配列にシリアライズし(outputPartition.toByteArray
)、そこから Payload
オブジェクトを解析して、オプションのペイロードを返します。 出力カタログで特定の出力パーティションを使用しない None
場合は、を使用できます。 ただし、この場合は、利用可能な各入力パーティションの出力パーティションを公開します。
Some(Payload(outputPartition.toByteArray))
次に、トップレベルのフォルダーからプロジェクト全体をビルドします。
$ pwd
~/projects/nodecardinality
$ mvn install
プロセッサをローカルで実行します
HERE Map Content のようなグローバルカタログの処理には時間がかかることがあります。 ただし、 1 つ以上 のパーティションフィルターをapplication.conf
ファイルに追加することで、ローカル開発中に処理するパーティションの数を制限できます。
この場合、を使用 BoundingBoxFilter
して、ベルリンの都市を含むバウンディング ボックス内のパーティションのみを処理します。 processor/src/main/resources/application.conf
ファイルを開き、次のパーティションフィルタ設定を追加します。
here.platform.data-processing.executors.partitionKeyFilters = [
{
className = "BoundingBoxFilter"
param = {
boundingBox {
north = 52.67551
south = 52.338261
east = 13.76116
west = 13.08835
}
}
}
]
mvn install
この変更を行った後、必ず再実行してください。 プロセッサモジュールフォルダから、上記で設定したコンフィギュレーションファイルを使用してコンパイルジョブを実行できます。
$ pwd
~/projects/nodecardinality/processor
$ mvn exec:java -Dexec.mainClass=com.example.nodecardinality.processor.Main \
-Dpipeline-config.file=config/pipeline-config.conf \
-Dpipeline-job.file=config/pipeline-job.conf \
-Dexec.args="--master local[*]"
Maven exec
プラグインは、および com.example.nodecardinality.processor.Main
設定ファイル config/pipeline-config.conf
およびに設定されたメインクラスで使用 config/pipeline-job.conf
されます。 PipelineRunner
main
このメソッドは、オプション --master
spark-master
のコマンド ライン引数を受け入れて、クラスタのマスター URL を設定します。 local[*]
Spark をローカルで実行するために使用します。
カタログを検査します
新しいカタログを検査するには、次の操作を行います。
- HERE platform にログインします。 [ データ ] タブでカタログを検索し、選択してカタログビューに切り替えます。
node-cardinality
レイヤーを選択し 、「検査」 タブを選択します。 - ズーム レベルを 8 に設定し、地図でベルリンを検索します。
- 出力パーティションがマップ上で強調表示され、指定したベルリンのバウンディング ボックスがカバーされます。
パーティションをデコードします
出力パーティションをデコードするには、カスタムスキーマを使用するようにレイヤーを設定する必要があります。 まず、スキーマをプラットフォームにアップロードします。 nodecardinality/schema/
フォルダから mvn deploy
、次のコマンドを実行します。
$ pwd
~/projects/nodecardinality/schema
$ mvn deploy
同じグループ ID 、アーティファクト ID 、およびバージョンのスキーマは、一度だけ展開できます。 衝突を避けるcom.example.nodecardinality
には、グループ ID (この例では)が一意であることを確認してください。
ポータルのデータビューで、 [ スキーマの参照] をクリックして、ユーザーがアクセスできる展開済みのスキーマのリストを表示し、スキーマschema_v0
が存在することを確認します。
ポータルのnode-cardinality
レイヤービューで、詳細をクリック し、 レイヤーを再設定を選択 してレイヤー設定ページにアクセスします。 レイヤーの作成時に設定したほとんどのパラメーター(スキーマ設定を含む)を変更できるようになりました。 スキーマ設定を探し 、上部のメニューからschema_v0
を選択して、ページ下部の [ 保存 ] をクリックします。
[ 検査 ] タブで任意のパーティションを選択できるようになりました。 デコードされたパーティションがパネルの右側に表示されます。
パイプライン API を使用して、クラスタで作成したバッチ パイプラインを実行できます。 パイプラインは、次の 2 つの方法のいずれかで展開できます。
- パイプラインの新しいバージョンが存在するたびに自動的に実行される Scheduled 入力カタログ
- ユーザーが指定した
pipeline-job.conf
設定を使用する Run-Once パイプライン。
パイプライン API を使用するには、アプリ ID がグループに属している必要があります。 管理者がアカウントでグループ ID を設定し、そのグループにアプリケーションを割り当てる必要があります。 最後に、そのグループ ID に出力カタログへの読み取りおよび書き込みアクセス権を付与する必要があります。
HERE Workspace パイプラインにプロセッサを配備するには、まず CPU を脂肪質の JAR にパッケージ化します。 pom.xml
Archetypes によって生成されたファイルに platform
は、この目的のためのプロファイルが含まれています。
$ pwd
~/projects/nodecardinality/processor
$ mvn -Pplatform package
上記のコマンド processor/target/processor-1.0-SNAPSHOT-platform.jar
を実行すると、プロセッサ AS のファット JAR が作成されます。
パイプライン テンプレートを作成するには、このファイルを使用する必要があります。
HERE platform ポータルからパイプラインを展開して実行する方法の詳細については、『 Pipelines 開発者ガイド』を参照してください。
さらに、 OLP CLI を使用してパイプラインを展開および実行できます。詳細については、『 OLP CLI ユーザー ガイド』を参照してください。