#!/usr/bin/env python3 """ rog_entry テーブル専用移行スクリプト (予約語・NULL値対応) old_rogdb の rog_entry から rogdb の rog_entry へデータ移行 """ import os import sys import psycopg2 from datetime import datetime, timezone import logging # ログ設定 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # データベース設定 OLD_ROGDB_CONFIG = { 'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'), 'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'), 'user': os.getenv('OLD_ROGDB_USER', 'admin'), 'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'), 'port': int(os.getenv('OLD_ROGDB_PORT', 5432)) } ROGDB_CONFIG = { 'host': os.getenv('ROGDB_HOST', 'postgres-db'), 'database': os.getenv('ROGDB_NAME', 'rogdb'), 'user': os.getenv('ROGDB_USER', 'admin'), 'password': os.getenv('ROGDB_PASSWORD', 'admin123456'), 'port': int(os.getenv('ROGDB_PORT', 5432)) } class RogEntryMigrator: """rog_entry テーブル専用移行クラス""" def __init__(self): self.old_conn = None self.new_conn = None self.old_cursor = None self.new_cursor = None def connect_databases(self): """データベース接続""" try: logger.info("データベースに接続中...") self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG) self.new_conn = psycopg2.connect(**ROGDB_CONFIG) self.old_conn.autocommit = True self.new_conn.autocommit = False self.old_cursor = self.old_conn.cursor() self.new_cursor = self.new_conn.cursor() logger.info("✅ データベース接続成功") return True except Exception as e: logger.error(f"❌ データベース接続エラー: {e}") return False def close_connections(self): """データベース接続クローズ""" try: if self.old_cursor: self.old_cursor.close() if self.new_cursor: self.new_cursor.close() if self.old_conn: self.old_conn.close() if self.new_conn: self.new_conn.close() logger.info("データベース接続をクローズしました") except Exception as e: logger.warning(f"接続クローズ時の警告: {e}") def quote_column_if_needed(self, column_name): """予約語やキャメルケースの場合はダブルクォートで囲む""" # camelCaseの場合はクォート if any(c.isupper() for c in column_name): return f'"{column_name}"' return column_name def handle_null_values(self, column_name, value): """NULL値の処理とデフォルト値設定""" if value is not None: return value # rog_entryテーブル固有のデフォルト値 defaults = { 'is_active': True, 'is_trial': False, 'hasGoaled': False, 'hasParticipated': False, 'zekken_label': '', 'zekken_number': 0 } if column_name in defaults: default_value = defaults[column_name] logger.debug(f"NULL値をデフォルト値に変換: {column_name} = {default_value}") return default_value # デフォルト値が見つからない場合はNULLを返す logger.warning(f"デフォルト値が設定されていません: {column_name}") return None def validate_foreign_keys(self): """外部キー参照の整合性をチェック""" logger.info("外部キー参照の整合性をチェック中...") # team_id の存在確認 self.old_cursor.execute("SELECT DISTINCT team_id FROM rog_entry WHERE team_id IS NOT NULL") old_team_ids = [row[0] for row in self.old_cursor.fetchall()] self.new_cursor.execute("SELECT id FROM rog_team") new_team_ids = [row[0] for row in self.new_cursor.fetchall()] missing_teams = set(old_team_ids) - set(new_team_ids) if missing_teams: logger.warning(f"⚠️ 新DBに存在しないteam_id: {missing_teams}") logger.warning("先にrog_teamの移行を完了してください") return False # event_id の存在確認 self.old_cursor.execute("SELECT DISTINCT event_id FROM rog_entry WHERE event_id IS NOT NULL") old_event_ids = [row[0] for row in self.old_cursor.fetchall()] self.new_cursor.execute("SELECT id FROM rog_newevent2") new_event_ids = [row[0] for row in self.new_cursor.fetchall()] missing_events = set(old_event_ids) - set(new_event_ids) if missing_events: logger.warning(f"⚠️ 新DBに存在しないevent_id: {missing_events}") logger.warning("先にrog_newevent2の移行を完了してください") return False # category_id の存在確認 self.old_cursor.execute("SELECT DISTINCT category_id FROM rog_entry WHERE category_id IS NOT NULL") old_category_ids = [row[0] for row in self.old_cursor.fetchall()] self.new_cursor.execute("SELECT id FROM rog_newcategory") new_category_ids = [row[0] for row in self.new_cursor.fetchall()] missing_categories = set(old_category_ids) - set(new_category_ids) if missing_categories: logger.warning(f"⚠️ 新DBに存在しないcategory_id: {missing_categories}") logger.warning("先にrog_newcategoryの移行を完了してください") return False logger.info("✅ 外部キー参照の整合性チェック完了") return True def migrate_rog_entry(self): """rog_entry テーブルのデータ移行""" logger.info("=" * 60) logger.info("rog_entry テーブルデータ移行開始") logger.info("=" * 60) try: # 外部キー整合性チェック if not self.validate_foreign_keys(): logger.error("❌ 外部キー整合性チェックに失敗しました") return False # 旧データ取得(camelCaseカラム名をクォート) logger.info("旧rog_entryデータを取得中...") self.old_cursor.execute(""" SELECT id, date, category_id, event_id, owner_id, team_id, is_active, zekken_number, "hasGoaled", "hasParticipated", zekken_label, is_trial FROM rog_entry ORDER BY id """) old_records = self.old_cursor.fetchall() if not old_records: logger.info("✅ 移行対象データがありません") return True logger.info(f"移行対象レコード数: {len(old_records)}件") # 統計情報 inserted_count = 0 updated_count = 0 error_count = 0 # レコード別処理 for i, old_record in enumerate(old_records): try: # レコードデータの展開とNULL値処理 entry_id, date, category_id, event_id, owner_id, team_id, \ is_active, zekken_number, hasGoaled, hasParticipated, \ zekken_label, is_trial = old_record # NULL値処理 processed_record = { 'id': entry_id, 'date': date, 'category_id': category_id, 'event_id': event_id, 'owner_id': owner_id, 'team_id': team_id, 'is_active': self.handle_null_values('is_active', is_active), 'zekken_number': self.handle_null_values('zekken_number', zekken_number), 'hasGoaled': self.handle_null_values('hasGoaled', hasGoaled), 'hasParticipated': self.handle_null_values('hasParticipated', hasParticipated), 'zekken_label': self.handle_null_values('zekken_label', zekken_label), 'is_trial': self.handle_null_values('is_trial', is_trial) } # 既存レコード確認 self.new_cursor.execute( "SELECT COUNT(*) FROM rog_entry WHERE id = %s", (entry_id,) ) exists = self.new_cursor.fetchone()[0] > 0 if exists: # UPDATE処理(camelCaseカラムをクォート) update_query = """ UPDATE rog_entry SET date = %s, category_id = %s, event_id = %s, owner_id = %s, team_id = %s, is_active = %s, zekken_number = %s, "hasGoaled" = %s, "hasParticipated" = %s, zekken_label = %s, is_trial = %s WHERE id = %s """ self.new_cursor.execute(update_query, ( processed_record['date'], processed_record['category_id'], processed_record['event_id'], processed_record['owner_id'], processed_record['team_id'], processed_record['is_active'], processed_record['zekken_number'], processed_record['hasGoaled'], processed_record['hasParticipated'], processed_record['zekken_label'], processed_record['is_trial'], entry_id )) updated_count += 1 else: # INSERT処理(camelCaseカラムをクォート) insert_query = """ INSERT INTO rog_entry ( id, date, category_id, event_id, owner_id, team_id, is_active, zekken_number, "hasGoaled", "hasParticipated", zekken_label, is_trial ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """ self.new_cursor.execute(insert_query, ( processed_record['id'], processed_record['date'], processed_record['category_id'], processed_record['event_id'], processed_record['owner_id'], processed_record['team_id'], processed_record['is_active'], processed_record['zekken_number'], processed_record['hasGoaled'], processed_record['hasParticipated'], processed_record['zekken_label'], processed_record['is_trial'] )) inserted_count += 1 # 進捗表示とコミット if (i + 1) % 100 == 0: self.new_conn.commit() logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了") except Exception as e: error_count += 1 logger.error(f" レコード処理エラー (ID: {entry_id}): {e}") # トランザクションロールバック try: self.new_conn.rollback() except: pass if error_count > 10: logger.error("❌ エラー数が上限を超えました") break # 最終コミット self.new_conn.commit() # 結果サマリー logger.info("=" * 60) logger.info("rog_entry 移行完了") logger.info("=" * 60) logger.info(f"挿入: {inserted_count}件") logger.info(f"更新: {updated_count}件") logger.info(f"エラー: {error_count}件") logger.info(f"総処理: {len(old_records)}件") if error_count == 0: logger.info("✅ rog_entry移行が正常に完了しました!") return True else: logger.warning(f"⚠️ {error_count}件のエラーがありました") return False except Exception as e: logger.error(f"❌ rog_entry移行エラー: {e}") try: self.new_conn.rollback() except: pass return False def run(self): """移行実行""" try: if not self.connect_databases(): return False return self.migrate_rog_entry() finally: self.close_connections() def main(): """メイン処理""" logger.info("rog_entry テーブル移行スクリプト") migrator = RogEntryMigrator() success = migrator.run() if success: logger.info("🎉 移行が正常に完了しました!") else: logger.error("💥 移行中にエラーが発生しました") sys.exit(0 if success else 1) if __name__ == "__main__": main()