こんにちは。今日は、Databricksの自動ローダー機能について勉強を進めたので、理解したことなどをメモしておきたいと思います。
なお、私もDatabricks、自動ローダー機能ともに調べながら理解を深めているなので、記載している内容に誤りがあるかもしれませんが、ご了承ください・・・コメントで指摘いただけると助かります。
Contents
自動ローダー機能とは
https://learn.microsoft.com/ja-jp/azure/databricks/ingestion/auto-loader/
一言でいうと、クラウドストレージからDatabricksへのデータの増分取り込みを効率的に実現するための機能、と言えそうです。以下のDatabricksのブログポストを見る感じ、2020年頃に登場した機能っぽいですね。
従来のApache Sparkの構造化ストリーミングと比べて何が優位なのか?
自動ローダー機能を使わなくても、従来Apache Sparkが備えている構造化ストリーミングの機能を使って増分読み込みはできたはずですが、自動ローダー機能を使うとこれがもっと効率的にできるっぽいです。(自動ローダー自体も構造化ストリーミングの技術を用いて実装されているようですが、そこにDatabricksがいくつかの付加価値を加えた機能、と理解しています)
具体的にどういう点が効率的なのかというと、以下のような点が挙げられています。
- 読み込み時にスキーマの推論を行ってくれる
- 今どこまで処理したかという情報を保持することで、エラーなどが発生した場合でも厳密に1回だけ処理されることを保証してくれる
- 数十億オーダーのファイルが到来してもパフォーマンス効率高くファイルを検知・処理できる
対象とできるデータソース
この自動ローダー機能で対象とできるソースデータは「クラウドストレージ」です。ここでいうクラウドストレージとしては、Azure Data Lake StorageやAmazon S3, Google Cloud Storageなどのストレージサービスを指しています。
なので、逆にいうと、Apache Kafkaなどからデータを継続的に取り込みたい場合などは、現在のところ自動ローダー機能は使えず、従来のSparkの構造化ストリーミングを利用する必要があるっぽいです。
Delta Live Tableと自動ローダー
公式ドキュメントによると、クラウドストレージからデータを増分で取り込むようなシナリオでは、Delta Live Tableと自動ローダーを組み合わせて利用することが推奨されているようです。
Delta Live Tableは超ざっくりETLパイプラインの実装を簡易化してくれる機能、と理解していますが、この中で自動ローダーを使うと、自動ローダー単体で使う場合には手動で構成が必要だったチェックポイントのディレクトリ管理等も不要になり、さらに記述と管理が効率化されるようです。
Delta Live Tableについては別記事にまとめています。
実際に使ってみた
さて、概要を理解したところで実際に機能を触ってみたいと思います。
自動ローダー単体で増分取り込み
以下のようにAzure Data Lake Storage Gen2のDataというコンテナに適当なCSVファイルを入れておきます。ここに、定期的に同じスキーマ構造を持つCSVが追加されていくので、それをDatabricksのテーブルに順次取り込みたい、というシナリオです。
DatabricksのNotebookで以下のようなコードを書きます。
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "abfss://data@datalake014.dfs.core.windows.net"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"titanic_autoloader"
checkpoint_path = f"/tmp/{username}/_checkpoint/titanic_autoloader"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
#dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable(table_name))
ポイントは以下です。
- Apache Sparkの構造化ストリーミングと同様、readStreamのAPIを利用するが、自動ローダーを利用する場合、データソースは”cloudFiles”と記述する
- 今どこまでファイルを処理したか、を記録するチェックポイント機能のためのディレクトリを指定する( .option(“checkpointLocation”, checkpoint_path))
- 自動推論したスキーマ情報を保持しておくためのディレクトリを指定する(.option(“cloudFiles.schemaLocation”, checkpoint_path))
これを実行すると、以下のようにデータのストリーミングが開始して実行中の状態が継続する。Notebook上でストリーミング状況が見れるの、便利ですね。
この時点では、ストリーミングが起動した時点で存在していたファイルが1つ取り込まれて、Databricksのテーブルにデータが反映される。(891行)
次に、手動でクラウドストレージにファイルを追加してみる。(全く同じファイル)
Notebookに戻って先ほど起動したストリーミングの状況を見てみると、早速新しいファイルの取り込みが行われた模様。
Databricks上のテーブルの状態をみると、1782行と、データが更新されていることがわかります。
チェックポイント状態の確認
ちなみにチェックポイントの状態は、以下のコマンドから確認できるようです。ここで、今どのファイルまで処理されたか確認できました。
%sql
SELECT * FROM cloud_files_state('チェックポイントディレクトリのパス');
ファイル検知のしくみ
さて、上記で試してみて、確かにクラウドストレージにファイルを配置すると、即座にデータが取り込まれる様子が確認できました。
ここで疑問が。
- ファイルの到来はいつ・どのように検知されている?
実は、この要素はユーザが調整可能なパラメータとなっています。
ファイルの到来をどのように検知するか
https://learn.microsoft.com/ja-jp/azure/databricks/ingestion/auto-loader/file-detection-modes
自動ローダーでは、「ディレクトリ一覧モード」と「ファイル通知モード」という2つのファイル検知の方法が提供されています。(規定はディレクトリ一覧モード)
公式ドキュメントの記載を読むと、ディレクトリ一覧モードは、Databricksが定期的にクラウドストレージのディレクトリ一覧をスキャンして、未処理のデータがあれば処理する動作、ファイル通知モードは、クラウドストレージ側が備える、ファイルの到来を他システムに通知する機能を使って、クラウドストレージからDatabricksにファイル到来を通知する動作、というように読み取れます。
ファイル通知モードの方が大量ファイルの処理に最適だが、少々必要なセットアップが増えるトレードオフがある、とありますね。
ファイルの到来をいつチェックするか
Databricks側からファイルの存在をチェックしに行くディレクトリ一覧モードの場合は、処理間隔のパラメータを調整可能です。規定だと、500ミリ秒間隔のようですね。
Databricksからの推奨としては、上記ドキュメントにあるように、規定値のような継続的なチェックを行うと、Databricksのコンピューティングコストがかかり続けるデメリットがあるので、そのようなリアルタイム反映の要件がなければ、コスト効率を考えるならば間隔をもう少し緩めるか、もしくはAvaialableNowトリガーを利用した単発実行にして、そのNotebookをジョブやパイプラインから定期的にスケジュール起動するような構成が推奨されているようです。
Delta Live Table + 自動ローダー
次に、Delta Live Tableで自動ローダーを使ってみます。
Delta Live Tableで自動ローダーを使わないストリーミング
と、その前に、自動ローダーを使わずにストリーミング取り込みを行うパターンも試してみます。
先ほどと同様に、クラウドストレージのコンテナに適当なファイルを1つ配置しておきます。
続けてNotebookで以下のようなコードを書きます。
## 生データ取り込み用のテーブル定義(Bronze)
file_path = "abfss://data@datalake014.dfs.core.windows.net"
# Titanicデータのスキーマを定義
titanicSchema = StructType([
StructField("PassengerId", StringType(), True),
StructField("Survived", StringType(), True),
StructField("Pclass", StringType(), True),
StructField("Name", StringType(), True),
StructField("Sex", StringType(), True),
StructField("Age", StringType(), True),
StructField("SibSp", StringType(), True),
StructField("Parch", StringType(), True),
StructField("Ticket", StringType(), True),
StructField("Fare", StringType(), True),
StructField("Cabin", StringType(), True),
StructField("Embarked", StringType(), True)
])
@dlt.create_table(
comment="Titanic Raw Data."
)
def titanic_raw():
return (
spark.readStream.schema(titanicSchema).load(file_path, format='csv',header='true')
)
## 生データ取り込み用テーブルからデータを加工した最終活用テーブル(Gold層、加工は適当)
@dlt.table(
comment="Titanic Summary."
)
def titanic_summary():
return (
dlt.read("titanic_raw")
.groupBy("Survived").count()
)
ポイントは以下かなと思います。
- 自動ローダーを使わない場合は、スキーマを明示的に指定してやる必要がある
この状態でこのノートブックから作成したDelta Live Tableパイプラインを実行すると、以下のようにデータの取り込みが完了します。(891行が処理される。Titanic_rawテーブルもストリーミングテーブルとして扱われていることがわかります。)
この状態で、オブジェクトストレージに同じ内容のファイルをもう一つ追加して、Delta Live Tableパイプラインを再実行すると・・・
ありゃ、titanic_rawテーブルにデータが読み込まれませんでした・・ファイル名は違うけど中身全く同じデータだったから重複読み込みを防ごうと動作したのかな・・・Docsから動作の明記見つけられず・・
とりあえず、データ内容の異なる別のファイル(test.csv)をアップロードしたら、想定通り差分で読み込みが行われました!(test.csvの418行だけが読み込まれた)
Delta Live Tableと自動ローダーを組み合わせたストリーミング
今度は、上と同じ処理を自動ローダーを利用する構文に変えてみます。なお、Row Dataを蓄積するBronzeテーブルの構文のみ着目しています。
file_path = "abfss://data@datalake014.dfs.core.windows.net"
@dlt.create_table(
comment="Titanic Raw Data."
)
def titanic_raw_autoloader():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load(file_path)
)
確かに、自動ローダーを利用することで、スキーマを指定する必要がなくなり、記載はすっきりしました。加えて、Delta Live Tableパイプラインの中で利用することで、チェックポイントの構成なども不要になっていますね。
これで作成したDelta Live Tableパイプラインを実行した時の動作は先ほどの自動ローダーを利用しない場合と同様となりました。
復習ですが、Delta Live Tableパイプラインの中で自動ローダーを利用することで、実装が簡素化されました。これに加えて、最初の概要で述べたような、チェックポイントや、スケーラビリティの恩恵を受けられることが自動ローダーを利用するメリットと理解できました。
ちなみに、自動ローダーを使う場合でもベーシックなreadStreamを使う場合でも、Delta Live Tableパイプラインでデータをリアルタイムで取り込めるようにしたい場合は、パイプラインの実行モードを「連続」にすれば良いです。(「トリガー」ベースの実行だと、実行タイミングでのみ取り込みが走る)
https://learn.microsoft.com/ja-jp/azure/databricks/delta-live-tables/updates#continuous-triggered
「連続」実行モードに切り替えて試してみるとこんな感じ。
まずはパイプラインの設定を「連続」に変更し、
パイプラインを時効すると、今度はDelta Live Tableパイプラインが常時実行中の表示になった。
ここでファイル(test.csv)を追加してみると・・
総処理行数が起動時に取り込んだ891行と追加したtest.csvファイルのデータ行数を合計した行数に切り替わった・・・!
以上、今日はこのくらいまで・・・
ただ、色々と触ってみて、自動ローダー機能に対する理解が深まった気がします。「クラウドストレージからのデータを継続的に取り込むシナリオでは自動ローダーによる取り込みの実装を最初に考える」と頭が整理されました。
少しでも参考になりましたら幸いです。
おしまい
Azure Data Lake Storage Gen2に定期的に到来するCSVファイルを増分で取り込む