Azure CosmosDB (NoSQL API)のデータベースを別のCosmosDB (NoSQL API)に一括で移行するPythonコード

こんにちは。小ネタですが、Azure CosmosDB (NoSQL API)のデータベース内のデータを別のリソースに一括で移行するコードを貼っておきます。

CosmosDB間のデータ移行は、一般的にはAzure Data FactoryなどのETLサービスを使うと簡単ですが、今回は個人利用環境でなるべくお金をかけずに移行したい、という背景があったので、コードで実装する方法を選択しました。

なお、このコードの原型はChatGPTに生成してもらっています。ただ、そのままでは動かない部分があったので、実際に動作するように筆者で一部改変しています。(移行先コンテナのパーティションキー設定の部分1か所がうまくうごかなかったくらいですが・・ChatGPTスゴイ・・・)

ので、利用にあたっては、十分なテストの上各自の責任でご利用ください。

!pip install azure-cosmos
from azure.cosmos import CosmosClient, exceptions, PartitionKey

# ソースCosmos DBの設定
source_url = 'https://XXXXXX.documents.azure.com:443/'
source_key = 'XXXXXX'
source_database_id = 'XXXXXX'

# ターゲットCosmos DBの設定
target_url = 'https://XXXXXX.documents.azure.com:443/'
target_key = 'XXXXXX'
target_database_id = 'XXXXXX'

# ソースCosmos DBへの接続
source_client = CosmosClient(source_url, credential=source_key)
source_database = source_client.get_database_client(source_database_id)

# ターゲットCosmos DBへの接続
target_client = CosmosClient(target_url, credential=target_key)
target_database = target_client.create_database_if_not_exists(id=target_database_id)

# ソースデータベース内の全コンテナをループする
for source_container_properties in source_database.list_containers():
    source_container_id = source_container_properties['id']
    source_container_client = source_database.get_container_client(source_container_id)

    # 移行元コンテナのパーティションキーを取得する
    source_container_partition_key = source_container_client.read().get('partitionKey')
    source_partition_key_path = source_container_partition_key.get('paths')[0]
    print(source_container_id)

    # 対応するターゲットコンテナを作成または取得する
    # パーティションキーを動的に設定
    
    target_container = target_database.create_container_if_not_exists(
        id=source_container_id,
        partition_key=PartitionKey(path=source_partition_key_path)
    )


    # データの読み込みと書き込み
    for item in source_container_client.query_items(
            query='SELECT * FROM c',
            enable_cross_partition_query=True):

        try:
            # データをターゲットに書き込む
            target_container.upsert_item(item)
            print(f'Item {item["id"]} migrated to container {source_container_id}')
        except exceptions.CosmosHttpResponseError as e:
            print(f'Failed to migrate item {item["id"]} in container {source_container_id}: {e}')

print("Data migration complete.")

それにしてもChatGPTでコーディングの負荷はかなり減っていると感じますね・・ほんと助かる・・

おしまい

この記事を気に入っていただけたらシェアをお願いします!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

ABOUT US
Yuu113
初めまして。ゆうたろうと申します。 兵庫県出身、東京でシステムエンジニアをしております。現在は主にデータ分析、機械学習を活用してビジネスモデリングに取り組んでいます。 日々学んだことや経験したことを整理していきたいと思い、ブログを始めました。旅行、カメラ、IT技術、江戸文化が大好きですので、これらについても記事にしていきたいと思っています。