#!/usr/bin/env python3 """ rog_team テーブル専用移行スクリプト (構造変換対応) old_rogdb の rog_team から rogdb の rog_team へ構造変換を行いながらデータ移行 """ import os import sys import psycopg2 from datetime import datetime, timezone import logging # ログ設定 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # データベース設定 OLD_ROGDB_CONFIG = { 'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'), 'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'), 'user': os.getenv('OLD_ROGDB_USER', 'admin'), 'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'), 'port': int(os.getenv('OLD_ROGDB_PORT', 5432)) } ROGDB_CONFIG = { 'host': os.getenv('ROGDB_HOST', 'postgres-db'), 'database': os.getenv('ROGDB_NAME', 'rogdb'), 'user': os.getenv('ROGDB_USER', 'admin'), 'password': os.getenv('ROGDB_PASSWORD', 'admin123456'), 'port': int(os.getenv('ROGDB_PORT', 5432)) } class RogTeamMigrator: """rog_team テーブル専用移行クラス""" def __init__(self): self.old_conn = None self.new_conn = None self.old_cursor = None self.new_cursor = None self.default_event_id = None def connect_databases(self): """データベース接続""" try: logger.info("データベースに接続中...") self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG) self.new_conn = psycopg2.connect(**ROGDB_CONFIG) self.old_conn.autocommit = True self.new_conn.autocommit = False self.old_cursor = self.old_conn.cursor() self.new_cursor = self.new_conn.cursor() logger.info("✅ データベース接続成功") return True except Exception as e: logger.error(f"❌ データベース接続エラー: {e}") return False def close_connections(self): """データベース接続クローズ""" try: if self.old_cursor: self.old_cursor.close() if self.new_cursor: self.new_cursor.close() if self.old_conn: self.old_conn.close() if self.new_conn: self.new_conn.close() logger.info("データベース接続をクローズしました") except Exception as e: logger.warning(f"接続クローズ時の警告: {e}") def get_team_zekken_and_event(self, team_id): """team_idから最新のrog_entryのzekken_numberとevent_idを取得""" try: # 旧DBのrog_entryから該当team_idの最新レコードを取得 self.old_cursor.execute(""" SELECT zekken_number, event_id, date FROM rog_entry WHERE team_id = %s AND zekken_number IS NOT NULL AND event_id IS NOT NULL ORDER BY date DESC, id DESC LIMIT 1 """, (team_id,)) result = self.old_cursor.fetchone() if result: zekken_number, event_id, entry_date = result logger.debug(f"team_id {team_id}: zekken_number={zekken_number}, event_id={event_id}, date={entry_date}") return str(zekken_number), event_id else: # rog_entryにレコードがない場合はデフォルト値を返す logger.warning(f"team_id {team_id}: rog_entryにレコードが見つかりません") return '', self.default_event_id except Exception as e: logger.error(f"team_id {team_id} のzekken_number/event_id取得エラー: {e}") return '', self.default_event_id def get_default_event_id(self): """デフォルトのevent_idを取得または作成""" try: # 既存のイベントを探す self.new_cursor.execute(""" SELECT id FROM rog_newevent2 WHERE event_name LIKE '%移行%' OR event_name LIKE '%default%' ORDER BY id LIMIT 1 """) result = self.new_cursor.fetchone() if result: event_id = result[0] logger.info(f"既存のデフォルトイベントを使用: event_id = {event_id}") return event_id # なければ最初のイベントを使用 self.new_cursor.execute(""" SELECT id FROM rog_newevent2 ORDER BY id LIMIT 1 """) result = self.new_cursor.fetchone() if result: event_id = result[0] logger.info(f"最初のイベントをデフォルトとして使用: event_id = {event_id}") return event_id # イベントがない場合はエラー logger.error("❌ rog_newevent2 テーブルにイベントが存在しません") return None except Exception as e: logger.error(f"❌ デフォルトevent_id取得エラー: {e}") return None def get_category_mapping(self): """カテゴリIDのマッピングを確認""" try: # 旧DBのカテゴリ self.old_cursor.execute("SELECT id, category_name FROM rog_newcategory ORDER BY id") old_categories = dict(self.old_cursor.fetchall()) # 新DBのカテゴリ self.new_cursor.execute("SELECT id, category_name FROM rog_newcategory ORDER BY id") new_categories = dict(self.new_cursor.fetchall()) logger.info(f"旧DB カテゴリ: {old_categories}") logger.info(f"新DB カテゴリ: {new_categories}") # 名前ベースでマッピング作成 category_mapping = {} for old_id, old_name in old_categories.items(): for new_id, new_name in new_categories.items(): if old_name == new_name: category_mapping[old_id] = new_id break else: # マッチしない場合は最初のカテゴリを使用 if new_categories: category_mapping[old_id] = min(new_categories.keys()) logger.warning(f"カテゴリマッピング失敗 - デフォルト使用: {old_id} -> {category_mapping[old_id]}") return category_mapping except Exception as e: logger.error(f"❌ カテゴリマッピング取得エラー: {e}") return {} def convert_team_record(self, old_record, category_mapping): """旧レコードを新レコード形式に変換""" old_id, old_team_name, old_category_id, old_owner_id = old_record # team_idから最新のzekken_numberとevent_idを取得 zekken_number, event_id = self.get_team_zekken_and_event(old_id) # 新しいレコード作成 new_record = { 'id': old_id, 'team_name': old_team_name, 'category_id': category_mapping.get(old_category_id, old_category_id), 'owner_id': old_owner_id, # 新しいフィールドにデフォルト値を設定 'class_name': '', # 空文字列 'event_id': event_id, # rog_entryから取得したevent_id 'location': None, # PostGIS座標は後で設定可能 'password': '', # パスワードなし 'trial': False, # 本番チーム 'zekken_number': zekken_number, # rog_entryから取得したzekken_number 'created_at': datetime.now(timezone.utc), 'updated_at': datetime.now(timezone.utc) } return new_record def migrate_rog_team(self): """rog_team テーブルのデータ移行""" logger.info("=" * 60) logger.info("rog_team テーブル構造変換移行開始") logger.info("=" * 60) try: # デフォルトevent_id取得 self.default_event_id = self.get_default_event_id() if not self.default_event_id: return False # カテゴリマッピング取得 category_mapping = self.get_category_mapping() # 旧データ取得 logger.info("旧rog_teamデータを取得中...") self.old_cursor.execute(""" SELECT id, team_name, category_id, owner_id FROM rog_team ORDER BY id """) old_records = self.old_cursor.fetchall() if not old_records: logger.info("✅ 移行対象データがありません") return True logger.info(f"移行対象レコード数: {len(old_records)}件") # 統計情報 inserted_count = 0 updated_count = 0 error_count = 0 zekken_resolved_count = 0 constraint_avoided_count = 0 # レコード別処理 for i, old_record in enumerate(old_records): try: # レコード変換 new_record = self.convert_team_record(old_record, category_mapping) team_id = new_record['id'] # zekken_number解決統計 if new_record['zekken_number']: zekken_resolved_count += 1 # 既存レコード確認(IDベース) self.new_cursor.execute( "SELECT COUNT(*) FROM rog_team WHERE id = %s", (team_id,) ) exists_by_id = self.new_cursor.fetchone()[0] > 0 # 重複制約確認(zekken_number + event_id の組み合わせ) if new_record['zekken_number']: # zekken_numberが空でない場合のみチェック self.new_cursor.execute( "SELECT COUNT(*) FROM rog_team WHERE zekken_number = %s AND event_id = %s", (new_record['zekken_number'], new_record['event_id']) ) exists_by_constraint = self.new_cursor.fetchone()[0] > 0 if exists_by_constraint and not exists_by_id: # 制約違反が発生する場合は、zekken_numberを空にしてデフォルトevent_idを使用 logger.warning(f"Team ID {team_id}: zekken_number制約回避のため空文字に変更") new_record['zekken_number'] = '' new_record['event_id'] = self.default_event_id constraint_avoided_count += 1 if exists_by_id: # UPDATE処理 update_query = """ UPDATE rog_team SET team_name = %s, category_id = %s, owner_id = %s, class_name = %s, event_id = %s, location = %s, password = %s, trial = %s, zekken_number = %s, updated_at = %s WHERE id = %s """ self.new_cursor.execute(update_query, ( new_record['team_name'], new_record['category_id'], new_record['owner_id'], new_record['class_name'], new_record['event_id'], new_record['location'], new_record['password'], new_record['trial'], new_record['zekken_number'], new_record['updated_at'], team_id )) updated_count += 1 else: # INSERT処理 insert_query = """ INSERT INTO rog_team ( id, team_name, category_id, owner_id, class_name, event_id, location, password, trial, zekken_number, created_at, updated_at ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """ self.new_cursor.execute(insert_query, ( new_record['id'], new_record['team_name'], new_record['category_id'], new_record['owner_id'], new_record['class_name'], new_record['event_id'], new_record['location'], new_record['password'], new_record['trial'], new_record['zekken_number'], new_record['created_at'], new_record['updated_at'] )) inserted_count += 1 # 進捗表示 if (i + 1) % 50 == 0: self.new_conn.commit() logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了") except Exception as e: error_count += 1 logger.error(f" レコード処理エラー (ID: {old_record[0]}): {e}") # トランザクションロールバック try: self.new_conn.rollback() except: pass if error_count > 10: logger.error("❌ エラー数が上限を超えました") break # 最終コミット self.new_conn.commit() # 結果サマリー logger.info("=" * 60) logger.info("rog_team 移行完了") logger.info("=" * 60) logger.info(f"挿入: {inserted_count}件") logger.info(f"更新: {updated_count}件") logger.info(f"エラー: {error_count}件") logger.info(f"zekken_number解決: {zekken_resolved_count}件") logger.info(f"制約回避: {constraint_avoided_count}件") logger.info(f"総処理: {len(old_records)}件") if error_count == 0: logger.info("✅ rog_team移行が正常に完了しました!") return True else: logger.warning(f"⚠️ {error_count}件のエラーがありました") return False except Exception as e: logger.error(f"❌ rog_team移行エラー: {e}") try: self.new_conn.rollback() except: pass return False def run(self): """移行実行""" try: if not self.connect_databases(): return False return self.migrate_rog_team() finally: self.close_connections() def main(): """メイン処理""" logger.info("rog_team テーブル構造変換移行スクリプト") migrator = RogTeamMigrator() success = migrator.run() if success: logger.info("🎉 移行が正常に完了しました!") else: logger.error("💥 移行中にエラーが発生しました") sys.exit(0 if success else 1) if __name__ == "__main__": main()