Spark での Hadoop FS サポートを使用したオブジェクト ストア レイヤーの読み取りおよび書き込み
目的: Hadoop FS サポートを使用して、 Spark を使用してオブジェクト ストア レイヤーにデータを読み書きする方法について理解します。
複雑さ: 初心者向け
所要時間: 30 分
前提条件: プロジェクトでの作業を整理します
ソースコード: ダウンロード
このチュートリアルの例では 、データ クライアント ライブラリが提供する Hadoop FS サポートコンポーネントの使用方法を示します。 これにより、 Apache Spark などの標準ツールを使用して、オブジェクト ストア レイヤーに保存されているデータに最小限のカスタマイズコードでアクセスできるようになります。
このチュートリアルでは、次の手順を実行します。
- オブジェクト ストア レイヤーを使用してカタログを作成します。
- 形式でテストデータを生成するアプリケーションを作成
parquet
します。 - hadoop-fs-support ライブラリを使用して、テストデータをオブジェクト ストア レイヤーに書き込みます。
- hadoop-fs-support ライブラリを使用して、オブジェクト ストア レイヤーからテストデータを読み取ります。
- CLI を使用して、オブジェクト ストア レイヤーに保存されているデータにアクセスします。
準備ステップとして、カタログタイプ(オブジェクト ストア レイヤー type )を使用してカタログを作成する必要があります。
Maven プロジェクトを設定します
プロジェクトの次のフォルダー構造を作成します。
hadoop-fs-support-spark-pipeline
└── src
└── main
├── java
└── resources
└── scala
この操作は、次の bash
1 つのコマンドで実行できます。
mkdir -p hadoop-fs-support-spark-pipeline/src/main/{java,resources,scala}
カタログを作成します
カタログを作成する必要があります。 これを行う には、「 OLP コマンド ライン インターフェース ( CLI ) を使用してプロジェクトで作業を整理する」で説明されている手順に従います。
カタログの一意の識別子名を使用します。 たとえば、 {{YOUR_USERNAME}}-hadoop-fs-support-spark-pipeline
です。
hadoop-fs-support-spark-pipeline.json
次の内容のファイルを作成 {{YOUR_CATALOG_ID}}
し、を選択した識別子に置き換えます。
{
"id": "hadoop-fs-support-spark-pipeline-catalog",
"name": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"summary": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"description": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"tags": ["Hadoop FS Support", "Object store"],
"layers": [
{
"id": "parquet",
"name": "parquet-layer",
"summary": "Simulated data.",
"description": "Simulated parquet data to demonstrate usability of Object store layer",
"tags": ["Hadoop FS Support", "Object store"],
"layerType": "objectstore",
"volume": {
"volumeType": "durable"
}
}
]
}
注
レルムで請求タグが必要な場合 は、layer
セクションにbillingTags: ["YOUR_BILLING_TAG"]
プロパティを追加して設定 ファイルを更新します。
{{YOUR_CATALOG_ID}}
を自分の識別子に置き換えます。 また {{YOUR_PROJECT_HRN}}
、 [ プロジェクトで作業内容を整理] の HERE リソースネーム で置き換え、次のコマンドを実行します。
#!/usr/bin/env bash
set -o nounset -o errexit -o xtrace
olp catalog create {{YOUR_CATALOG_ID}} \
"Tutorial for reading and writing data to Object store layer using Hadoop FS Support ({{YOUR_USERNAME}})" \
--config hadoop-fs-support-spark-pipeline.json \
--scope {{YOUR_PROJECT_HRN}}
パイプライン設定をセットアップします
- という名前のファイルを作成
pipeline-config.conf
し、次のスニペットを入力します。 - ` を、 「プロジェクトでの作業の整理」で作成したカタログの HERE リソースネーム で置き換えます。
このチュートリアルは、パイプライン環境外で実行することもできます。 唯一の違い catalogHrn
は、アプリケーションコードが、カタログ作成ステップから受け取った HERE リソースネーム での値を設定する必要があることです。
pipeline.config {
output-catalog {hrn = "hrn:here:data::olp-here:dummy"}
input-catalogs {
objectStoreCatalog { hrn = "{{YOUR_CATALOG_HRN}}" }
}
}
Maven でプロジェクトを設定します
Spark を使用してパイプラインで実行されるアプリケーションを開発するには sdk-batch-bom_2.12
、をアプリケーションの親ポムとして使用します。
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.54.3</version>
<relativePath/>
</parent>
Scala と Java の依存関係を調整します。
<dependencies>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>hadoop-fs-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
アプリケーションを実装します
このアプリケーションは、 Hadoop FS サポートを使用してオブジェクト ストア レイヤーに寄木細工のデータを書き込み、簡単な spark-SQL クエリを実行して、インデックス列に偶数の値が設定されている行を選択します。
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory
object HadoopFsSupportSparkScala {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(HadoopFsSupportSparkScala.getClass)
val pipelineContext = new PipelineContext
val catalogHrn = pipelineContext.config.inputCatalogs("objectStoreCatalog").toString
val layerId = "parquet"
val sparkSession =
SparkSession
.builder()
.appName("HadoopFsSupportSparkScala")
.getOrCreate()
val dataFrame = generateDataFrame(sparkSession)
val parquetDir = "parquet-dir"
logger.info(s"Writing parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
dataFrame.write
.parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
logger.info(s"Reading parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
val parquetData: DataFrame =
sparkSession.read
.parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
.select("index")
.where("tileId % 2 == 0")
.sort("index")
val parquetArray: Array[Row] = parquetData.collect()
printDfArray(parquetArray)
}
private def generateDataFrame(sparkSession: SparkSession): DataFrame = {
import sparkSession.implicits._
val csvData: Dataset[String] = sparkSession.sparkContext.parallelize("""
|index,tileId
|0,0
|1,1
|2,3
|3,6
|4,4
|5,5
|6,7
|7,2
|8,8
|9,10
""".stripMargin.lines.toList).toDS
sparkSession.read
.option("header", value = true)
.option("timestampFormat", "MM/dd/yyyy")
.option("inferSchema", value = true)
.csv(csvData)
}
private def printDfArray(rowArray: Array[Row]): Unit =
for (row <- rowArray) {
for (i <- 0 until row.size) {
println(row.get(i))
}
}
}
import com.here.platform.pipeline.PipelineContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HadoopFsSupportSparkJava {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(HadoopFsSupportSparkJava.class);
PipelineContext pipelineContext = new PipelineContext();
String catalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("objectStoreCatalog").toString();
String layerId = "parquet";
SparkSession sparkSession =
SparkSession.builder().appName("HadoopFsSupportSparkJava").getOrCreate();
Dataset<Row> dataFrame = generateDataFrame(sparkSession);
String parquetDir = "parquet-dir";
logger.info(
"Writing parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
dataFrame.write().parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
logger.info(
"Reading parquet files at: blobfs://" + catalogHrn + ":" + parquetDir + "/" + parquetDir);
Dataset<Row> parquetData =
sparkSession
.read()
.parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir)
.select("index")
.where("tileId % 2 == 0")
.sort("index");
List<Row> parquetArray = parquetData.collectAsList();
printDfArray(parquetArray);
}
private static Dataset<Row> generateDataFrame(SparkSession sparkSession) {
String csvString =
String.join(
"\n",
"index,tileId",
"0,0",
"1,1",
"2,3",
"3,6",
"4,4",
"5,5",
"6,7",
"7,2",
"8,8",
"9,10");
ArrayList<String> data = new ArrayList<>(Arrays.asList(csvString.split("\n")));
Dataset<String> csvData = sparkSession.createDataset(data, Encoders.STRING());
return sparkSession
.read()
.option("header", true)
.option("timestampFormat", "MM/dd/yyyy")
.option("inferSchema", true)
.csv(csvData);
}
private static void printDfArray(List<Row> rowArray) {
for (Row row : rowArray) {
for (int i = 0; i < row.length(); i++) {
System.out.println(row.get(i));
}
}
}
}
アプリケーションを実行します
アプリケーションをローカルで実行するには、次のコマンドを使用します。
mvn compile exec:java \
-Dexec.mainClass=HadoopFsSupportSparkScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=HadoopFsSupportSparkJava \
-Dpipeline-config.file=pipeline-config.conf \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
-Dhere.platform.data-client.request-signer.credentials.here-account.*
パラメーターのセットを考慮してください。 credentials.properties
このパラメータは、ファイルから {{YOUR_PROJECT_HRN}}
データ クライアント ライブラリにデータを渡すために指定します。 データ クライアント ライブラリの初期化の詳細について は、「 Java システムのプロパティを使用して資格情報を設定する」を参照してください。
CLI を使用してデータにアクセスします
カタログ作成ステップで作成したカタログの HERE リソースネーム を使用して、オブジェクト ストア レイヤーの内容を確認できます。
olp catalog layer object list {{YOUR_CATALOG_HRN}} parquet --key parquet-dir
CLI の出力はと類似している必要があります ( 正確な値ではなく、出力例を次に示します ) 。
name keyType lastModified size
parquet-dir/_SUCCESS object 2021-01-12T19:23:47Z 0
parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet object 2021-01-12T19:23:39Z 353
...
特定のキーによって参照されているオブジェクト ストア レイヤーに保存されているデータをダウンロードできます。
olp catalog layer object get {{YOUR_CATALOG_HRN}} parquet --key parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet
このチュートリアルで扱うトピックの詳細については、次のソースを参照してください。