データ プロセッシング ライブラリ を使用してカタログをコピーします
目的: データ プロセッシング ライブラリを使用して、ソースカタログの GeoJSON コンテンツをコピーします。
複雑さ: 中級者です
所要時間: 30 分
前提条件: 資格情報 の確認、 Maven の設定の確認、 プロジェクトでの作業の整理を行います
ソースコード: ダウンロード
この例では、 OLP CLI を使用してカタログを作成し、新しいバージョン付レイヤーを使用してカタログを設定し、レイヤーに GeoJSON の内容を取り込み、これらの内容を別のカタログにコピーするデータ プロセッシング ライブラリアプリケーションを記述する方法を示します。
ソースカタログを作成します
OLP CLI では、データサービスで自身を認証するために有効な HERE Credentials のセットが必要 です。そのため、「資格情報の確認」チュートリアルで予期した結果が返されていることを確認してください。
まだダウンロードしていない場合は、 Java and Scala の例と CLI をダウンロードして解凍します。 解凍したファイルのtools/OLP_CLI
フォルダを${PATH}
に追加 します。
ソースカタログに一意の識別子と名前を付け {{YOUR_USERNAME}}-geojson-dpl-tutorial
ます ( 例 : ) 。
の後の{{YOUR_SRC_CATALOG_ID}}
をユーザー独自の識別子{{YOUR_PROJECT_HRN}}
で置き換え、 プロジェクト での作業を整理 するチュートリアルの Project HERE リソースネーム で置き換えて、次のコマンドを実行します。
olp catalog create {{YOUR_SRC_CATALOG_ID}} "{{YOUR_USERNAME}} geojson example" \
--summary "geojson example" \
--description "geojson example" \
--scope {{YOUR_PROJECT_HRN}}
CLI は次のように戻ります。
Catalog {{YOUR_SRC_CATALOG_HRN}} has been created.
未加工の GeoJSON コンテンツを含むレイヤーを追加します。 このバージョン付レイヤーには、ズーム レベル 12 でパーティション分割されたコンテンツタイプ "application/vnd.geo+json"
"heretile"
があり、 "DE"
(ドイツ)の管理領域をカバーしています。
olp catalog layer add {{YOUR_SRC_CATALOG_HRN}} geojson "geojson example" \
--versioned \
--content-type=application/vnd.geo+json \
--partitioning=heretile:12 \
--coverage=DE \
--summary "geojson example" \
--description "geojson example" \
--scope {{YOUR_PROJECT_HRN}}
CLI は次のように戻ります。
Layer GeoJSON has been added to the catalog.
注
レルムで請求タグが必要な場合 --billing-tags: "YOUR_BILLING_TAG"
は、パラメータを使用します。
という名前のフォルダーを作成 source-partitions
します。
mkdir source-partitions
source-partitions
名前付きでファイルを作成 23618359
し、次の GeoJSON を使用してファイルに入力します。 パーティション内のポリゴンを表し 23618359
ます。
{
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [[
[13.351969844791705, 52.52978386622147],
[13.330565116221472, 52.551188594791704],
[13.300294258778528, 52.551188594791704],
[13.278889530208295, 52.52978386622147],
[13.278889530208295, 52.49951300877853],
[13.300294258778528, 52.478108280208296],
[13.330565116221472, 52.478108280208296],
[13.351969844791705, 52.49951300877853],
[13.351969844791705, 52.52978386622147]
]]
},
"properties": {
"tooltip": "GREEN",
"style" : {
"color" : "#228B22",
"fill" : "#228B22",
"opacity": 1.0
},
"width": 5
}
}]
}
source-partitions
名前付きでファイルを作成 23618402
し、次の GeoJSON を使用してファイルに入力します。 パーティション内のポリゴンを表し 23618402
ます。
{
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [[
[13.439860469791705, 52.52978386622147],
[13.418455741221472, 52.551188594791704],
[13.388184883778528, 52.551188594791704],
[13.366780155208295, 52.52978386622147],
[13.366780155208295, 52.49951300877853],
[13.388184883778528, 52.478108280208296],
[13.418455741221472, 52.478108280208296],
[13.439860469791705, 52.49951300877853],
[13.439860469791705, 52.52978386622147]
]]
},
"properties": {
"tooltip": "GREEN",
"style" : {
"color" : "#228B22",
"fill" : "#228B22",
"opacity": 1.0
},
"width": 5
}
}]
}
GeoJSON パーティションをカタログの GeoJSON レイヤーにアップロードします。
olp catalog layer partition put {{YOUR_SRC_CATALOG_HRN}} geojson --input=./source-partitions --scope {{YOUR_PROJECT_HRN}}
CLI は次のように戻ります。
Partitions 23618402, 23618359 were successfully uploaded.
プラットフォームポータルでパーティションの内容を検査できます。 この操作を行うに は、ポータルに移動し、リストで{{YOUR_SRC_CATALOG_HRN}}
検索してこのカタログを開きます。 GeoJSON レイヤーを選択 して詳細を表示し、 [ 検査 ] タブでデータを検査します。
2 つのパーティションのアウトラインは、地図のベルリンエリアに表示されます。
緑色の八角形をレンダリングするには、これらのパーティションをズームインしてクリックします。 テキストが「 green 」のツールチップを開くには、八角形の上にマウスを置きます。
また、次のコマンドを実行して、 OLP CLI でパーティションを一覧表示することもできます。
olp catalog layer partition list {{YOUR_SRC_CATALOG_HRN}} geojson --scope {{YOUR_PROJECT_HRN}}
生の GeoJSON コンテンツをワークスペースデータとして使用する方法の詳細について は、このドキュメントを参照してください。
宛先カタログを作成します
宛先カタログに一意の識別子と名前を付け {{YOUR_USERNAME}}-geojson-copy-dpl-tutorial
ます ( 例 : ) 。
{{YOUR_DST_CATALOG_ID}}
次の例に示すように、独自の識別子で置き換え、前に示したソースカタログと同じ方法で宛先カタログと GeoJSON レイヤーを作成します。
olp catalog create {{YOUR_DST_CATALOG_ID}} "{{YOUR_USERNAME}} geojson example copy"
--summary "geojson example copy"
--description "geojson example copy"
--scope {{YOUR_PROJECT_HRN}}
olp catalog layer add {{YOUR_DST_CATALOG_HRN}} geojson "geojson example" \
--versioned \
--content-type=application/vnd.geo+json \
--partitioning=heretile:12 \
--coverage=DE \
--summary "geojson example copy"
--description "geojson example copy"
--scope {{YOUR_PROJECT_HRN}}
または、ソースカタログの GeoJSON レイヤーのレイヤー設定を JSON としてクエリし、それを使用して宛先カタログを設定することもできます。
olp catalog layer show {{YOUR_SRC_CATALOG_HRN}} geojson --json --scope {{YOUR_PROJECT_HRN}}
CLI は次のように戻ります。
{
"coverage": {"adminAreas": ["DE"]},
"summary": "geojson example",
"volume": {"volumeType": "Durable"},
"layerType": "Versioned",
"billingTags": [],
"name": "geojson example",
"contentEncoding": "",
"description": "geojson example",
"partitioning": {
"scheme": "heretile",
"tileLevels": [12]
},
"id": "geojson",
"contentType": "application/vnd.geo+json",
"tags": []
}
次に、出力を config.json
ファイルに追加して、 GeoJSON レイヤー設定でカタログを更新できます。
{
"id": {{YOUR_CATALOG_ID}},
"name": "{{YOUR_USERNAME}} geojson example copy",
"summary": "geojson example copy",
"description": "geojson example copy",
"layers": [
"// json output from the "olp catalog layer show" command above"
]
}
olp catalog update {{YOUR_DST_CATALOG_HRN}} --config config.json --scope {{YOUR_PROJECT_HRN}}
内部コンパイル状態を含むレイヤーを追加します。 データ プロセッシング ライブラリは、このレイヤーを後続の実行でのステートフル処理に使用します。 状態レイヤーの詳細について は、『開発者ガイド』を参照してください。
olp catalog layer add {{YOUR_DST_CATALOG_HRN}} state "internal compilation state" \
--content-type="application/octet-stream" \
--versioned --partitioning=generic \
--summary "internal compilation state" \
--description "internal compilation state" \
--scope {{YOUR_PROJECT_HRN}}
GeoJSON レイヤーの内容をコピーします
プロジェクトの次のフォルダー構造を作成します。
copy-geojson
└── src
└── main
├── java
└── resources
└── scala
単一 bash
のコマンドでこの操作を行うには、次の手順を実行
mkdir -p copy-geojson/src/main/{java,resources,scala}
この例の POM は、最初の Maven の例の POM と同じですが、 parent
AND dependencies
セクションが異なります。
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.54.3</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>batch-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>pipeline-runner_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>batch-catalog-dataservice_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>context-logging_${scala.compat.version}</artifactId>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>batch-core-java_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>pipeline-runner-java_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>batch-catalog-dataservice_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>context-logging_${scala.compat.version}</artifactId>
</dependency>
</dependencies>
resources
フォルダーで、という名前のファイルを作成 application.conf
し、次の内容を入力します。
here.platform.data-processing.driver {
appName = "CopyCatalogScalaTutorial"
}
の次の内容を使用して、ロガーを設定 resources/log4j.properties
します。
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %-7p %c{1}: %m%n
log4j.logger.org.apache.spark=WARN
log4j.logger.org.spark_project=WARN
GeoJSON の内容をコピーする 1:1 コンパイラのコードは、次のとおりです。
import com.here.platform.data.processing.blobstore.{Payload, Retriever}
import com.here.platform.data.processing.build.BuildInfo
import com.here.platform.data.processing.catalog.Catalog
import com.here.platform.data.processing.catalog.Layer
import com.here.platform.data.processing.compiler._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner
import com.here.platform.data.processing.driver.{
DriverBuilder,
DriverContext,
DriverSetupWithBuilder
}
import com.here.platform.pipeline.logging.{ContextLogging, LogContext}
import com.here.platform.data.processing.spark.partitioner.Partitioner
object CopyCatalogScala extends PipelineRunner with DriverSetupWithBuilder {
type Data = Array[Byte]
val srcCatalogId = Catalog.Id("src")
val geojsonLayerId = Layer.Id("geojson")
class Compiler(retriever: Retriever)
extends Direct1ToNCompiler[Data]
with InputLayers
with OutputLayers
with ContextLogging {
override def inLayers: Map[Catalog.Id, Set[Layer.Id]] = Map(srcCatalogId -> Set(geojsonLayerId))
override def outLayers: Set[Layer.Id] = Set(geojsonLayerId)
override def mappingFn(inKey: InKey): Iterable[OutKey] =
Iterable(inKey.copy(catalog = outCatalogId))
override def compileInFn(in: (InKey, InMeta)): Data = {
logger.info("compileInFn (" + in.key + ")")
retriever.getPayload(in.key, in.meta).content
}
override def compileOutFn(outKey: OutKey, intermediate: Data): Option[Payload] = {
logger.info("compileOutFn (" + outKey + ")")
Some(Payload(intermediate))
}
override def inPartitioner(parallelism: Int): Option[Partitioner[InKey]] = None
override def outPartitioner(parallelism: Int): Option[Partitioner[OutKey]] = None
}
val applicationVersion: String = BuildInfo.version
def configureCompiler(completeConfig: CompleteConfig,
context: DriverContext,
builder: DriverBuilder): builder.type = {
val taskBuilder = builder.newTaskBuilder("copycatalogscala")
val compiler = new Compiler(context.inRetriever(srcCatalogId))
builder.addTask(taskBuilder.withDirect1ToNCompiler(compiler).build())
}
}
import com.here.platform.data.processing.build.BuildInfo;
import com.here.platform.data.processing.driver.runner.pipeline.java.PipelineRunner;
import com.here.platform.data.processing.java.Pair;
import com.here.platform.data.processing.java.blobstore.Payload;
import com.here.platform.data.processing.java.blobstore.Retriever;
import com.here.platform.data.processing.java.catalog.partition.Key;
import com.here.platform.data.processing.java.catalog.partition.Meta;
import com.here.platform.data.processing.java.compiler.Direct1ToNCompiler;
import com.here.platform.data.processing.java.compiler.InputLayers;
import com.here.platform.data.processing.java.compiler.OutputLayers;
import com.here.platform.data.processing.java.driver.Default;
import com.here.platform.data.processing.java.driver.DriverBuilder;
import com.here.platform.data.processing.java.driver.DriverContext;
import com.here.platform.data.processing.java.driver.TaskBuilder;
import com.here.platform.data.processing.java.driver.config.CompleteConfig;
import com.here.platform.data.processing.java.spark.partitioner.PartitionerOfKey;
import com.here.platform.pipeline.logging.java.ContextAwareLogger;
import java.util.*;
class Compiler implements Direct1ToNCompiler<byte[]>, InputLayers, OutputLayers {
private String srcCatalogId = "src";
private String geojsonLayerId = "geojson";
private Retriever retriever;
private ContextAwareLogger contextAwareLogger;
Compiler(DriverContext driverContext) {
this.retriever = driverContext.inRetriever(srcCatalogId);
this.contextAwareLogger = new ContextAwareLogger(getClass());
}
@Override
public Map<String, Set<String>> inLayers() {
return Collections.unmodifiableMap(
new HashMap<String, Set<String>>() {
{
put(
srcCatalogId,
new HashSet<String>() {
{
add(geojsonLayerId);
}
});
}
});
}
@Override
public Set<String> outLayers() {
return Collections.unmodifiableSet(
new HashSet<String>() {
{
add(geojsonLayerId);
}
});
}
@Override
public Iterable<Key> mappingFn(Key inKey) {
return Collections.singletonList(
new Key(Default.OutCatalogId(), inKey.layer(), inKey.partition()));
}
@Override
public byte[] compileInFn(Pair<Key, Meta> in) {
contextAwareLogger.info("compileInFn (" + in.getKey() + ")");
return retriever.getPayload(in.getKey(), in.getValue()).content();
}
@Override
public Optional<Payload> compileOutFn(Key outKey, byte[] intermediate) {
contextAwareLogger.info("compileOutFn (" + outKey + ")");
return Optional.of(new Payload(intermediate));
}
@Override
public Optional<PartitionerOfKey> inPartitioner(int parallelism) {
return Optional.empty();
}
@Override
public Optional<PartitionerOfKey> outPartitioner(int parallelism) {
return Optional.empty();
}
}
public class CopyCatalogJava {
public static void main(String... args) {
new PipelineRunner() {
@Override
public String applicationVersion() {
return BuildInfo.version();
}
@Override
public DriverBuilder configureCompiler(
CompleteConfig completeConfig, DriverContext context, DriverBuilder builder) {
TaskBuilder taskBuilder = builder.newTaskBuilder("copycatalogjava");
Compiler compiler = new Compiler(context);
return builder.addTask(taskBuilder.withDirect1ToNCompiler(compiler, byte[].class).build());
}
}.main(args);
}
}
コピーをフルコンパイルとして実行します
アプリケーションをローカルで実行するには、次のコマンドを実行します。
mvn compile exec:java \
-Dexec.mainClass=CopyCatalogScala \
-Dpipeline.config.input-catalogs.src.hrn="{{YOUR_SRC_CATALOG_HRN}}" \
-Dpipeline.config.output-catalog.hrn="{{YOUR_DST_CATALOG_HRN}}" \
-Dpipeline.job.catalog-versions.input-catalogs.src.version=0 \
-Dpipeline.job.catalog-versions.input-catalogs.src.processing-type="reprocess" \
-Dexec.args="--master local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=CopyCatalogJava \
-Dpipeline.config.input-catalogs.src.hrn="{{YOUR_SRC_CATALOG_HRN}}" \
-Dpipeline.config.output-catalog.hrn="{{YOUR_DST_CATALOG_HRN}}" \
-Dpipeline.job.catalog-versions.input-catalogs.src.version=0 \
-Dpipeline.job.catalog-versions.input-catalogs.src.processing-type="reprocess" \
-Dexec.args="--master local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
ローカルジョブが完了したら、プラットフォームポータルでコピーされた内容を確認できます。 この操作を行うに は、ポータルに移動し、一覧で{{YOUR_DST_CATALOG_HRN}}
このカタログを探して開きます。 GeoJSON レイヤーを選択 して詳細を表示し、 [ 検査 ] タブでデータを検査します。
ソースカタログを更新します
新しい GeoJSON パーティションを作成 :
という名前のフォルダーを作成 source-partitions
します。
mkdir new-partitions
new-partitions
名前付きでファイルを作成 23618408
し、次の GeoJSON を使用してファイルに入力します。
{
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [[
[13.439860469791705, 52.61767449122147],
[13.418455741221472, 52.639079219791704],
[13.388184883778528, 52.639079219791704],
[13.366780155208295, 52.61767449122147],
[13.366780155208295, 52.58740363377853],
[13.388184883778528, 52.565998905208296],
[13.418455741221472, 52.565998905208296],
[13.439860469791705, 52.58740363377853],
[13.439860469791705, 52.61767449122147]
]]
},
"properties": {
"tooltip": "GREEN",
"style" : {
"color" : "#228B22",
"fill" : "#228B22",
"opacity": 1.0
},
"width": 5
}
}]
}
新しいパーティションをソースカタログの GeoJSON レイヤーにアップロードします。
olp catalog layer partition put {{YOUR_SRC_CATALOG_HRN}} geojson --input=./new-partitions --scope {{YOUR_PROJECT_HRN}}
CLI は次のように戻ります。
Partition 23618408 was successfully uploaded.
ローカルジョブが完了したら、プラットフォームポータルでコピーされた内容を確認できます。 この操作を行うに は、ポータルに移動し、一覧で{{YOUR_SRC_CATALOG_HRN}}
このカタログを探して開きます。 GeoJSON レイヤーを選択 して詳細を表示し、 [ 検査 ] タブでデータを検査します。
インクリメンタル・コンパイルを実行します
アプリケーションをローカルで実行するには、次のコマンドを実行します。
mvn compile exec:java \
-Dexec.mainClass=CopyCatalogScala \
-Dpipeline.config.input-catalogs.src.hrn="{{YOUR_SRC_CATALOG_HRN}}" \
-Dpipeline.config.output-catalog.hrn="{{YOUR_DST_CATALOG_HRN}}" \
-Dpipeline.job.catalog-versions.input-catalogs.src.since-version=0 \
-Dpipeline.job.catalog-versions.input-catalogs.src.version=1 \
-Dpipeline.job.catalog-versions.input-catalogs.src.processing-type="changes" \
-Dexec.args="--master local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=CopyCatalogJava \
-Dpipeline.config.input-catalogs.src.hrn="{{YOUR_SRC_CATALOG_HRN}}" \
-Dpipeline.config.output-catalog.hrn="{{YOUR_DST_CATALOG_HRN}}" \
-Dpipeline.job.catalog-versions.input-catalogs.src.since-version=0 \
-Dpipeline.job.catalog-versions.input-catalogs.src.version=1 \
-Dpipeline.job.catalog-versions.input-catalogs.src.processing-type="changes" \
-Dexec.args="--master local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
ログ出力には、次のような行が表示されます。 compileInFn
およびの間には、新しいパーティションのみが検出されます compileOutFn
。
2018/07/09 17:17:44.015 INFO Direct1ToNCompilerExecutor: Incremental compilation execution // Task=copycatalogscala
2018/07/09 17:17:45.290 INFO CopyCatalogScala$Compiler: compileInFn (Key('src,'geojson,23618408))
2018/07/09 17:17:45.655 INFO CopyCatalogScala$Compiler: compileOutFn (Key('output,'geojson,23618408))
2018/07/09 17:27:09.383 INFO Direct1ToNCompilerExecutor: Incremental compilation execution // Task=copycatalogjava
2018/07/09 17:27:11.070 INFO Compiler: compileInFn (Key('src,'geojson,23618408))
2018/07/09 17:27:11.913 INFO Compiler: compileOutFn (Key('output,'geojson,23618408))
ローカルジョブが完了すると、ポータルのコピーされた内容を https://platform.here.com/data//geojson/inspect
で確認できます ( 実際のカタログ HERE リソースネーム YOUR_DST_CATALOG_HRN
と置き換えます ) 。
注
ソースカタログのバージョン 0 からフルコンパイルを再実行することもできます。 これにより、新しいパーティションが宛先カタログから効果的に削除され、ソースカタログのバージョン 0 と同じレイヤーコンテンツに復元されます。
同様に、インクリメンタル・コンパイルを使用して、デスティネーション・カタログの任意の状態にソース・カタログのバージョン 0 とバージョン 1 の間のデルタを適用できます。 つまり、空の宛先カタログがある場合、前述のようにインクリメンタル・コンパイルを実行することで、ソース・カタログのバージョン 0 とバージョン 1 の間に追加された 1 つの新しいパーティションのみをコピーできます。
このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。 データ プロセッシング ライブラリ開発者ガイド。