415 lines
16 KiB
Python
415 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Old RogDB データ移行スクリプト (エラー修正版)
|
|
old_rogdb の rog_* テーブルから rogdb の rog_* テーブルへデータを更新・挿入
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import psycopg2
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, Dict, List, Tuple
|
|
import logging
|
|
|
|
# ログ設定
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# データベース設定
|
|
OLD_ROGDB_CONFIG = {
|
|
'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'),
|
|
'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'),
|
|
'user': os.getenv('OLD_ROGDB_USER', 'admin'),
|
|
'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'),
|
|
'port': int(os.getenv('OLD_ROGDB_PORT', 5432))
|
|
}
|
|
|
|
ROGDB_CONFIG = {
|
|
'host': os.getenv('ROGDB_HOST', 'postgres-db'),
|
|
'database': os.getenv('ROGDB_NAME', 'rogdb'),
|
|
'user': os.getenv('ROGDB_USER', 'admin'),
|
|
'password': os.getenv('ROGDB_PASSWORD', 'admin123456'),
|
|
'port': int(os.getenv('ROGDB_PORT', 5432))
|
|
}
|
|
|
|
# PostgreSQL予約語をクォートで囲む必要があるカラム
|
|
RESERVED_KEYWORDS = {
|
|
'like', 'order', 'group', 'user', 'table', 'where', 'select', 'insert',
|
|
'update', 'delete', 'create', 'drop', 'alter', 'index', 'constraint'
|
|
}
|
|
|
|
class RogTableMigrator:
|
|
"""Rogテーブル移行クラス (エラー修正版)"""
|
|
|
|
def __init__(self):
|
|
self.old_conn = None
|
|
self.new_conn = None
|
|
self.old_cursor = None
|
|
self.new_cursor = None
|
|
self.migration_stats = {}
|
|
|
|
def quote_column_if_needed(self, column_name):
|
|
"""予約語やキャメルケースの場合はダブルクォートで囲む"""
|
|
# 予約語チェック
|
|
if column_name.lower() in RESERVED_KEYWORDS:
|
|
return f'"{column_name}"'
|
|
|
|
# キャメルケースや大文字が含まれる場合もクォート
|
|
if any(c.isupper() for c in column_name) or column_name != column_name.lower():
|
|
return f'"{column_name}"'
|
|
|
|
return column_name
|
|
|
|
def connect_databases(self):
|
|
"""データベース接続"""
|
|
try:
|
|
logger.info("データベースに接続中...")
|
|
self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG)
|
|
self.new_conn = psycopg2.connect(**ROGDB_CONFIG)
|
|
|
|
# autocommitを有効にしてトランザクション問題を回避
|
|
self.old_conn.autocommit = True
|
|
self.new_conn.autocommit = False # 新しい方は手動コミット
|
|
|
|
self.old_cursor = self.old_conn.cursor()
|
|
self.new_cursor = self.new_conn.cursor()
|
|
|
|
logger.info("✅ データベース接続成功")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ データベース接続エラー: {e}")
|
|
return False
|
|
|
|
def close_connections(self):
|
|
"""データベース接続クローズ"""
|
|
try:
|
|
if self.old_cursor:
|
|
self.old_cursor.close()
|
|
if self.new_cursor:
|
|
self.new_cursor.close()
|
|
if self.old_conn:
|
|
self.old_conn.close()
|
|
if self.new_conn:
|
|
self.new_conn.close()
|
|
logger.info("データベース接続をクローズしました")
|
|
except Exception as e:
|
|
logger.warning(f"接続クローズ時の警告: {e}")
|
|
|
|
def get_rog_tables(self):
|
|
"""rog_で始まるテーブル一覧を取得"""
|
|
try:
|
|
# old_rogdbのrog_テーブル一覧
|
|
self.old_cursor.execute("""
|
|
SELECT table_name
|
|
FROM information_schema.tables
|
|
WHERE table_schema = 'public'
|
|
AND table_name LIKE 'rog_%'
|
|
ORDER BY table_name
|
|
""")
|
|
old_tables = [row[0] for row in self.old_cursor.fetchall()]
|
|
|
|
# rogdbのrog_テーブル一覧
|
|
self.new_cursor.execute("""
|
|
SELECT table_name
|
|
FROM information_schema.tables
|
|
WHERE table_schema = 'public'
|
|
AND table_name LIKE 'rog_%'
|
|
ORDER BY table_name
|
|
""")
|
|
new_tables = [row[0] for row in self.new_cursor.fetchall()]
|
|
|
|
# 共通テーブル
|
|
common_tables = list(set(old_tables) & set(new_tables))
|
|
|
|
logger.info(f"old_rogdb rog_テーブル: {len(old_tables)}個")
|
|
logger.info(f"rogdb rog_テーブル: {len(new_tables)}個")
|
|
logger.info(f"共通 rog_テーブル: {len(common_tables)}個")
|
|
|
|
return old_tables, new_tables, common_tables
|
|
|
|
except Exception as e:
|
|
logger.error(f"テーブル一覧取得エラー: {e}")
|
|
return [], [], []
|
|
|
|
def get_table_structure(self, table_name, cursor):
|
|
"""テーブル構造を取得 (エラーハンドリング強化)"""
|
|
try:
|
|
cursor.execute("""
|
|
SELECT column_name, data_type, is_nullable, column_default
|
|
FROM information_schema.columns
|
|
WHERE table_name = %s
|
|
AND table_schema = 'public'
|
|
ORDER BY ordinal_position
|
|
""", (table_name,))
|
|
|
|
columns = cursor.fetchall()
|
|
return {
|
|
'columns': [col[0] for col in columns],
|
|
'details': columns
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"テーブル構造取得エラー ({table_name}): {e}")
|
|
# トランザクションエラーを回避
|
|
try:
|
|
cursor.connection.rollback()
|
|
except:
|
|
pass
|
|
return {'columns': [], 'details': []}
|
|
|
|
def get_primary_key(self, table_name, cursor):
|
|
"""主キーカラムを取得"""
|
|
try:
|
|
cursor.execute("""
|
|
SELECT a.attname
|
|
FROM pg_index i
|
|
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
|
|
WHERE i.indrelid = %s::regclass AND i.indisprimary
|
|
""", (table_name,))
|
|
|
|
pk_columns = [row[0] for row in cursor.fetchall()]
|
|
return pk_columns if pk_columns else ['id']
|
|
|
|
except Exception as e:
|
|
logger.warning(f"主キー取得エラー ({table_name}): {e}")
|
|
return ['id'] # デフォルトでidを使用
|
|
|
|
def migrate_table_data(self, table_name):
|
|
"""個別テーブルのデータ移行 (エラー修正版)"""
|
|
logger.info(f"\n=== {table_name} データ移行開始 ===")
|
|
|
|
try:
|
|
# テーブル構造比較
|
|
old_structure = self.get_table_structure(table_name, self.old_cursor)
|
|
new_structure = self.get_table_structure(table_name, self.new_cursor)
|
|
|
|
old_columns = set(old_structure['columns'])
|
|
new_columns = set(new_structure['columns'])
|
|
common_columns = old_columns & new_columns
|
|
|
|
if not common_columns:
|
|
logger.warning(f"⚠️ {table_name}: 共通カラムがありません")
|
|
return {'inserted': 0, 'updated': 0, 'errors': 0}
|
|
|
|
logger.info(f"共通カラム ({len(common_columns)}個): {sorted(common_columns)}")
|
|
|
|
# 主キー取得
|
|
pk_columns = self.get_primary_key(table_name, self.new_cursor)
|
|
logger.info(f"主キー: {pk_columns}")
|
|
|
|
# カラム名を予約語対応でクォート
|
|
quoted_columns = [self.quote_column_if_needed(col) for col in common_columns]
|
|
columns_str = ', '.join(quoted_columns)
|
|
|
|
# old_rogdbからデータ取得
|
|
try:
|
|
self.old_cursor.execute(f"SELECT {columns_str} FROM {table_name}")
|
|
old_records = self.old_cursor.fetchall()
|
|
except Exception as e:
|
|
logger.error(f"❌ {table_name} データ取得エラー: {e}")
|
|
return {'inserted': 0, 'updated': 0, 'errors': 1}
|
|
|
|
if not old_records:
|
|
logger.info(f"✅ {table_name}: 移行対象データなし")
|
|
return {'inserted': 0, 'updated': 0, 'errors': 0}
|
|
|
|
logger.info(f"移行対象レコード数: {len(old_records)}件")
|
|
|
|
# 統計情報
|
|
inserted_count = 0
|
|
updated_count = 0
|
|
error_count = 0
|
|
|
|
# レコード移行処理
|
|
for i, record in enumerate(old_records):
|
|
try:
|
|
# レコードデータを辞書形式に変換
|
|
record_dict = dict(zip(common_columns, record))
|
|
|
|
# 主キー値を取得
|
|
pk_values = []
|
|
pk_conditions = []
|
|
for pk_col in pk_columns:
|
|
if pk_col in record_dict:
|
|
pk_values.append(record_dict[pk_col])
|
|
quoted_pk_col = self.quote_column_if_needed(pk_col)
|
|
pk_conditions.append(f"{quoted_pk_col} = %s")
|
|
|
|
if not pk_values:
|
|
error_count += 1
|
|
continue
|
|
|
|
# 既存レコード確認
|
|
check_query = f"SELECT COUNT(*) FROM {table_name} WHERE {' AND '.join(pk_conditions)}"
|
|
self.new_cursor.execute(check_query, pk_values)
|
|
exists = self.new_cursor.fetchone()[0] > 0
|
|
|
|
if exists:
|
|
# UPDATE処理
|
|
set_clauses = []
|
|
update_values = []
|
|
|
|
for col in common_columns:
|
|
if col not in pk_columns: # 主キー以外を更新
|
|
quoted_col = self.quote_column_if_needed(col)
|
|
set_clauses.append(f"{quoted_col} = %s")
|
|
update_values.append(record_dict[col])
|
|
|
|
if set_clauses:
|
|
update_query = f"""
|
|
UPDATE {table_name}
|
|
SET {', '.join(set_clauses)}
|
|
WHERE {' AND '.join(pk_conditions)}
|
|
"""
|
|
self.new_cursor.execute(update_query, update_values + pk_values)
|
|
updated_count += 1
|
|
|
|
else:
|
|
# INSERT処理
|
|
insert_columns = list(common_columns)
|
|
insert_values = [record_dict[col] for col in insert_columns]
|
|
quoted_insert_columns = [self.quote_column_if_needed(col) for col in insert_columns]
|
|
placeholders = ', '.join(['%s'] * len(insert_columns))
|
|
|
|
insert_query = f"""
|
|
INSERT INTO {table_name} ({', '.join(quoted_insert_columns)})
|
|
VALUES ({placeholders})
|
|
"""
|
|
self.new_cursor.execute(insert_query, insert_values)
|
|
inserted_count += 1
|
|
|
|
# 定期的なコミット
|
|
if (i + 1) % 100 == 0:
|
|
self.new_conn.commit()
|
|
logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了")
|
|
|
|
except Exception as e:
|
|
error_count += 1
|
|
logger.error(f" レコード処理エラー (行{i+1}): {e}")
|
|
# トランザクションロールバック
|
|
try:
|
|
self.new_conn.rollback()
|
|
except:
|
|
pass
|
|
|
|
if error_count > 10: # エラー上限
|
|
logger.error(f"❌ {table_name}: エラー数が上限を超えました")
|
|
break
|
|
|
|
# 最終コミット
|
|
try:
|
|
self.new_conn.commit()
|
|
except Exception as e:
|
|
logger.error(f"コミットエラー: {e}")
|
|
|
|
# 結果サマリー
|
|
stats = {
|
|
'inserted': inserted_count,
|
|
'updated': updated_count,
|
|
'errors': error_count,
|
|
'total': len(old_records)
|
|
}
|
|
|
|
logger.info(f"✅ {table_name} 移行完了:")
|
|
logger.info(f" 挿入: {inserted_count}件")
|
|
logger.info(f" 更新: {updated_count}件")
|
|
logger.info(f" エラー: {error_count}件")
|
|
|
|
self.migration_stats[table_name] = stats
|
|
return stats
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ {table_name} 移行エラー: {e}")
|
|
try:
|
|
self.new_conn.rollback()
|
|
except:
|
|
pass
|
|
return {'inserted': 0, 'updated': 0, 'errors': 1}
|
|
|
|
def run_migration(self, exclude_tables=None):
|
|
"""全体移行実行"""
|
|
if exclude_tables is None:
|
|
exclude_tables = []
|
|
|
|
logger.info("=" * 80)
|
|
logger.info("Old RogDB → RogDB データ移行開始")
|
|
logger.info("=" * 80)
|
|
|
|
try:
|
|
# データベース接続
|
|
if not self.connect_databases():
|
|
return False
|
|
|
|
# テーブル一覧取得
|
|
old_tables, new_tables, common_tables = self.get_rog_tables()
|
|
|
|
if not common_tables:
|
|
logger.error("❌ 移行対象テーブルがありません")
|
|
return False
|
|
|
|
# 除外テーブルをフィルタリング
|
|
migration_tables = [t for t in common_tables if t not in exclude_tables]
|
|
logger.info(f"移行対象テーブル ({len(migration_tables)}個): {migration_tables}")
|
|
|
|
# 各テーブルを移行
|
|
total_inserted = 0
|
|
total_updated = 0
|
|
total_errors = 0
|
|
|
|
for table_name in migration_tables:
|
|
stats = self.migrate_table_data(table_name)
|
|
total_inserted += stats['inserted']
|
|
total_updated += stats['updated']
|
|
total_errors += stats['errors']
|
|
|
|
# 全体サマリー出力
|
|
logger.info("=" * 80)
|
|
logger.info("移行完了サマリー")
|
|
logger.info("=" * 80)
|
|
logger.info(f"処理対象テーブル: {len(migration_tables)}個")
|
|
logger.info(f"総挿入件数: {total_inserted}件")
|
|
logger.info(f"総更新件数: {total_updated}件")
|
|
logger.info(f"総エラー件数: {total_errors}件")
|
|
|
|
logger.info("\n--- テーブル別詳細 ---")
|
|
for table_name, stats in self.migration_stats.items():
|
|
logger.info(f"{table_name}: 挿入{stats['inserted']}, 更新{stats['updated']}, エラー{stats['errors']}")
|
|
|
|
if total_errors > 0:
|
|
logger.warning(f"⚠️ {total_errors}件のエラーがありました")
|
|
else:
|
|
logger.info("✅ 全ての移行が正常に完了しました!")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ 移行処理エラー: {e}")
|
|
return False
|
|
|
|
finally:
|
|
self.close_connections()
|
|
|
|
def main():
|
|
"""メイン処理"""
|
|
logger.info("Old RogDB → RogDB データ移行スクリプト")
|
|
|
|
# 除外テーブル設定
|
|
exclude_tables = []
|
|
if os.getenv('EXCLUDE_TABLES'):
|
|
exclude_tables = [t.strip() for t in os.getenv('EXCLUDE_TABLES').split(',')]
|
|
logger.info(f"除外テーブル: {exclude_tables}")
|
|
|
|
# 移行実行
|
|
migrator = RogTableMigrator()
|
|
success = migrator.run_migration(exclude_tables=exclude_tables)
|
|
|
|
if success:
|
|
logger.info("移行処理が完了しました")
|
|
sys.exit(0)
|
|
else:
|
|
logger.error("移行処理が失敗しました")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|