Spark での Hadoop FS サポートを使用したオブジェクト ストア レイヤーの読み取りおよび書き込み

目的: Hadoop FS サポートを使用して、 Spark を使用してオブジェクト ストア レイヤーにデータを読み書きする方法について理解します。

複雑さ: 初心者向け

所要時間: 30 分

前提条件: プロジェクトでの作業を整理します

ソースコード: ダウンロード

このチュートリアルの例では 、データ クライアント ライブラリが提供する Hadoop FS サポートコンポーネントの使用方法を示します。 これにより、 Apache Spark などの標準ツールを使用して、オブジェクト ストア レイヤーに保存されているデータに最小限のカスタマイズコードでアクセスできるようになります。

このチュートリアルでは、次の手順を実行します。

  1. オブジェクト ストア レイヤーを使用してカタログを作成します。
  2. 形式でテストデータを生成するアプリケーションを作成 parquet します。
  3. hadoop-fs-support ライブラリを使用して、テストデータをオブジェクト ストア レイヤーに書き込みます。
  4. hadoop-fs-support ライブラリを使用して、オブジェクト ストア レイヤーからテストデータを読み取ります。
  5. CLI を使用して、オブジェクト ストア レイヤーに保存されているデータにアクセスします。

準備ステップとして、カタログタイプ(オブジェクト ストア レイヤー type )を使用してカタログを作成する必要があります。

Maven プロジェクトを設定します

プロジェクトの次のフォルダー構造を作成します。

hadoop-fs-support-spark-pipeline
└── src
    └── main
        ├── java
        └── resources
        └── scala

この操作は、次の bash 1 つのコマンドで実行できます。

mkdir -p hadoop-fs-support-spark-pipeline/src/main/{java,resources,scala}

カタログを作成します

カタログを作成する必要があります。 これを行う には、「 OLP コマンド ライン インターフェース ( CLI ) を使用してプロジェクトで作業を整理する」で説明されている手順に従います。

カタログの一意の識別子名を使用します。 たとえば、 {{YOUR_USERNAME}}-hadoop-fs-support-spark-pipelineです。

hadoop-fs-support-spark-pipeline.json 次の内容のファイルを作成 {{YOUR_CATALOG_ID}} し、を選択した識別子に置き換えます。

{
  "id": "hadoop-fs-support-spark-pipeline-catalog",
  "name": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
  "summary": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
  "description": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
  "tags": ["Hadoop FS Support", "Object store"],
  "layers": [
    {
      "id": "parquet",
      "name": "parquet-layer",
      "summary": "Simulated data.",
      "description": "Simulated parquet data to demonstrate usability of Object store layer",
      "tags": ["Hadoop FS Support", "Object store"],
      "layerType": "objectstore",
      "volume": {
        "volumeType": "durable"
      }
    }
  ]
}

レルムで請求タグが必要な場合 は、layerセクションにbillingTags: ["YOUR_BILLING_TAG"]プロパティを追加して設定 ファイルを更新します。

{{YOUR_CATALOG_ID}} を自分の識別子に置き換えます。 また {{YOUR_PROJECT_HRN}} 、 [ プロジェクトで作業内容を整理] の HERE リソースネーム で置き換え、次のコマンドを実行します。

#!/usr/bin/env bash
set -o nounset -o errexit -o xtrace

### [catalog]
olp catalog create {{YOUR_CATALOG_ID}} \
    "Tutorial for reading and writing data to Object store layer using Hadoop FS Support ({{YOUR_USERNAME}})" \
    --config hadoop-fs-support-spark-pipeline.json \
    --scope {{YOUR_PROJECT_HRN}}
### [catalog]

