こんにちは。今日は、DatabricksのDelta Live Table機能について勉強をしたので、わかったことなどをこの記事にまとめておきたいと思います。
それではまいります。
Contents
Delta Live Tableとは?
概要
最近、この手の何も前提知識がない状態から機能概要を理解する時には、ChatGPTの力を借りることが多くなってきました笑
Delta Live Table機能についても、例を用いながら機能の概要と意義をまとめてもらいました。(私も前提知識がないためChatGPTのまとめが間違っているまま掲載している可能性もありますのが、ご了承ください・・気づいたら直します)
なお、Delta Live Table機能の原典はこのあたりになります。
https://www.databricks.com/jp/product/delta-live-tables
https://learn.microsoft.com/ja-jp/azure/databricks/delta-live-tables/
〜以下、ChatGPTによる要約(指示した訳でないのに何故か口調がだいぶフランク笑)〜
Delta Live Tablesって何?
Delta Live Tablesは、信頼性、保守性、テスト可能性に優れたデータ処理パイプラインを構築するための宣言型フレームワークで、ざっくり言うと自動運転車のようなものだよ。目的地(分析結果やレポートなど)と出発地点(元のデータ)だけ指定してあげれば、あとは全部自動で運転(データ処理)してくれる。
なぜいいの?
- 自動運転: 運転の疲れを感じることなく、目的地まで行けるように、Delta Live Tablesもデータの取り込みから処理、そして出力まで全部やってくれる。
- リアルタイム: 道に渋滞ができたらルートを変更するように、リアルタイムデータにも対応してくれる。
- 過去のルートも保存: 過去に行った場所(データの旧バージョン)も記録してくれるから、後で確認もできる。
- エラー対応: タイヤがパンクしたら自動で修理してくれるような感じで、データに問題が出たら自動で修復。
- 大量の乗客もOK: データが多くても、スムーズに処理してくれる。
Delta Live Tablesを使うと、これらの処理をスムーズに自動化して、煩わしい手作業から解放されるんだ。というわけで、手間いらずで目的地に到達できるってわけさ。
3つのテーブル種類
Delta Live Tableは、上記の通りETLのプロセスを自動化・簡略化してくれる機能が追加されたDelta Tableとなりますが、テーブルの形式は以下の3種類から選択することができます。
- ストリーミングテーブル
- マテリアライズドビュー
- ビュー
それぞれどのような場合に使うと良いかは、このあたりで解説されています。
なお、DatabricksでETLを行う際には、メダリオンアーキテクチャに基づいて複数のデータ加工ステップを経ることが多いと思いますが、公式ドキュメントでは、Row Dataが蓄積されるBronze層ではストリーミングテーブル形式を、最終データマートであるGold層ではマテリアライズドビュー形式を用いるなど、加工のステップごとに最適な形式を利用することが提唱されています。
ここにも、Bronze層はストリーミングテーブルを推奨すると明記されていました。さらに、データを増分更新するような考慮をするために、Auto Loaderの機能も合わせて使うよう書かれてたりもしました。Auto Loader x Delta Live Tableの読み込みについては、また後日試して別記事にまとめたいと思います。
Databricks では、クラウド オブジェクト ストレージからのほとんどのデータ取り込みタスクに Auto Loader と Delta Live Tables を使用することをお勧めします。Auto Loader と Delta Live Tables は、増え続けるデータがクラウド ストレージに到着すると、増分的かつべき等にロードするように設計されています。
https://learn.microsoft.com/en-us/azure/databricks/delta-live-tables/load#load-files-from-cloud-object-storage
Delta Live Tableの実行環境
Delta Live Tableパイプラインは、どうやら専用のクラスター上で実行されるようです。
実際に使ってみる
私はAzureで提供されるAzure Databricksを利用しているので、以下はAzure Databricks上でDelta Live Table機能を使ってみたメモになります。
Delta Live Tableの機能を利用するには、以下の2つを準備する必要があります。
- テーブルとデータ加工を定義したNotebook
- そのNotebookを実行するパイプライン
以下では、それぞれの手順をメモしています。
テーブルとデータ加工を定義したNotebookの作成
このステップは、以下のサンプルコードをそのままなぞりました。
Rawデータ蓄積用(Bronze)テーブルの定義
はじめにテーブルの定義。以下では、Databricksのサンプルデータセットの1つである、clickstreamデータを取り込むためのBronze層のテーブルを定義しています。
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
# データパスの定義
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
# テーブルの定義
@dlt.create_table(
comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets."
)
def clickstream_raw():
return (
spark.read.json(json_path)
)
初学者の自分にはこの構文からよくわからなかったので、リファレンスで各構文調べてみた。
- Delta Live Tableのストリーミングテーブルやマテリアライズドビューを作成する場合は、「@dlt.create_table(….」と書き出すお作法。(ビューの場合は@dlt.view(….」)
- @dlt.create_tableの後ろのカッコの中では、テーブルのコメントなどのメタ情報を捕捉できる模様。
- その後ろに続くdef〜の関数定義部分で、テーブルのデータ構造を指定する。前のcreate_table()内でnameプロパティが指定されてない場合は、この関数名がテーブル名として利用される。
なお、今回は、Databricksのストレージ内にあるJSONファイルから生データの読み込みを行う定義ですが、Databricksがサポートしている任意のデータソースからの読み込みが定義できるようです。
https://learn.microsoft.com/en-us/azure/databricks/delta-live-tables/load
クレンジング済データ保存用(Silver層)テーブルとデータ加工の定義
続いて、上で作成したBronzeテーブルをもとにデータをクレンジングし、それを保存するための別のテーブル作成します。
## クレンジング済データ保存用(Silver層)テーブルの定義
@dlt.table(
comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
return (
dlt.read("clickstream_raw")
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_title", "click_count", "previous_page_title")
)
ここで、新たに@dlt.expect構文が出てきました。
- @dlt.expectではデータのバリデーションが定義できる。この時、バリデーションのロジック自体を実装する必要はなく、「期待する状態」を宣言するだけで実装できるのが便利なところ(なのだろう、まだしらんけど)
@expect("description", "constraint")
という構文なので、上のコード中の「@dlt.expect(“valid_current_page_title”, “current_page_title IS NOT NULL”)」に当てはめると、current_page_titleという列のデータがNOT NULLの状態を期待し、仮にNULLのものがあった場合でも、結果のデータセットのそのデータは含む、ということかな・・。@expect_or_failとか@expect_or_dropで定義した場合は、制約に引っかかるデータがあれば実行をFailさせたりデータをDropしたりもできるらしい。@expectの場合は、制約に引っかかるデータがあったという事実はどこかで確認できるのかな・・?- もう一つの@expect文「@dlt.expect_or_fail(“valid_count”, “click_count > 0”)」の方は、click_count列の値が0より小さいデータがあれば、実行を失敗させる、ということかな。
- 最後のdef内のテーブルデータ構造の定義では、列の型変換や、リネームを行なうことが宣言されています。
最終加工済(Gold層)テーブルとデータ加工の定義
最後に、Silverテーブルのデータに対してビジネスで活用する形に集計を行った結果を保持するGoldテーブルを定義します。
@dlt.table(
comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
return (
dlt.read("clickstream_prepared")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.sort(desc("click_count"))
.select("referrer", "click_count")
.limit(10)
)
Bronze, Silverテーブルのコードを見てきたので、もうだいぶ読みやすくなりました。
- Silverテーブル(clickstream_prepared)から、条件にあった行(current_page_titleが”Apache_Spark”のもの)の中でclick_countが多い上位10件を抽出したデータを保持しています
Delta Live Tableパイプラインの作成と実行
Notebookができたら、それを実行するパイプラインを定義します。
パイプラインの定義は、GUIの「Delta Live Tables」メニューから行うことができます。
「パイプラインを作成」に進み、
パイプライン名と実行対象Notebookその他を設定すればOK。
作成が完了したら、パイプラインをオンデマンドで実行してみます。
実行が進むと、以下のようにNotebookに記載したETLの各ステップが可視化されます。おお・・これはどういう仕組みだ・・すごい。
ETLの各ステップをクリックしてみると、データの詳細が確認できます。そういえば先ほどExpect句で指定していた期待条件に満たないデータの情報も確認できますね。この辺りの実装と監視が簡単に行えるのもDelta Live Table機能の価値と言えそうです。
(補足)Delta Live Tableに保存したデータをクエリできるようにする
さて、上記パイプラインを実行したら各テーブルがHiveメタストアもしくはUnityCatalogから確認できるようになるのかなと思っていましたが、存在しないことに気づきました。
以下を読んだところ、デフォルトでは各テーブルはパイプラインローカルなので、Hive メタストアやUnity Catalogから確認できるようにするためには、出力先のスキーマを明示的に指定しないといけないらしいことが分かった。
デフォルトでは、デルタ ライブ テーブルで作成されたすべてのテーブルとビューはパイプラインに対してローカルです。デルタ ライブ テーブル データセットが宣言されているパイプラインの外でデータセットをクエリまたは使用するには、テーブルをターゲット スキーマにパブリッシュする必要があります。
https://learn.microsoft.com/ja-jp/azure/databricks/delta-live-tables/publish
出力先のスキーマは、パイプラインの定義で設定できる。今回はdefaultにしてみた。
この状態でパイプラインを再実行すると・・・今度はカタログからちゃんとテーブルが確認できた!
なるほど、Delta Live Table機能を利用することで、ETLを宣言的に実装することができることが実感できました。
Delta Live Table機能に関する疑問あれこれ
さて、一通りチュートリアルをなぞってみたところで、いくつか追加の疑問が出てきたので、最後にそれらについて調べたり考えたりしたことをまとめておきます。
Delta Live Table機能とジョブ機能はどのように使い分ける?
これはDatabricksのコミュニティで既にいくつかの議論がありました。どちらでも同じ機能は実現できそうですが、一つのNotebook内で完結する一連のデータ加工を、データの品質を担保しながらよりシンプルに実装したい場合にはDelta Live Tableが選択肢となりそうです。
ジョブは、Databricksのノートブック、JAR、spark-submitジョブなどを自動的に実行する(スケジュールまたは手動)ために設計されています。基本的には、データエンジニアリング、データ分析、またはデータサイエンスのワークロードを実行するための汎用的なフレームワークです。一方で、Delta Live Tables(DLT)は、Delta Lake上で高品質なデータを提供する信頼性の高いデータパイプラインを簡単に構築・管理するために設計されています。DLTは、宣言的なパイプライン開発、自動データテスト、およびモニタリングと回復のための深い可視性を提供することで、データエンジニアリングチームがETL(Extract, Transform, Load)の開発と管理を簡素化します。
https://community.databricks.com/t5/data-engineering/jobs-delta-live-tables-difference/td-p/26159
Delta Live Tablesは、いくつかのDeltaテーブルがフローの観点から相互に接続され、単一のノートブック内で動作するETLパイプラインを構築することを目的としています。一方で、Multi-task Jobsは、さまざまなノートブックをフロー内で実行できるようにする、より汎用的なオーケストレーションフレームワークです。
https://community.databricks.com/t5/data-engineering/difference-between-delta-live-tables-and-multitask-jobs/td-p/25599
Delta Live Tablesは「宣言的なデータフロー」のためのAPIを提供します。手動でデータを読み込み、変換を適用し、それを書き出すコードを書く代わりに、Delta Live Tablesは入力ソース、変換、および意図された出力ソースを宣言するように求めます。管理されたランタイムは、裏側で自動的に最適化し、必要な読み取り、ブックマーク、および計算のロジスティクスを実行します。これは、Terraformがクラウドインフラストラクチャの構築に対して宣言的であるのと非常に似ていますが、Delta Live Tablesはデータテーブルに対して宣言的です。
https://community.databricks.com/t5/data-engineering/difference-between-delta-live-tables-and-multitask-jobs/td-p/25599
Bronze層に読み込むデータを増分更新するには?
最初の方にも書きましたが、Auto Loader機能が使えそうです。Auto Loaderも現時点でそこまで理解がないので、また別記事でまとめたいと思います。
以上、Delta Live Table機能でできることのメモでした。
少しでも参考になったという方いらっしゃいましたら下のいいねをポチっていただけると励みになります!Databricksの勉強は続く・・・
おしまい
ちなみに、なぜこのような機能が登場する必要があったかは、機能公開のアナウンスブログなどをみるとヒントになるものです。この機能についても、例にもれずDatabricksチームからのブログが出てましたので、そのポイントを以下に簡単にまとめておきます。
https://www.databricks.com/blog/2022/04/05/announcing-generally-availability-of-databricks-delta-live-tables-dlt.html
DLTによって・・・