Files
rogaining_srv/migrate_old_rogdb_to_rogdb.py

500 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Old RogDB データ移行スクリプト (エラー修正版)
old_rogdb の rog_* テーブルから rogdb の rog_* テーブルへデータを更新・挿入
"""
import os
import sys
import psycopg2
from datetime import datetime, timezone
from typing import Optional, Dict, List, Tuple
import logging
# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# データベース設定
OLD_ROGDB_CONFIG = {
'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'),
'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'),
'user': os.getenv('OLD_ROGDB_USER', 'admin'),
'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('OLD_ROGDB_PORT', 5432))
}
ROGDB_CONFIG = {
'host': os.getenv('ROGDB_HOST', 'postgres-db'),
'database': os.getenv('ROGDB_NAME', 'rogdb'),
'user': os.getenv('ROGDB_USER', 'admin'),
'password': os.getenv('ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('ROGDB_PORT', 5432))
}
# PostgreSQL予約語をクォートで囲む必要があるカラム
RESERVED_KEYWORDS = {
'like', 'order', 'group', 'user', 'table', 'where', 'select', 'insert',
'update', 'delete', 'create', 'drop', 'alter', 'index', 'constraint'
}
class RogTableMigrator:
"""Rogテーブル移行クラス (エラー修正版)"""
def __init__(self):
self.old_conn = None
self.new_conn = None
self.old_cursor = None
self.new_cursor = None
self.migration_stats = {}
def quote_column_if_needed(self, column_name):
"""予約語やキャメルケースの場合はダブルクォートで囲む"""
# 予約語チェック
if column_name.lower() in RESERVED_KEYWORDS:
return f'"{column_name}"'
# キャメルケースや大文字が含まれる場合もクォート
if any(c.isupper() for c in column_name) or column_name != column_name.lower():
return f'"{column_name}"'
return column_name
def handle_null_values(self, table_name, column_name, value):
"""NULL値の処理とデフォルト値設定"""
if value is not None:
return value
# テーブル・カラム別のデフォルト値設定
null_defaults = {
'rog_team': {
'trial': False, # Boolean型のデフォルト
'is_active': True,
'is_trial': False,
},
'rog_entry': {
'trial': False,
'is_active': True,
'is_trial': False,
'hasGoaled': False,
'hasParticipated': False,
},
'rog_member': {
'is_active': True,
'is_captain': False,
},
'rog_newevent2': {
'public': True,
'class_general': True,
'class_family': True,
'class_solo_male': True,
'class_solo_female': True,
'hour_3': True,
'hour_5': True,
'self_rogaining': False,
}
}
# テーブル固有のデフォルト値を取得
if table_name in null_defaults and column_name in null_defaults[table_name]:
default_value = null_defaults[table_name][column_name]
logger.debug(f"NULL値をデフォルト値に変換: {table_name}.{column_name} = {default_value}")
return default_value
# 一般的なデフォルト値
common_defaults = {
# Boolean型
'is_active': True,
'is_trial': False,
'public': True,
'trial': False,
# 文字列型
'description': '',
'note': '',
'comment': '',
# 数値型
'sort_order': 0,
'order': 0,
# PostgreSQL予約語
'group': '',
'like': False,
}
if column_name in common_defaults:
default_value = common_defaults[column_name]
logger.debug(f"NULL値を共通デフォルト値に変換: {column_name} = {default_value}")
return default_value
# デフォルト値が見つからない場合はNULLを返す
logger.warning(f"デフォルト値が設定されていません: {table_name}.{column_name}")
return None
def connect_databases(self):
"""データベース接続"""
try:
logger.info("データベースに接続中...")
self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG)
self.new_conn = psycopg2.connect(**ROGDB_CONFIG)
# autocommitを有効にしてトランザクション問題を回避
self.old_conn.autocommit = True
self.new_conn.autocommit = False # 新しい方は手動コミット
self.old_cursor = self.old_conn.cursor()
self.new_cursor = self.new_conn.cursor()
logger.info("✅ データベース接続成功")
return True
except Exception as e:
logger.error(f"❌ データベース接続エラー: {e}")
return False
def close_connections(self):
"""データベース接続クローズ"""
try:
if self.old_cursor:
self.old_cursor.close()
if self.new_cursor:
self.new_cursor.close()
if self.old_conn:
self.old_conn.close()
if self.new_conn:
self.new_conn.close()
logger.info("データベース接続をクローズしました")
except Exception as e:
logger.warning(f"接続クローズ時の警告: {e}")
def get_rog_tables(self):
"""rog_で始まるテーブル一覧を取得"""
try:
# old_rogdbのrog_テーブル一覧
self.old_cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name LIKE 'rog_%'
ORDER BY table_name
""")
old_tables = [row[0] for row in self.old_cursor.fetchall()]
# rogdbのrog_テーブル一覧
self.new_cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name LIKE 'rog_%'
ORDER BY table_name
""")
new_tables = [row[0] for row in self.new_cursor.fetchall()]
# 共通テーブル
common_tables = list(set(old_tables) & set(new_tables))
logger.info(f"old_rogdb rog_テーブル: {len(old_tables)}")
logger.info(f"rogdb rog_テーブル: {len(new_tables)}")
logger.info(f"共通 rog_テーブル: {len(common_tables)}")
return old_tables, new_tables, common_tables
except Exception as e:
logger.error(f"テーブル一覧取得エラー: {e}")
return [], [], []
def get_table_structure(self, table_name, cursor):
"""テーブル構造を取得 (エラーハンドリング強化)"""
try:
cursor.execute("""
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = %s
AND table_schema = 'public'
ORDER BY ordinal_position
""", (table_name,))
columns = cursor.fetchall()
return {
'columns': [col[0] for col in columns],
'details': columns
}
except Exception as e:
logger.error(f"テーブル構造取得エラー ({table_name}): {e}")
# トランザクションエラーを回避
try:
cursor.connection.rollback()
except:
pass
return {'columns': [], 'details': []}
def get_primary_key(self, table_name, cursor):
"""主キーカラムを取得"""
try:
cursor.execute("""
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = %s::regclass AND i.indisprimary
""", (table_name,))
pk_columns = [row[0] for row in cursor.fetchall()]
return pk_columns if pk_columns else ['id']
except Exception as e:
logger.warning(f"主キー取得エラー ({table_name}): {e}")
return ['id'] # デフォルトでidを使用
def migrate_table_data(self, table_name):
"""個別テーブルのデータ移行 (エラー修正版)"""
logger.info(f"\n=== {table_name} データ移行開始 ===")
try:
# テーブル構造比較
old_structure = self.get_table_structure(table_name, self.old_cursor)
new_structure = self.get_table_structure(table_name, self.new_cursor)
old_columns = set(old_structure['columns'])
new_columns = set(new_structure['columns'])
common_columns = old_columns & new_columns
if not common_columns:
logger.warning(f"⚠️ {table_name}: 共通カラムがありません")
return {'inserted': 0, 'updated': 0, 'errors': 0}
logger.info(f"共通カラム ({len(common_columns)}個): {sorted(common_columns)}")
# 主キー取得
pk_columns = self.get_primary_key(table_name, self.new_cursor)
logger.info(f"主キー: {pk_columns}")
# カラム名を予約語対応でクォート
quoted_columns = [self.quote_column_if_needed(col) for col in common_columns]
columns_str = ', '.join(quoted_columns)
# old_rogdbからデータ取得
try:
self.old_cursor.execute(f"SELECT {columns_str} FROM {table_name}")
old_records = self.old_cursor.fetchall()
except Exception as e:
logger.error(f"{table_name} データ取得エラー: {e}")
return {'inserted': 0, 'updated': 0, 'errors': 1}
if not old_records:
logger.info(f"{table_name}: 移行対象データなし")
return {'inserted': 0, 'updated': 0, 'errors': 0}
logger.info(f"移行対象レコード数: {len(old_records)}")
# 統計情報
inserted_count = 0
updated_count = 0
error_count = 0
# レコード移行処理
for i, record in enumerate(old_records):
try:
# レコードデータを辞書形式に変換NULL値処理込み
record_dict = {}
for j, col in enumerate(common_columns):
original_value = record[j]
processed_value = self.handle_null_values(table_name, col, original_value)
record_dict[col] = processed_value
# 主キー値を取得
pk_values = []
pk_conditions = []
for pk_col in pk_columns:
if pk_col in record_dict:
pk_values.append(record_dict[pk_col])
quoted_pk_col = self.quote_column_if_needed(pk_col)
pk_conditions.append(f"{quoted_pk_col} = %s")
if not pk_values:
error_count += 1
continue
# 既存レコード確認
check_query = f"SELECT COUNT(*) FROM {table_name} WHERE {' AND '.join(pk_conditions)}"
self.new_cursor.execute(check_query, pk_values)
exists = self.new_cursor.fetchone()[0] > 0
if exists:
# UPDATE処理
set_clauses = []
update_values = []
for col in common_columns:
if col not in pk_columns: # 主キー以外を更新
quoted_col = self.quote_column_if_needed(col)
set_clauses.append(f"{quoted_col} = %s")
update_values.append(record_dict[col])
if set_clauses:
update_query = f"""
UPDATE {table_name}
SET {', '.join(set_clauses)}
WHERE {' AND '.join(pk_conditions)}
"""
self.new_cursor.execute(update_query, update_values + pk_values)
updated_count += 1
else:
# INSERT処理
insert_columns = list(common_columns)
insert_values = [record_dict[col] for col in insert_columns]
quoted_insert_columns = [self.quote_column_if_needed(col) for col in insert_columns]
placeholders = ', '.join(['%s'] * len(insert_columns))
insert_query = f"""
INSERT INTO {table_name} ({', '.join(quoted_insert_columns)})
VALUES ({placeholders})
"""
self.new_cursor.execute(insert_query, insert_values)
inserted_count += 1
# 定期的なコミット
if (i + 1) % 100 == 0:
self.new_conn.commit()
logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了")
except Exception as e:
error_count += 1
logger.error(f" レコード処理エラー (行{i+1}): {e}")
# トランザクションロールバック
try:
self.new_conn.rollback()
except:
pass
if error_count > 10: # エラー上限
logger.error(f"{table_name}: エラー数が上限を超えました")
break
# 最終コミット
try:
self.new_conn.commit()
except Exception as e:
logger.error(f"コミットエラー: {e}")
# 結果サマリー
stats = {
'inserted': inserted_count,
'updated': updated_count,
'errors': error_count,
'total': len(old_records)
}
logger.info(f"{table_name} 移行完了:")
logger.info(f" 挿入: {inserted_count}")
logger.info(f" 更新: {updated_count}")
logger.info(f" エラー: {error_count}")
self.migration_stats[table_name] = stats
return stats
except Exception as e:
logger.error(f"{table_name} 移行エラー: {e}")
try:
self.new_conn.rollback()
except:
pass
return {'inserted': 0, 'updated': 0, 'errors': 1}
def run_migration(self, exclude_tables=None):
"""全体移行実行"""
if exclude_tables is None:
exclude_tables = []
# 特殊処理が必要なテーブルは専用スクリプトで処理
exclude_tables.extend(['rog_team', 'rog_entry', 'rog_goalimages'])
logger.info("=" * 80)
logger.info("Old RogDB → RogDB データ移行開始")
logger.info("⚠️ rog_team は migrate_rog_team_enhanced.py で別途処理してください")
logger.info("⚠️ rog_entry は migrate_rog_entry_enhanced.py で別途処理してください")
logger.info("⚠️ rog_goalimages は migrate_rog_goalimages_enhanced.py で別途処理してください")
logger.info("=" * 80)
try:
# データベース接続
if not self.connect_databases():
return False
# テーブル一覧取得
old_tables, new_tables, common_tables = self.get_rog_tables()
if not common_tables:
logger.error("❌ 移行対象の共通テーブルがありません")
return False
# 除外テーブルを除去
migration_tables = [t for t in common_tables if t not in exclude_tables]
if exclude_tables:
logger.info(f"除外テーブル: {exclude_tables}")
logger.info(f"移行対象テーブル ({len(migration_tables)}個): {migration_tables}")
# テーブル別移行実行
total_inserted = 0
total_updated = 0
total_errors = 0
for table_name in migration_tables:
stats = self.migrate_table_data(table_name)
total_inserted += stats['inserted']
total_updated += stats['updated']
total_errors += stats['errors']
# 最終結果
logger.info("\n" + "=" * 80)
logger.info("移行完了サマリー")
logger.info("=" * 80)
logger.info(f"処理対象テーブル: {len(migration_tables)}")
logger.info(f"総挿入件数: {total_inserted}")
logger.info(f"総更新件数: {total_updated}")
logger.info(f"総エラー件数: {total_errors}")
# テーブル別詳細
logger.info("\n--- テーブル別詳細 ---")
for table_name, stats in self.migration_stats.items():
logger.info(f"{table_name}: 挿入{stats['inserted']}, 更新{stats['updated']}, エラー{stats['errors']}")
if total_errors == 0:
logger.info("✅ 全ての移行が正常に完了しました!")
return True
else:
logger.warning(f"⚠️ {total_errors}件のエラーがありました")
return False
except Exception as e:
logger.error(f"❌ 移行処理エラー: {e}")
return False
finally:
self.close_connections()
def main():
"""メイン処理"""
logger.info("Old RogDB → RogDB データ移行スクリプト")
# 除外テーブルの設定(必要に応じて)
exclude_tables = [
# 'rog_customuser', # ユーザーデータは慎重に扱う
# 'rog_session', # セッションデータは移行不要
# 'django_migrations', # Django管理テーブル
]
# 環境変数による除外テーブル指定
env_exclude = os.getenv('EXCLUDE_TABLES', '')
if env_exclude:
exclude_tables.extend(env_exclude.split(','))
# 移行実行
migrator = RogTableMigrator()
success = migrator.run_migration(exclude_tables=exclude_tables)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()