同じカタログの複数のバージョンを処理しています

データ プロセッシング ライブラリを使用すると、同じカタログの複数のバージョンを操作して、たとえばこれらのバージョン間の変更を比較および処理できます。 パイプラインジョブ設定で指定されたカタログのバージョンに加えて、コンパイラーは、パイプラインの以前の実行で使用されていた同じカタログのバージョンにアクセスできます。 より正確に言えば、同じ出力カタログの古いバージョンをコンパイルするために使用されたすべてのバージョンにアクセスできます。

データ プロセッシング ライブラリのカタログバージョン

入力カタログのジョブバージョンは 、パイプラインジョブ設定によってバッチ パイプラインに提供されたカタログのバージョンです。 ジョブ設定で指定されたカタログ ID は、常にカタログのジョブバージョンを参照します。 パイプラインがすべてのバージョンの入力カタログを少なくとも 1 回または 1 回は処理するという保証はありません。 次の図に、入力カタログのバージョン 1 、 3 、 4 、および 6 がスキップされ、バージョン 2 がパイプラインによって 2 回処理されます。この処理は、パイプラインが手動でトリガーされた場合、またはパイプラインの別の入力カタログが変更された場合に発生することがあります。

カタログバージョンにアクセスして生成するパイプラインジョブの例
カタログバージョンにアクセスして生成するパイプラインジョブの例

データ プロセッシング ライブラリでは、同じパイプラインの以前の正常な実行で使用された入力カタログのバージョンにアクセスできます。 これらのバージョンには、を使用してアクセスでき PreviousRunIdます。 たとえば、Default.PreviousRunId(inputA)は、ジョブ設定で指定されたカタログ ID inputAからをPreviousRunId派生し、前の実行で使用されたinputAのバージョンを参照します。 次の図は、が PreviousRunId 参照するバージョンを示しています。

「 PreviousRunId 」が使用されている場合に、コンパイルに含まれるカタログバージョン
「 PreviousRunId 」が使用されている場合に、コンパイルに含まれるカタログバージョン

パイプラインがまだ実行されていないために参照されているバージョンが存在しない場合、は PreviousRunId 空のカタログを参照します。 上記の例では PreviousRunId 、パイプラインが初めて実行されるため、最初のパイプラインジョブでは空のカタログが参照されます。

次の図は、前の実行が失敗した場合には無視されることを示しています。 出力カタログバージョンの公開に成功した場合にのみ、以前の実行を使用できます。

「 PreviousRunId 」は、新しい出力カタログバージョンを公開していない、失敗した以前の実行を無視します
「 PreviousRunId 」は、新しい出力カタログバージョンを公開していない、失敗した以前の実行を無視します

上記の例では、 1 PreviousRunId つの入力カタログと 1 つのランのみが表示されています。 当然のことながら、表示されている HERE の概念は任意の数の入力カタログに拡張され、任意の数のランに戻ります。 たとえば、Default.PreviousRunId(inputA, 2)は、現在の実行よりも前に処理された 2 つの実行inputAのバージョンを示します。デフォルトは 1 です。 そのため、最初の 2 つのランでは、このカタログ ID を使用して空のカタログにアクセスします。

同様に、この機能を使用して、以前のバージョンの出力カタログにアクセスすることもできます。 たとえば、カタログ ID Default.PreviousRunId(Default.OutCatalogId) は、出力カタログの前回の正常な実行によって生成されたバッチ パイプラインを参照します。 このカタログ ID の短縮形は Default.FeedbackCatalogId、ステートフル処理に使用されるフィードバックカタログ ID です。

メモ ステートフル処理と同様に、処理ライブラリのすべてのコンパイルパターンは、入力または出力カタログの前のバージョンにアクセスした場合にも適用されます。 さらに、この機能は、概念、機能に影響を与えるものではなく、アプリケーションが実行されている環境 ( 通常は HERE platform ) で特別な設定を行う必要もありません。

カタログビュー

CatalogView特定のバージョンのカタログにアクセスするには、を使用します。 CatalogViewは、 1 つのバージョンのカタログにアクセスできる、Catalogインターフェースの制限付きバリアントに対応しています。 DriverContextは、PreviousRunIdを含む任意の入力カタログ ID のCatalogViewを取得するために使用できます

メモDriverContextinRetrievers方法を使用して、PreviousRunIdRetrieverを取得 できます。

例 : カタログバージョンの比較

この例では、データ処理ライブラリが同じカタログの複数のバージョンにアクセスできる機能を使用して、入力カタログの 2 つのバージョンの違いを計算するコンパイラーを作成する方法を示します。

topology-geometry HERE Map Content カタログのレイヤーには、特に一意の ID を持つ道路トポロジセグメントと、座標系の形式でのジオメトリが含まれています。 この例では、 2 つの異なるバージョンのレイヤーを読み込み、これらの 2 つのバージョンの間で追加、削除、または変更されたすべてのセグメントの ID を含む JSON ドキュメントを出力します。

入力レイヤーを設定しています

コンパイラーで以前のバージョンを処理するには、次のスニペットに示すように、inLayers構成でPreviousRunIdを使用します。