パイプライン設定をセットアップします

  1. という名前のファイルを作成 pipeline-config.confし、次のスニペットを入力します。
  2. ` を、 「プロジェクトでの作業の整理」で作成したカタログの HERE リソースネーム で置き換えます。

このチュートリアルは、パイプライン環境外で実行することもできます。 唯一の違い catalogHrn は、アプリケーションコードが、カタログ作成ステップから受け取った HERE リソースネーム での値を設定する必要があることです。

pipeline.config {

  output-catalog {hrn = "hrn:here:data::olp-here:dummy"}

  input-catalogs {
      objectStoreCatalog { hrn = "{{YOUR_CATALOG_HRN}}" }
  }
}

Maven でプロジェクトを設定します

Spark を使用してパイプラインで実行されるアプリケーションを開発するには sdk-batch-bom_2.12 、をアプリケーションの親ポムとして使用します。

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

Scala と Java の依存関係を調整します。

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>hadoop-fs-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>

アプリケーションを実装します

このアプリケーションは、 Hadoop FS サポートを使用してオブジェクト ストア レイヤーに寄木細工のデータを書き込み、簡単な spark-SQL クエリを実行して、インデックス列に偶数の値が設定されている行を選択します。

Scala
Java
/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory

object HadoopFsSupportSparkScala {

  def main(args: Array[String]): Unit = {

    val logger = LoggerFactory.getLogger(HadoopFsSupportSparkScala.getClass)

    val pipelineContext = new PipelineContext
    val catalogHrn = pipelineContext.config.inputCatalogs("objectStoreCatalog").toString
    val layerId = "parquet"

    val sparkSession =
      SparkSession
        .builder()
        .appName("HadoopFsSupportSparkScala")
        .getOrCreate()

    val dataFrame = generateDataFrame(sparkSession)

    // Write data frame as parquet at random directory location in object store layer
    val parquetDir = "parquet-dir"
    logger.info(s"Writing parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
    dataFrame.write
      .parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")

    // Read parquet data from object store layer, select only those rows where index column has and even value
    logger.info(s"Reading parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
    val parquetData: DataFrame =
      sparkSession.read
        .parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
        .select("index")
        .where("tileId % 2 == 0")
        .sort("index")
    val parquetArray: Array[Row] = parquetData.collect()

    printDfArray(parquetArray)

  }

  // Generate test data in csv format
  private def generateDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._
    val csvData: Dataset[String] = sparkSession.sparkContext.parallelize("""
                                                                           |index,tileId
                                                                           |0,0
                                                                           |1,1
                                                                           |2,3
                                                                           |3,6
                                                                           |4,4
                                                                           |5,5
                                                                           |6,7
                                                                           |7,2
                                                                           |8,8
                                                                           |9,10
  """.stripMargin.lines.toList).toDS

    sparkSession.read
      .option("header", value = true)
      .option("timestampFormat", "MM/dd/yyyy")
      .option("inferSchema", value = true)
      .csv(csvData)
  }

  // Print data frame
  private def printDfArray(rowArray: Array[Row]): Unit =
    for (row <- rowArray) {
      for (i <- 0 until row.size) {
        println(row.get(i))
      }
    }

}

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.pipeline.PipelineContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopFsSupportSparkJava {

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(HadoopFsSupportSparkJava.class);

    PipelineContext pipelineContext = new PipelineContext();
    String catalogHrn =
        pipelineContext.getConfig().getInputCatalogs().get("objectStoreCatalog").toString();
    String layerId = "parquet";

    SparkSession sparkSession =
        SparkSession.builder().appName("HadoopFsSupportSparkJava").getOrCreate();

    Dataset<Row> dataFrame = generateDataFrame(sparkSession);

    // Write data frame as parquet at random directory location in object store layer
    String parquetDir = "parquet-dir";
    logger.info(
        "Writing parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
    dataFrame.write().parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);

    // Read parquet data from object store layer, select only those rows where index column has and
    // even value
    logger.info(
        "Reading parquet files at: blobfs://" + catalogHrn + ":" + parquetDir + "/" + parquetDir);
    Dataset<Row> parquetData =
        sparkSession
            .read()
            .parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir)
            .select("index")
            .where("tileId % 2 == 0")
            .sort("index");
    List<Row> parquetArray = parquetData.collectAsList();

    printDfArray(parquetArray);
  }

  // Generate test data in csv format
  private static Dataset<Row> generateDataFrame(SparkSession sparkSession) {
    String csvString =
        String.join(
            "\n",
            "index,tileId",
            "0,0",
            "1,1",
            "2,3",
            "3,6",
            "4,4",
            "5,5",
            "6,7",
            "7,2",
            "8,8",
            "9,10");

    ArrayList<String> data = new ArrayList<>(Arrays.asList(csvString.split("\n")));

    Dataset<String> csvData = sparkSession.createDataset(data, Encoders.STRING());

    return sparkSession
        .read()
        .option("header", true)
        .option("timestampFormat", "MM/dd/yyyy")
        .option("inferSchema", true)
        .csv(csvData);
  }

  // Print data frame
  private static void printDfArray(List<Row> rowArray) {
    for (Row row : rowArray) {
      for (int i = 0; i < row.length(); i++) {
        System.out.println(row.get(i));
      }
    }
  }
}

アプリケーションを実行します

アプリケーションをローカルで実行するには、次のコマンドを使用します。

Scala を実行します
Java を実行します

mvn compile exec:java \
    -Dexec.mainClass=HadoopFsSupportSparkScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=HadoopFsSupportSparkJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}

-Dhere.platform.data-client.request-signer.credentials.here-account.* パラメーターのセットを考慮してください。 credentials.properties このパラメータは、ファイルから {{YOUR_PROJECT_HRN}} データ クライアント ライブラリにデータを渡すために指定します。 データ クライアント ライブラリの初期化の詳細について は、「 Java システムのプロパティを使用して資格情報を設定する」を参照してください。

CLI を使用してデータにアクセスします

カタログ作成ステップで作成したカタログの HERE リソースネーム を使用して、オブジェクト ストア レイヤーの内容を確認できます。

olp catalog layer object list {{YOUR_CATALOG_HRN}} parquet --key parquet-dir

CLI の出力はと類似している必要があります ( 正確な値ではなく、出力例を次に示します ) 。

name                                    keyType                       lastModified                                      size                
parquet-dir/_SUCCESS                    object                        2021-01-12T19:23:47Z                              0                   
parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet object                        2021-01-12T19:23:39Z                              353                 
...

特定のキーによって参照されているオブジェクト ストア レイヤーに保存されているデータをダウンロードできます。

olp catalog layer object get {{YOUR_CATALOG_HRN}} parquet --key parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet

詳細情報

このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。

」に一致する結果は 件です

    」に一致する結果はありません