こんにちは。最近はDatabricksの勉強をしていて、関連記事の投稿が増えてきました・・
今日は、前から気になってた、Databricksを使った機械学習のモデル開発についての理解を深めてみたいと思います。
以下の記事を多いに参考にしました。
Unity Catalog で機械学習モデルをトレーニングして登録する – Azure Databricks | Microsoft Learn
それではまいります。
Contents
Machine Learningクラスターの作成
Databricksで機械学習のワークロードを実行するためには、機械学習用の設定をほどこしたクラスターを作成する必要があるようです。
具体的にはランタイムを作成するときに、ML用のランタイムを選択します。これによって、Scikit-Learnなどの機械学習に一般的に必要になるライブラリなどを含んだ状態でクラスターを起動できるようになるっぽいです。
以降、このクラスターを利用したノートブック上で作業していきます。
Unity Catalogにデータを準備
カタログの作成
Unity Catalogメタストアが存在する前提で、カタログ以下をNotebookのSQLで作成していきます。
まず、今回の実験用に”ml”という名前の新規カタログを作成。
%sql
CREATE CATALOG ml;
*もしUnity Catalogメタストアの作成から必要な場合は、以下記事を参照ください。
Azure DatabricksのUnity Catalogメタストアを作成してみた | 煎茶 (simpletraveler.jp)
データの取り込み
次に機械学習でよく利用されるタイタニックの乗船客リストデータを、Unity Catalogのテーブルに取り込みます。今回はGUIから。
取り込み完了。
Unity Catalogテーブルのデータを利用して機械学習モデル構築
必要ライブラリのインポート
import mlflow
mlflow.set_registry_uri("databricks-uc") ## モデルの保存先をUnity Catalogに設定
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble
from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope
なお、2行目でmlflowのレジストリURIの設定を行っていますが、これはモデルをUnity Catalogで管理するために必要な設定なようです。
Unity Catalog 内でモデル ライフサイクルを管理する – Azure Databricks | Microsoft Learn
Unity Catalogのテーブルからデータをロード
df_train = spark.read.table("ml.default.titanic").toPandas()
df_train.head()
特徴量加工
# ずっと昔に書いたコードを使いまわしてるので中身は適当です・・・
def feature_engineering(df):
# Null Value Handling
df["Age"].fillna(df["Age"].median(),inplace=True)
df["Embarked"].fillna(df['Embarked'].mode()[0], inplace = True)
df = df.fillna(-1)
# Feature Encoding
df["Sex"] = df["Sex"].map({'male':1,'female':0}).fillna(-1).astype(int)
df["Embarked"] = df["Embarked"].map({'S':0,'C':1,'Q':2}).astype(int)
df["Cabin"] = df["Cabin"].str[0].map({'T':0,'G':1,'F':2,'E':3,'D':4,'C':5,'B':6,'A':7}).fillna(-1).astype(int)
# Binning
bins_age = np.linspace(0, 100, 10)
df["AgeBin"] = np.digitize(df["Age"], bins=bins_age)
df["FareBin"] = 0
df["FareBin"][(df["Fare"]>=0)&(df["Fare"]<10)] = 1
df["FareBin"][(df["Fare"]>=10)&(df["Fare"]<20)] = 2
df["FareBin"][(df["Fare"]>=20)&(df["Fare"]<30)] = 3
df["FareBin"][(df["Fare"]>=30)&(df["Fare"]<40)] = 4
df["FareBin"][(df["Fare"]>=40)&(df["Fare"]<50)] = 5
df["FareBin"][(df["Fare"]>=50)&(df["Fare"]<100)] = 6
df["FareBin"][(df["Fare"]>=100)] = 7
# Create New Features (Optional)
df['FamilySize'] = df['SibSp'] + df['Parch'] + 1
df['Title'] = -1
df['Title'][df["Name"].str.contains("Mr")] = 0
df['Title'][df["Name"].str.contains("Master")] = 1
df['Title'][df["Name"].str.contains("Miss")] = 2
df['Title'][df["Name"].str.contains("Mrs")] = 3
# Drop unsed columns
del df["Age"]
del df["Fare"]
del df["Ticket"]
del df["Name"]
del df["PassengerId"]
return df
df_train_fe = feature_engineering(df_train)
df_train_fe.head()
モデルの構築
まず学習データを80%の学習用データと20%の検証用データに分割し。
# Split 80/20 train-test
X, y = df_train_fe.loc[:, df_train_fe.columns != 'Survived'], df_train_fe['Survived']
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
X,
y,
test_size=0.2,
random_state=1
)
そのデータを使ってモデルを構築
# Enable MLflow autologging for this notebook
mlflow.autolog()
with mlflow.start_run(run_name='gradient_boost') as run:
model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)
# Models, parameters, and training metrics are tracked automatically
model.fit(X_train, y_train)
predicted_probs = model.predict_proba(X_test)
roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
# The AUC score on test data is not automatically logged, so log it manually
mlflow.log_metric("test_auc", roc_auc)
print("Test AUC of: {}".format(roc_auc))
モデルの学習が終わったら、ワークスペース>エクスぺリメントからGUIベースで実験の結果を確認することができます。
各実験をクリックすると、詳細も確認できます。
各種評価指標メトリクスや、
ROCカーブのビジュアルなども確認できます。
分散ハイパーパラメータチューニング
Databricksの各ワーカーに学習タスクを分散して分散チューニングを行うこともできるようです。(前提として、HyperoptというOSSハイパラチューニングライブラリが分散チューニングに対応しているためになせる技、のようです)
Hyperopt の概念 – Azure Databricks | Microsoft Learn
Hyperoptで分散チューニングを行う際には、以下コード中に登場するSparkTrialなる構文が肝っぽいです。
SparkTrials
は、Hyperopt コードに他の変更を加えることなく Hyperopt 実行を配布することができる、Databricks によって開発された API です。SparkTrials
によって試用を Spark worker に配布することで、単一マシンのチューニングを高速化します。
# Define the search space to explore
search_space = {
'n_estimators': scope.int(hp.quniform('n_estimators', 20, 1000, 1)),
'learning_rate': hp.loguniform('learning_rate', -3, 0),
'max_depth': scope.int(hp.quniform('max_depth', 2, 5, 1)),
}
def train_model(params):
# Enable autologging on each worker
mlflow.autolog()
with mlflow.start_run(nested=True):
model_hp = sklearn.ensemble.GradientBoostingClassifier(
random_state=0,
**params
)
model_hp.fit(X_train, y_train)
predicted_probs = model_hp.predict_proba(X_test)
# Tune based on the test AUC
# In production, you could use a separate validation set instead
roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
mlflow.log_metric('test_auc', roc_auc)
# Set the loss to -1*auc_score so fmin maximizes the auc_score
return {'status': STATUS_OK, 'loss': -1*roc_auc}
# SparkTrials distributes the tuning using Spark workers
# Greater parallelism speeds processing, but each hyperparameter trial has less information from other trials
# On smaller clusters or Databricks Community Edition try setting parallelism=2
spark_trials = SparkTrials(
parallelism=1
)
with mlflow.start_run(run_name='gb_hyperopt') as run:
# Use hyperopt to find the parameters yielding the highest AUC
best_params = fmin(
fn=train_model,
space=search_space,
algo=tpe.suggest,
max_evals=32,
trials=spark_trials)
先ほどのコードに続いて上記コードを実行すると、複数のジョブが実行される様子が確認できます。
最良モデルによる予測と結果の保存
以下コードを実行して、最良モデル(この例ではAUCスコアが最も高かったもの、かな)を見つけて、そのモデルを利用して検証データに対する予測を実行します。
# Sort runs by their test auc; in case of ties, use the most recent run
best_run = mlflow.search_runs(
order_by=['metrics.test_auc DESC', 'start_time DESC'],
max_results=10,
).iloc[0]
print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))
best_model_pyfunc = mlflow.pyfunc.load_model(
'runs:/{run_id}/model'.format(
run_id=best_run.run_id
)
)
#make a dataset with all predictions
best_model_predictions = X_test
best_model_predictions["prediction"] = best_model_pyfunc.predict(X_test)
この予測結果とモデルは、以下のようにしてUnity Catalogに保存することができます。
予測結果:predictionsという名前でテーブルとして保存
results = spark.createDataFrame(best_model_predictions)
spark.sql("drop table if exists ml.default.predictions")
#Write results back to Unity Catalog from Python
results.write.saveAsTable("ml.default.predictions")
最良予測モデル:モデルとしてUnity Catalogに登録
Unity Catalog 内でモデル ライフサイクルを管理する – Azure Databricks | Microsoft Learn
model_uri = 'runs:/{run_id}/model'.format(
run_id=best_run.run_id
)
mlflow.register_model(model_uri, "ml.default.titanic_model")
なるほど、カタログのスキーマに”モデル”というタブがあって、そこから確認できるようになるようですね。
モデルをクリックすると、メタデータ情報や権限を確認することができます。
バージョンをクリックすると、そのバージョンに紐づいた実行の詳細リンクを確認することができます。
なお、今回はモデルの保存・管理にUnity Catalogを利用する方法を試してみましたが、モデルレジストリに保存する方法もあるようです。どちらで管理するのがいいのでしょうか・・?
この点については、公式ドキュメントに以下の記載が確認できました。
Azure Databricks では、Unity Catalog のモデルの使用をお勧めしています。 Unity Catalog のモデルには、一元化されたモデル ガバナンス、クロスワークスペース アクセス、系列、デプロイが備わっています。 ワークスペース モデル レジストリは、今後非推奨となる予定です。
ワークスペース モデル レジストリを使用してモデルのライフサイクルを管理する – Azure Databricks | Microsoft Learn
そもそもモデルレジストリの方は今後非推奨になるとのことで、これからDatabricksを利用するならあまり気にしなくてよさそうですね・・・!
以上、今回の記事ではDatabricksのUnity Catalogを利用して機械学習モデルの開発~保存を行うところまでを実際に触って試してみました。
モデルの開発は一般的なPython, Scikit-Learnなどの機械学習ライブラリの使い方が分かれば問題なく、モデルのライフサイクル管理にはOSSのMLFlowが使われるので、ここでもDatabricks固有の知識は不要で、このあたりの前提知識があれば、比較的簡単に使いこなすことができそうだなと感じました。(自分はMLFlow部分の習得が必要なんですが・・・)
今後の記事では、作成したモデルを推論用にデプロイする方法などについても試してみたいと思います。
少しでも参考になりましたら幸いです。
おしまい
コメントを残す