object In {
  val Rib = Catalog.Id("rib")
  val PreviousRib = Default.PreviousRunId(Rib)
  val LayerName = Layer.Id("topology-geometry")
}

def inLayers = Map(
  In.Rib -> Set(In.LayerName),
  In.PreviousRib -> Set(In.LayerName)
)

取得者にアクセスしています

を使用 DriverContext して、カタログのバージョンごとに異なるカタログビューにアクセスします。 各カタログビューでは、取得者にアクセス権が付与されます。

private val previousRetriever: Retriever = context.inCatalogView(In.PreviousRib).retriever
private val currentRetriever: Retriever = context.inCatalogView(In.Rib).retriever

パーティションのグループ化

同じパーティション名のパーティションを比較するため compileIn 、これらのパーティションにはコンパイルのフェーズで同じ出力キーを割り当てる必要があります。 各入力パーティションは、そのパーティション名に基づいて正確に 1 つの出力パーティションにマッピングされるので、を使用 DirectMToNCompiler できます(詳細については、を参照してください)。 は mappingFn 、入力キーのカタログ ID を置き換えて正しい出力キーを作成するだけで実装 compileIn されます。関数は ID 関数です。

override def mappingFn(inKey: Key): Iterable[OutKey] =
  Iterable(inKey.copy(catalog = Default.OutCatalogId))

override def compileInFn(in: (Key, Meta)): (Key, Meta) = in

DirectMToNCompilerの詳細について は、 compilation patterns を参照してください。

パーティションの比較

compileOut コンパイルのフェーズでは、取得元を使用してパーティションの両方のバージョンをロードします。 これらの両方のパーティションのKeyおよびMetaは、対応するOutKeyの中間データにあります。 次のスニペットでは Segment 、セグメント ID から、パーティションの以前のバージョンと最新バージョンの両方のオブジェクトへのマップをロードします。パーティションのいずれかが存在しない場合は、空のマップを使用します。 次に、追加、削除、または変更されたセグメントのセットを取得します。

def getSegments(retriever: Retriever)(keyMeta: (Key, Meta)): Map[String, Segment] = {
  val partition =
    TopologyGeometryPartition.parseFrom(retriever.getPayload(keyMeta.key, keyMeta.meta).content)
  partition.segment.map(x => (x.identifier, x))(scala.collection.breakOut)
}

def compileOutFn(outKey: OutKey, intermediate: Iterable[(Key, Meta)]): Option[Payload] = {

  val previousSegments =
    intermediate
      .find(_.key.catalog == In.PreviousRib)
      .map(getSegments(previousRetriever))
      .getOrElse(Map.empty)
  val latestSegments =
    intermediate
      .find(_.key.catalog == In.Rib)
      .map(getSegments(currentRetriever))
      .getOrElse(Map.empty)

  val addedSegments = latestSegments.keySet -- previousSegments.keySet
  val removedSegments = previousSegments.keySet -- latestSegments.keySet
  val modifiedSegments =
    (previousSegments.keySet intersect latestSegments.keySet).filter { segmentId =>
      previousSegments(segmentId).geometry != latestSegments(segmentId).geometry
    }

  ??? // TODO: Produce output payload from addedSegments, removedSegments, modifiedSegments
}

この例の完全なバージョンが examples/data-processing/scala/heremapcontent-difftool SDK のディレクトリに含まれています。

カタログの固定バージョンを処理しています

データ プロセッシング ライブラリには、 FixedVersionId 現在または以前の実行でパイプラインジョブ設定で提供されているバージョンとは関係なく、常に特定のカタログの同じバージョンを参照するがあります。 FixedVersionIdたとえば、次のようにを使用できます。 現在のバージョンのカタログのデータを、修正された以前のバージョンのカタログに関連付けます。 たとえば、は Default.FixedVersionId(inputA, 4) カタログ ID のバージョン 4 を参照 inputAします。 もちろん、に渡されるバージョン Default.FixedVersionId は定数である必要はありませんが、別の入力カタログや設定ファイルなどから読み取られた値にすることもできます。

FixedVersionIdPreviousRunIdとまったく同じように使用できます。コンパイラーの入力レイヤーを宣言し、DriverContextを使用してそのためのCatalogViewを取得する場合に使用できます。

: 同じコンパイラーの 2 つの後続の実行で、異なる固定バージョンのカタログが使用されている場合 ( たとえば、バージョンが構成ファイルから読み取られた場合 ) 、 2 番目の実行増分コンパイルでは無効になります。 データ プロセッシング ライブラリ Fingerprint は、カタログに使用されているすべての固定バージョンのセットのを登録することで、このことを自動的に確認します。 固定バージョンのセットが 2 つの実行の間で同じままで、コンパイラーロジックでのバージョンの使用方法が変更された場合 Fingerprint、インクリメンタル・コンパイルは自動的には無効になりません。これは、が同じままであるためです。 このような場合 Fingerprint は、処理ロジックの変更をトリガーした外部ソースのを手動で追加してください。

」に一致する結果は 件です

    」に一致する結果はありません