#!/usr/bin/env python3 """ Old RogDB データ移行スクリプト (エラー修正版) old_rogdb の rog_* テーブルから rogdb の rog_* テーブルへデータを更新・挿入 """ import os import sys import psycopg2 from datetime import datetime, timezone from typing import Optional, Dict, List, Tuple import logging # ログ設定 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # データベース設定 OLD_ROGDB_CONFIG = { 'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'), 'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'), 'user': os.getenv('OLD_ROGDB_USER', 'admin'), 'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'), 'port': int(os.getenv('OLD_ROGDB_PORT', 5432)) } ROGDB_CONFIG = { 'host': os.getenv('ROGDB_HOST', 'postgres-db'), 'database': os.getenv('ROGDB_NAME', 'rogdb'), 'user': os.getenv('ROGDB_USER', 'admin'), 'password': os.getenv('ROGDB_PASSWORD', 'admin123456'), 'port': int(os.getenv('ROGDB_PORT', 5432)) } # PostgreSQL予約語をクォートで囲む必要があるカラム RESERVED_KEYWORDS = { 'like', 'order', 'group', 'user', 'table', 'where', 'select', 'insert', 'update', 'delete', 'create', 'drop', 'alter', 'index', 'constraint' } class RogTableMigrator: """Rogテーブル移行クラス (エラー修正版)""" def __init__(self): self.old_conn = None self.new_conn = None self.old_cursor = None self.new_cursor = None self.migration_stats = {} def quote_column_if_needed(self, column_name): """予約語やキャメルケースの場合はダブルクォートで囲む""" # 予約語チェック if column_name.lower() in RESERVED_KEYWORDS: return f'"{column_name}"' # キャメルケースや大文字が含まれる場合もクォート if any(c.isupper() for c in column_name) or column_name != column_name.lower(): return f'"{column_name}"' return column_name def connect_databases(self): """データベース接続""" try: logger.info("データベースに接続中...") self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG) self.new_conn = psycopg2.connect(**ROGDB_CONFIG) # autocommitを有効にしてトランザクション問題を回避 self.old_conn.autocommit = True self.new_conn.autocommit = False # 新しい方は手動コミット self.old_cursor = self.old_conn.cursor() self.new_cursor = self.new_conn.cursor() logger.info("✅ データベース接続成功") return True except Exception as e: logger.error(f"❌ データベース接続エラー: {e}") return False def close_connections(self): """データベース接続クローズ""" try: if self.old_cursor: self.old_cursor.close() if self.new_cursor: self.new_cursor.close() if self.old_conn: self.old_conn.close() if self.new_conn: self.new_conn.close() logger.info("データベース接続をクローズしました") except Exception as e: logger.warning(f"接続クローズ時の警告: {e}") def get_rog_tables(self): """rog_で始まるテーブル一覧を取得""" try: # old_rogdbのrog_テーブル一覧 self.old_cursor.execute(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_name LIKE 'rog_%' ORDER BY table_name """) old_tables = [row[0] for row in self.old_cursor.fetchall()] # rogdbのrog_テーブル一覧 self.new_cursor.execute(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_name LIKE 'rog_%' ORDER BY table_name """) new_tables = [row[0] for row in self.new_cursor.fetchall()] # 共通テーブル common_tables = list(set(old_tables) & set(new_tables)) logger.info(f"old_rogdb rog_テーブル: {len(old_tables)}個") logger.info(f"rogdb rog_テーブル: {len(new_tables)}個") logger.info(f"共通 rog_テーブル: {len(common_tables)}個") return old_tables, new_tables, common_tables except Exception as e: logger.error(f"テーブル一覧取得エラー: {e}") return [], [], [] def get_table_structure(self, table_name, cursor): """テーブル構造を取得 (エラーハンドリング強化)""" try: cursor.execute(""" SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_name = %s AND table_schema = 'public' ORDER BY ordinal_position """, (table_name,)) columns = cursor.fetchall() return { 'columns': [col[0] for col in columns], 'details': columns } except Exception as e: logger.error(f"テーブル構造取得エラー ({table_name}): {e}") # トランザクションエラーを回避 try: cursor.connection.rollback() except: pass return {'columns': [], 'details': []} def get_primary_key(self, table_name, cursor): """主キーカラムを取得""" try: cursor.execute(""" SELECT a.attname FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = %s::regclass AND i.indisprimary """, (table_name,)) pk_columns = [row[0] for row in cursor.fetchall()] return pk_columns if pk_columns else ['id'] except Exception as e: logger.warning(f"主キー取得エラー ({table_name}): {e}") return ['id'] # デフォルトでidを使用 def migrate_table_data(self, table_name): """個別テーブルのデータ移行 (エラー修正版)""" logger.info(f"\n=== {table_name} データ移行開始 ===") try: # テーブル構造比較 old_structure = self.get_table_structure(table_name, self.old_cursor) new_structure = self.get_table_structure(table_name, self.new_cursor) old_columns = set(old_structure['columns']) new_columns = set(new_structure['columns']) common_columns = old_columns & new_columns if not common_columns: logger.warning(f"⚠️ {table_name}: 共通カラムがありません") return {'inserted': 0, 'updated': 0, 'errors': 0} logger.info(f"共通カラム ({len(common_columns)}個): {sorted(common_columns)}") # 主キー取得 pk_columns = self.get_primary_key(table_name, self.new_cursor) logger.info(f"主キー: {pk_columns}") # カラム名を予約語対応でクォート quoted_columns = [self.quote_column_if_needed(col) for col in common_columns] columns_str = ', '.join(quoted_columns) # old_rogdbからデータ取得 try: self.old_cursor.execute(f"SELECT {columns_str} FROM {table_name}") old_records = self.old_cursor.fetchall() except Exception as e: logger.error(f"❌ {table_name} データ取得エラー: {e}") return {'inserted': 0, 'updated': 0, 'errors': 1} if not old_records: logger.info(f"✅ {table_name}: 移行対象データなし") return {'inserted': 0, 'updated': 0, 'errors': 0} logger.info(f"移行対象レコード数: {len(old_records)}件") # 統計情報 inserted_count = 0 updated_count = 0 error_count = 0 # レコード移行処理 for i, record in enumerate(old_records): try: # レコードデータを辞書形式に変換 record_dict = dict(zip(common_columns, record)) # 主キー値を取得 pk_values = [] pk_conditions = [] for pk_col in pk_columns: if pk_col in record_dict: pk_values.append(record_dict[pk_col]) quoted_pk_col = self.quote_column_if_needed(pk_col) pk_conditions.append(f"{quoted_pk_col} = %s") if not pk_values: error_count += 1 continue # 既存レコード確認 check_query = f"SELECT COUNT(*) FROM {table_name} WHERE {' AND '.join(pk_conditions)}" self.new_cursor.execute(check_query, pk_values) exists = self.new_cursor.fetchone()[0] > 0 if exists: # UPDATE処理 set_clauses = [] update_values = [] for col in common_columns: if col not in pk_columns: # 主キー以外を更新 quoted_col = self.quote_column_if_needed(col) set_clauses.append(f"{quoted_col} = %s") update_values.append(record_dict[col]) if set_clauses: update_query = f""" UPDATE {table_name} SET {', '.join(set_clauses)} WHERE {' AND '.join(pk_conditions)} """ self.new_cursor.execute(update_query, update_values + pk_values) updated_count += 1 else: # INSERT処理 insert_columns = list(common_columns) insert_values = [record_dict[col] for col in insert_columns] quoted_insert_columns = [self.quote_column_if_needed(col) for col in insert_columns] placeholders = ', '.join(['%s'] * len(insert_columns)) insert_query = f""" INSERT INTO {table_name} ({', '.join(quoted_insert_columns)}) VALUES ({placeholders}) """ self.new_cursor.execute(insert_query, insert_values) inserted_count += 1 # 定期的なコミット if (i + 1) % 100 == 0: self.new_conn.commit() logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了") except Exception as e: error_count += 1 logger.error(f" レコード処理エラー (行{i+1}): {e}") # トランザクションロールバック try: self.new_conn.rollback() except: pass if error_count > 10: # エラー上限 logger.error(f"❌ {table_name}: エラー数が上限を超えました") break # 最終コミット try: self.new_conn.commit() except Exception as e: logger.error(f"コミットエラー: {e}") # 結果サマリー stats = { 'inserted': inserted_count, 'updated': updated_count, 'errors': error_count, 'total': len(old_records) } logger.info(f"✅ {table_name} 移行完了:") logger.info(f" 挿入: {inserted_count}件") logger.info(f" 更新: {updated_count}件") logger.info(f" エラー: {error_count}件") self.migration_stats[table_name] = stats return stats except Exception as e: logger.error(f"❌ {table_name} 移行エラー: {e}") try: self.new_conn.rollback() except: pass return {'inserted': 0, 'updated': 0, 'errors': 1} def run_migration(self, exclude_tables=None): """全体移行実行""" if exclude_tables is None: exclude_tables = [] logger.info("=" * 80) logger.info("Old RogDB → RogDB データ移行開始") logger.info("=" * 80) try: # データベース接続 if not self.connect_databases(): return False # テーブル一覧取得 old_tables, new_tables, common_tables = self.get_rog_tables() if not common_tables: logger.error("❌ 移行対象テーブルがありません") return False # 除外テーブルをフィルタリング migration_tables = [t for t in common_tables if t not in exclude_tables] logger.info(f"移行対象テーブル ({len(migration_tables)}個): {migration_tables}") # 各テーブルを移行 total_inserted = 0 total_updated = 0 total_errors = 0 for table_name in migration_tables: stats = self.migrate_table_data(table_name) total_inserted += stats['inserted'] total_updated += stats['updated'] total_errors += stats['errors'] # 全体サマリー出力 logger.info("=" * 80) logger.info("移行完了サマリー") logger.info("=" * 80) logger.info(f"処理対象テーブル: {len(migration_tables)}個") logger.info(f"総挿入件数: {total_inserted}件") logger.info(f"総更新件数: {total_updated}件") logger.info(f"総エラー件数: {total_errors}件") logger.info("\n--- テーブル別詳細 ---") for table_name, stats in self.migration_stats.items(): logger.info(f"{table_name}: 挿入{stats['inserted']}, 更新{stats['updated']}, エラー{stats['errors']}") if total_errors > 0: logger.warning(f"⚠️ {total_errors}件のエラーがありました") else: logger.info("✅ 全ての移行が正常に完了しました!") return True except Exception as e: logger.error(f"❌ 移行処理エラー: {e}") return False finally: self.close_connections() def main(): """メイン処理""" logger.info("Old RogDB → RogDB データ移行スクリプト") # 除外テーブル設定 exclude_tables = [] if os.getenv('EXCLUDE_TABLES'): exclude_tables = [t.strip() for t in os.getenv('EXCLUDE_TABLES').split(',')] logger.info(f"除外テーブル: {exclude_tables}") # 移行実行 migrator = RogTableMigrator() success = migrator.run_migration(exclude_tables=exclude_tables) if success: logger.info("移行処理が完了しました") sys.exit(0) else: logger.error("移行処理が失敗しました") sys.exit(1) if __name__ == "__main__": main()