こんばんは。今日は、しばし気になっていたAzureの分析データストアソリューション、Azure Synapse Analyticsの専用SQLプール/サーバレスSQL/Sparkプールのクエリ速度比較の実験をしてみましたので、記録を残しておきます。
なお、今回の実験のきっかけになったのは、こちらの記事でした。これを自分の手元でも試してみたい、そんなモチベーションです。なお、こちらの記事では、Azure DatabricksのPhotonエンジンとの比較まで行われているので、そちらも試して別記事にまとめておこうと思います。
Databricks Photon Vs Azure Synapse | Visionarios (blogvisionarios.com)
また、この記事では分析クエリの実行速度だけに焦点を当てていますが、Synapseで利用できるこれらのエンジンは、それ以外の観点でのメリット/デメリットもいろいろとあって、それぞれに適した利用シナリオがあるということも前置きしておきます。
ではまいります。
Contents
前提:今回のテストデータとクエリ
完全に適当なデータですが、以下のような10列1億行のデータを利用しました。
サイズは、CSVにして8.52GB。Parquetにして3.33GBのデータになります。こちらを生成するスクリプトは、以下。
CREATE TABLE dbo.LargeTB
(
[col1] int NOT NULL,
[col2] varchar(255) NOT NULL,
[col3] datetime NOT NULL,
[col4] varchar(255) NOT NULL,
[col5] varchar(255) NOT NULL,
[col6] varchar(255) NOT NULL,
[col7] varchar(255) NOT NULL,
[col8] varchar(255) NOT NULL,
[col9] varchar(255) NOT NULL,
[col10] varchar(255) NOT NULL
);
DECLARE @p_NumOfRows BIGINT
SELECT @p_NumOfRows=100000000;
WITH Base AS (
SELECT 1 AS n
UNION ALL
SELECT n+1 FROM Base WHERE n < CEILING(SQRT(@p_NumOfRows))
),
Expand AS (
SELECT 1 AS c FROM Base AS b1, Base AS b2
),
Nums AS (
SELECT ROW_NUMBER() OVER(ORDER BY c) AS n FROM Expand
)
INSERT INTO LargeTB
SELECT
n,
'DATA' + RIGHT('00000000' + CONVERT(NVARCHAR, n), 8),
DATEADD(mi,n,GETDATE()),
SUBSTRING(CONVERT(varchar(40), NEWID()),0,9),
(ABS(CHECKSUM(NewId())) % 10),
(ABS(CHECKSUM(NewId())) % 20),
(ABS(CHECKSUM(NewId())) % 30),
(ABS(CHECKSUM(NewId())) % 40),
(ABS(CHECKSUM(NewId())) % 50),
(ABS(CHECKSUM(NewId())) % 100)
FROM Nums WHERE n <= @p_NumOfRows
OPTION (MaxRecursion 0);
このスクリプト、SQL Server / Azure SQL Database上では動きますが、Synapse専用SQLプール上では環境の制約で動かなかったので、実はAzure SQL Database上で作成したものをAzure Data FactoryでAzure Data Lake Storage Gen2にCSV、Parquet形式でエクスポートしています。
Synapse専用プールへのデータの読み込みには、そのCSVを専用SQLプールのテーブルに読み込むData Factoryのパイプラインをもう一つ作っています。
次に、クエリは、分析の用途で用いるクエリを想定して、GROUP BY, ORDER BYを含んだ以下のような簡単なクエリにしてみました。(適当です)
/* クエリ */
SELECT TOP(10)
col10,
col6,
col5,
sum(CAST(col7 as BIGINT)) as sum_col7,
avg(CAST(col8 as BIGINT)) as avg_col8,
count(col10) as count
FROM LargeTB
GROUP BY col10, col6, col5
ORDER BY sum_col7 desc, avg_col8 asc;
出力例:
なお、実行時間を測定する際は、エンジンのウォームアップの影響を排除するため、連続実行2回目以降の実行時間を計測しています。
前提:Azure Synapse Analyticsの構成
今回はそれぞれ、以下の構成でテストしてみました。専用SQLプールやSparkプールの構成には、もう少し調整の余地もあったかと思いますが、リソースがボトルネックにならない最低限の構成で選びました。
エンジン | 構成 |
---|---|
サーバレスSQL | 読み取り先のストレージのデータを以下2パターンでテスト。 ・CSV形式(1ファイル) ・Parquet形式(1ファイル) ・Parquet形式(複数ファイル)*Sparkプールで作成したSpark Databaseのparquet群 |
専用SQLプール | 以下2種類のデータ分散方式のテーブルを用意 ・ラウンドロビン分散テーブル ・ハッシュ分散テーブル(GROUP BYのキーになっているcol10を分散キーに) テーブルの設計 – Azure Synapse Analytics | Microsoft Learn SQLプールのSKUは1000DWU(コンピュートノード2台構成) メモリと同時実行の制限 – Azure Synapse Analytics | Microsoft Learn |
Sparkプール | ParquetファイルをSpark Databaseに永続化した上でSpark SQLでクエリ Spark PoolはSpark3.xの既定の設定 + ノードサイズはLarge(16コア/128GB) |
テスト結果
結果は、以下の通りでした。
エンジン | 構成パターン | クエリ実行時間 |
---|---|---|
サーバレスSQL | CSVデータ(単一ファイル) | 173秒 |
Parquetデータ(単一ファイル) | 21秒 | |
Parquetデータ(複数-26ファイル) | 5秒 | |
専用SQLプール | ラウンドロビン分散 | 3秒 |
ハッシュ分散 | 1秒 | |
Sparkプール | Spark Database | 11秒 |
- 分析ワークロードに特化した分散リレーショナルデータベースである専用SQLプールが最も早いのは納得
- サーバレスSQLは読み取るファイルの形式によってかなり速度がばらつく。この点については、公式ドキュメントのベストプラクティスにちゃんと明記されてた。CSVよりもParquet、Parquetはさらに小さいファイルにパーティション分割するとよい、と。実際その通りの結果になっていますね。
サーバーレス SQL プールのベスト プラクティス – Azure Synapse Analytics | Microsoft Learn
補足:サーバレスSQL
以下のようなクエリを実行。
CSV(単一ファイル):
SELECT
TOP(10)
col10,
col6,
col5,
sum(CAST(col7 as BIGINT)) as sum_col7,
avg(CAST(col8 as BIGINT)) as avg_col8,
count(col10) as count
FROM
OPENROWSET(
BULK 'https://<アカウント名>.blob.core.windows.net/<コンテナ名>/LargeTB.csv',
FORMAT='CSV',
FIRSTROW=2
)
WITH (
[col1] BIGINT,
[col2] VARCHAR(255),
[col3] DATETIME,
[col4] VARCHAR(255),
[col5] VARCHAR(255),
[col6] VARCHAR(255),
[col7] VARCHAR(255),
[col8] VARCHAR(255),
[col9] VARCHAR(255),
[col10] VARCHAR(255)
)
AS [LargeTB]
GROUP BY col10, col6, col5
ORDER BY sum_col7 desc, avg_col8 asc;
Parquet(単一ファイル):
サーバーレス SQL プールを使用して Parquet ファイルに対してクエリを実行する – Azure Synapse Analytics | Microsoft Learn
SELECT
TOP(10)
col10,
col6,
col5,
sum(CAST(col7 as BIGINT)) as sum_col7,
avg(CAST(col8 as BIGINT)) as avg_col8,
count(col10) as count
FROM
OPENROWSET(
BULK 'https://<アカウント名>.blob.core.windows.net/<コンテナ名>/LargeTB.parquet',
FORMAT='Parquet'
)
AS [LargeTB]
GROUP BY col10, col6, col5
ORDER BY sum_col7 desc, avg_col8 asc;
Parquet(複数ファイル):
サーバーレス SQL プールを使用して Parquet ファイルに対してクエリを実行する – Azure Synapse Analytics | Microsoft Learn
*複数ファイルのparquetを作成した手順はSpark Poolの章を参照
SELECT
TOP(10)
col10,
col6,
col5,
sum(CAST(col7 as BIGINT)) as sum_col7,
avg(CAST(col8 as BIGINT)) as avg_col8,
count(col10) as count
FROM
OPENROWSET(
BULK 'https://<アカウント名>.dfs.core.windows.net/<パス>/*.parquet',
FORMAT='Parquet'
)
AS [LargeTB]
GROUP BY col10, col6, col5
ORDER BY sum_col7 desc, avg_col8 asc;
専用SQLプール
事前に以下のようなテーブルを作成し、データを流し込んでおきます。
/* ハッシュ分散テーブル */
CREATE TABLE [dbo].[LargeTable_Hash]
(
[col1] int NOT NULL,
[col2] varchar(255) NOT NULL,
[col3] datetime NOT NULL,
[col4] varchar(255) NOT NULL,
[col5] varchar(255) NOT NULL,
[col6] varchar(255) NOT NULL,
[col7] varchar(255) NOT NULL,
[col8] varchar(255) NOT NULL,
[col9] varchar(255) NOT NULL,
[col10] varchar(255) NOT NULL
)
WITH
(
DISTRIBUTION = HASH (col10),
CLUSTERED COLUMNSTORE INDEX
)
GO
/* ラウンドロビン分散テーブル */
CREATE TABLE [dbo].[LargeTable_RoundRobin]
(
[col1] int NOT NULL,
[col2] varchar(255) NOT NULL,
[col3] datetime NOT NULL,
[col4] varchar(255) NOT NULL,
[col5] varchar(255) NOT NULL,
[col6] varchar(255) NOT NULL,
[col7] varchar(255) NOT NULL,
[col8] varchar(255) NOT NULL,
[col9] varchar(255) NOT NULL,
[col10] varchar(255) NOT NULL
)
WITH
(
DISTRIBUTION = ROUND_ROBIN,
CLUSTERED COLUMNSTORE INDEX
)
GO
実行したクエリは以下。最初に紹介したものと同じです。
/* ベンチマーククエリ ハッシュ分散 */
SELECT TOP(10)
col10,
col6,
col5,
sum(CAST(col7 as BIGINT)) as sum_col7,
avg(CAST(col8 as BIGINT)) as avg_col8,
count(col10) as count
FROM LargeTable_Hash
GROUP BY col10, col6, col5
ORDER BY sum_col7 desc, avg_col8 asc;
/* ベンチマーククエリ ラウンドロビン分散 */
SELECT TOP(10)
col10,
col6,
col5,
sum(CAST(col7 as BIGINT)) as sum_col7,
avg(CAST(col8 as BIGINT)) as avg_col8,
count(col10) as count
FROM LargeTable_RoundRobin
GROUP BY col10, col6, col5
ORDER BY sum_col7 desc, avg_col8 asc;
ちなみに、ハッシュ分散の方が早い理由は、ラウンドロビンと比べてデータの移動が少なくて済むためです。これは分散実行プランを見ると分かります。
Azure Monitor を使用して Synapse Analytics を監視する方法 – Azure Synapse Analytics | Microsoft Learn
ハッシュ分散verの分散実行プラン:
ラウンドロビンverの分散実行プラン:
Sparkプール
参考の手順:
クイックスタート: Spark を使用して分析を開始する – Azure Synapse Analytics | Microsoft Learn
SQLによるクエリを可能とするために、事前にSpark Databaseにデータを流しておきます。
%%pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import col
# データの読み込み
df = spark.read.load('abfss://<コンテナ名>@<ストレージアカウント名>.dfs.core.windows.net/LargeTB.parquet',format='parquet',inferSchema="true")
display(df.limit(10))
# データベースの作成
spark.sql("CREATE DATABASE IF NOT EXISTS LargeTB")
df.write.mode("overwrite").saveAsTable("LargeTB")
ちなみに、Spark Pool上でSpark Databaseを作成すると、Synapse Analytics作成時に指定した規定のAzure Data Lake Storage Gen2にパーティション分割されたparquetが実体として出力されます。
実行したクエリは以下。1点、TOPコマンドがSpark SQLではサポートされていないようだったので、LIMIT 10という構文に変えたくらいで、他は同じ。
# クエリ実行
df = spark.sql("SELECT\
col10,\
col6,\
col5,\
sum(CAST(col7 as BIGINT)) as sum_col7,\
avg(CAST(col8 as BIGINT)) as avg_col8,\
count(col10) as count \
FROM LargeTB \
GROUP BY col10, col6, col5 \
ORDER BY sum_col7 desc, avg_col8 asc LIMIT 10;")
display(df)
sql – unable to select top 10 records per group in sparksql – Stack Overflow
以上、厳密な調整はしていないのであくまで1例としてみていただければと思いますが、Synapseの各機能を使って分析クエリを処理する速度を測ってみた結果でした。
参考になりましたら幸いです。
おしまい
コメントを残す