Fix migration

This commit is contained in:
2025-08-25 18:49:33 +09:00
parent 6886ba15c8
commit 8e3f7024a2
12 changed files with 2254 additions and 20 deletions

View File

@ -0,0 +1,493 @@
#!/usr/bin/env python3
"""
Old RogDB データ移行スクリプト (エラー修正版)
old_rogdb の rog_* テーブルから rogdb の rog_* テーブルへデータを更新・挿入
"""
import os
import sys
import psycopg2
from datetime import datetime, timezone
from typing import Optional, Dict, List, Tuple
import logging
# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# データベース設定
OLD_ROGDB_CONFIG = {
'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'),
'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'),
'user': os.getenv('OLD_ROGDB_USER', 'admin'),
'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('OLD_ROGDB_PORT', 5432))
}
ROGDB_CONFIG = {
'host': os.getenv('ROGDB_HOST', 'postgres-db'),
'database': os.getenv('ROGDB_NAME', 'rogdb'),
'user': os.getenv('ROGDB_USER', 'admin'),
'password': os.getenv('ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('ROGDB_PORT', 5432))
}
# PostgreSQL予約語をクォートで囲む必要があるカラム
RESERVED_KEYWORDS = {
'like', 'order', 'group', 'user', 'table', 'where', 'select', 'insert',
'update', 'delete', 'create', 'drop', 'alter', 'index', 'constraint'
}
class RogTableMigrator:
"""Rogテーブル移行クラス (エラー修正版)"""
def __init__(self):
self.old_conn = None
self.new_conn = None
self.old_cursor = None
self.new_cursor = None
self.migration_stats = {}
def quote_column_if_needed(self, column_name):
"""予約語やキャメルケースの場合はダブルクォートで囲む"""
# 予約語チェック
if column_name.lower() in RESERVED_KEYWORDS:
return f'"{column_name}"'
# キャメルケースや大文字が含まれる場合もクォート
if any(c.isupper() for c in column_name) or column_name != column_name.lower():
return f'"{column_name}"'
return column_name
def handle_null_values(self, table_name, column_name, value):
"""NULL値の処理とデフォルト値設定"""
if value is not None:
return value
# テーブル・カラム別のデフォルト値設定
null_defaults = {
'rog_team': {
'trial': False, # Boolean型のデフォルト
'is_active': True,
'is_trial': False,
},
'rog_entry': {
'trial': False,
'is_active': True,
'is_trial': False,
'hasGoaled': False,
'hasParticipated': False,
},
'rog_member': {
'is_active': True,
'is_captain': False,
},
'rog_newevent2': {
'public': True,
'class_general': True,
'class_family': True,
'class_solo_male': True,
'class_solo_female': True,
'hour_3': True,
'hour_5': True,
'self_rogaining': False,
}
}
# テーブル固有のデフォルト値を取得
if table_name in null_defaults and column_name in null_defaults[table_name]:
default_value = null_defaults[table_name][column_name]
logger.debug(f"NULL値をデフォルト値に変換: {table_name}.{column_name} = {default_value}")
return default_value
# 一般的なデフォルト値
common_defaults = {
# Boolean型
'is_active': True,
'is_trial': False,
'public': True,
'trial': False,
# 文字列型
'description': '',
'note': '',
'comment': '',
# 数値型
'sort_order': 0,
'order': 0,
# PostgreSQL予約語
'group': '',
'like': False,
}
if column_name in common_defaults:
default_value = common_defaults[column_name]
logger.debug(f"NULL値を共通デフォルト値に変換: {column_name} = {default_value}")
return default_value
# デフォルト値が見つからない場合はNULLを返す
logger.warning(f"デフォルト値が設定されていません: {table_name}.{column_name}")
return None
def connect_databases(self):
"""データベース接続"""
try:
logger.info("データベースに接続中...")
self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG)
self.new_conn = psycopg2.connect(**ROGDB_CONFIG)
# autocommitを有効にしてトランザクション問題を回避
self.old_conn.autocommit = True
self.new_conn.autocommit = False # 新しい方は手動コミット
self.old_cursor = self.old_conn.cursor()
self.new_cursor = self.new_conn.cursor()
logger.info("✅ データベース接続成功")
return True
except Exception as e:
logger.error(f"❌ データベース接続エラー: {e}")
return False
def close_connections(self):
"""データベース接続クローズ"""
try:
if self.old_cursor:
self.old_cursor.close()
if self.new_cursor:
self.new_cursor.close()
if self.old_conn:
self.old_conn.close()
if self.new_conn:
self.new_conn.close()
logger.info("データベース接続をクローズしました")
except Exception as e:
logger.warning(f"接続クローズ時の警告: {e}")
def get_rog_tables(self):
"""rog_で始まるテーブル一覧を取得"""
try:
# old_rogdbのrog_テーブル一覧
self.old_cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name LIKE 'rog_%'
ORDER BY table_name
""")
old_tables = [row[0] for row in self.old_cursor.fetchall()]
# rogdbのrog_テーブル一覧
self.new_cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name LIKE 'rog_%'
ORDER BY table_name
""")
new_tables = [row[0] for row in self.new_cursor.fetchall()]
# 共通テーブル
common_tables = list(set(old_tables) & set(new_tables))
logger.info(f"old_rogdb rog_テーブル: {len(old_tables)}")
logger.info(f"rogdb rog_テーブル: {len(new_tables)}")
logger.info(f"共通 rog_テーブル: {len(common_tables)}")
return old_tables, new_tables, common_tables
except Exception as e:
logger.error(f"テーブル一覧取得エラー: {e}")
return [], [], []
def get_table_structure(self, table_name, cursor):
"""テーブル構造を取得 (エラーハンドリング強化)"""
try:
cursor.execute("""
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = %s
AND table_schema = 'public'
ORDER BY ordinal_position
""", (table_name,))
columns = cursor.fetchall()
return {
'columns': [col[0] for col in columns],
'details': columns
}
except Exception as e:
logger.error(f"テーブル構造取得エラー ({table_name}): {e}")
# トランザクションエラーを回避
try:
cursor.connection.rollback()
except:
pass
return {'columns': [], 'details': []}
def get_primary_key(self, table_name, cursor):
"""主キーカラムを取得"""
try:
cursor.execute("""
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = %s::regclass AND i.indisprimary
""", (table_name,))
pk_columns = [row[0] for row in cursor.fetchall()]
return pk_columns if pk_columns else ['id']
except Exception as e:
logger.warning(f"主キー取得エラー ({table_name}): {e}")
return ['id'] # デフォルトでidを使用
def migrate_table_data(self, table_name):
"""個別テーブルのデータ移行 (エラー修正版)"""
logger.info(f"\n=== {table_name} データ移行開始 ===")
try:
# テーブル構造比較
old_structure = self.get_table_structure(table_name, self.old_cursor)
new_structure = self.get_table_structure(table_name, self.new_cursor)
old_columns = set(old_structure['columns'])
new_columns = set(new_structure['columns'])
common_columns = old_columns & new_columns
if not common_columns:
logger.warning(f"⚠️ {table_name}: 共通カラムがありません")
return {'inserted': 0, 'updated': 0, 'errors': 0}
logger.info(f"共通カラム ({len(common_columns)}個): {sorted(common_columns)}")
# 主キー取得
pk_columns = self.get_primary_key(table_name, self.new_cursor)
logger.info(f"主キー: {pk_columns}")
# カラム名を予約語対応でクォート
quoted_columns = [self.quote_column_if_needed(col) for col in common_columns]
columns_str = ', '.join(quoted_columns)
# old_rogdbからデータ取得
try:
self.old_cursor.execute(f"SELECT {columns_str} FROM {table_name}")
old_records = self.old_cursor.fetchall()
except Exception as e:
logger.error(f"{table_name} データ取得エラー: {e}")
return {'inserted': 0, 'updated': 0, 'errors': 1}
if not old_records:
logger.info(f"{table_name}: 移行対象データなし")
return {'inserted': 0, 'updated': 0, 'errors': 0}
logger.info(f"移行対象レコード数: {len(old_records)}")
# 統計情報
inserted_count = 0
updated_count = 0
error_count = 0
# レコード移行処理
for i, record in enumerate(old_records):
try:
# レコードデータを辞書形式に変換NULL値処理込み
record_dict = {}
for j, col in enumerate(common_columns):
original_value = record[j]
processed_value = self.handle_null_values(table_name, col, original_value)
record_dict[col] = processed_value
# 主キー値を取得
pk_values = []
pk_conditions = []
for pk_col in pk_columns:
if pk_col in record_dict:
pk_values.append(record_dict[pk_col])
quoted_pk_col = self.quote_column_if_needed(pk_col)
pk_conditions.append(f"{quoted_pk_col} = %s")
if not pk_values:
error_count += 1
continue
# 既存レコード確認
check_query = f"SELECT COUNT(*) FROM {table_name} WHERE {' AND '.join(pk_conditions)}"
self.new_cursor.execute(check_query, pk_values)
exists = self.new_cursor.fetchone()[0] > 0
if exists:
# UPDATE処理
set_clauses = []
update_values = []
for col in common_columns:
if col not in pk_columns: # 主キー以外を更新
quoted_col = self.quote_column_if_needed(col)
set_clauses.append(f"{quoted_col} = %s")
update_values.append(record_dict[col])
if set_clauses:
update_query = f"""
UPDATE {table_name}
SET {', '.join(set_clauses)}
WHERE {' AND '.join(pk_conditions)}
"""
self.new_cursor.execute(update_query, update_values + pk_values)
updated_count += 1
else:
# INSERT処理
insert_columns = list(common_columns)
insert_values = [record_dict[col] for col in insert_columns]
quoted_insert_columns = [self.quote_column_if_needed(col) for col in insert_columns]
placeholders = ', '.join(['%s'] * len(insert_columns))
insert_query = f"""
INSERT INTO {table_name} ({', '.join(quoted_insert_columns)})
VALUES ({placeholders})
"""
self.new_cursor.execute(insert_query, insert_values)
inserted_count += 1
# 定期的なコミット
if (i + 1) % 100 == 0:
self.new_conn.commit()
logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了")
except Exception as e:
error_count += 1
logger.error(f" レコード処理エラー (行{i+1}): {e}")
# トランザクションロールバック
try:
self.new_conn.rollback()
except:
pass
if error_count > 10: # エラー上限
logger.error(f"{table_name}: エラー数が上限を超えました")
break
# 最終コミット
try:
self.new_conn.commit()
except Exception as e:
logger.error(f"コミットエラー: {e}")
# 結果サマリー
stats = {
'inserted': inserted_count,
'updated': updated_count,
'errors': error_count,
'total': len(old_records)
}
logger.info(f"{table_name} 移行完了:")
logger.info(f" 挿入: {inserted_count}")
logger.info(f" 更新: {updated_count}")
logger.info(f" エラー: {error_count}")
self.migration_stats[table_name] = stats
return stats
except Exception as e:
logger.error(f"{table_name} 移行エラー: {e}")
try:
self.new_conn.rollback()
except:
pass
return {'inserted': 0, 'updated': 0, 'errors': 1}
def run_migration(self, exclude_tables=None):
"""全体移行実行"""
if exclude_tables is None:
exclude_tables = []
logger.info("=" * 80)
logger.info("Old RogDB → RogDB データ移行開始")
logger.info("=" * 80)
try:
# データベース接続
if not self.connect_databases():
return False
# テーブル一覧取得
old_tables, new_tables, common_tables = self.get_rog_tables()
if not common_tables:
logger.error("❌ 移行対象の共通テーブルがありません")
return False
# 除外テーブルを除去
migration_tables = [t for t in common_tables if t not in exclude_tables]
if exclude_tables:
logger.info(f"除外テーブル: {exclude_tables}")
logger.info(f"移行対象テーブル ({len(migration_tables)}個): {migration_tables}")
# テーブル別移行実行
total_inserted = 0
total_updated = 0
total_errors = 0
for table_name in migration_tables:
stats = self.migrate_table_data(table_name)
total_inserted += stats['inserted']
total_updated += stats['updated']
total_errors += stats['errors']
# 最終結果
logger.info("\n" + "=" * 80)
logger.info("移行完了サマリー")
logger.info("=" * 80)
logger.info(f"処理対象テーブル: {len(migration_tables)}")
logger.info(f"総挿入件数: {total_inserted}")
logger.info(f"総更新件数: {total_updated}")
logger.info(f"総エラー件数: {total_errors}")
# テーブル別詳細
logger.info("\n--- テーブル別詳細 ---")
for table_name, stats in self.migration_stats.items():
logger.info(f"{table_name}: 挿入{stats['inserted']}, 更新{stats['updated']}, エラー{stats['errors']}")
if total_errors == 0:
logger.info("✅ 全ての移行が正常に完了しました!")
return True
else:
logger.warning(f"⚠️ {total_errors}件のエラーがありました")
return False
except Exception as e:
logger.error(f"❌ 移行処理エラー: {e}")
return False
finally:
self.close_connections()
def main():
"""メイン処理"""
logger.info("Old RogDB → RogDB データ移行スクリプト")
# 除外テーブルの設定(必要に応じて)
exclude_tables = [
# 'rog_customuser', # ユーザーデータは慎重に扱う
# 'rog_session', # セッションデータは移行不要
# 'django_migrations', # Django管理テーブル
]
# 環境変数による除外テーブル指定
env_exclude = os.getenv('EXCLUDE_TABLES', '')
if env_exclude:
exclude_tables.extend(env_exclude.split(','))
# 移行実行
migrator = RogTableMigrator()
success = migrator.run_migration(exclude_tables=exclude_tables)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()