#!/usr/bin/env python3 """ rog_goalimages テーブル専用移行スクリプト (team_name → zekken_number変換対応) old_rogdb の rog_goalimages から rogdb の rog_goalimages へ zekken_numberがブランクの場合、team_nameからrog_entryを検索してzekken_numberを取得 """ import os import sys import psycopg2 from datetime import datetime, timezone import logging # ログ設定 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # データベース設定 OLD_ROGDB_CONFIG = { 'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'), 'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'), 'user': os.getenv('OLD_ROGDB_USER', 'admin'), 'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'), 'port': int(os.getenv('OLD_ROGDB_PORT', 5432)) } ROGDB_CONFIG = { 'host': os.getenv('ROGDB_HOST', 'postgres-db'), 'database': os.getenv('ROGDB_NAME', 'rogdb'), 'user': os.getenv('ROGDB_USER', 'admin'), 'password': os.getenv('ROGDB_PASSWORD', 'admin123456'), 'port': int(os.getenv('ROGDB_PORT', 5432)) } class RogGoalImagesMigrator: """rog_goalimages テーブル専用移行クラス""" def __init__(self): self.old_conn = None self.new_conn = None self.old_cursor = None self.new_cursor = None self.team_zekken_cache = {} # team_name → zekken_number キャッシュ def connect_databases(self): """データベース接続""" try: logger.info("データベースに接続中...") self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG) self.new_conn = psycopg2.connect(**ROGDB_CONFIG) self.old_conn.autocommit = True self.new_conn.autocommit = False self.old_cursor = self.old_conn.cursor() self.new_cursor = self.new_conn.cursor() logger.info("✅ データベース接続成功") return True except Exception as e: logger.error(f"❌ データベース接続エラー: {e}") return False def close_connections(self): """データベース接続クローズ""" try: if self.old_cursor: self.old_cursor.close() if self.new_cursor: self.new_cursor.close() if self.old_conn: self.old_conn.close() if self.new_conn: self.new_conn.close() logger.info("データベース接続をクローズしました") except Exception as e: logger.warning(f"接続クローズ時の警告: {e}") def build_team_zekken_cache(self): """team_name → zekken_number のキャッシュを構築""" logger.info("team_name → zekken_number キャッシュを構築中...") try: # 新DBのrog_entryから team_name → zekken_number マッピングを取得 self.new_cursor.execute(""" SELECT DISTINCT t.team_name, e.zekken_number FROM rog_entry e JOIN rog_team t ON e.team_id = t.id WHERE t.team_name IS NOT NULL AND e.zekken_number IS NOT NULL ORDER BY t.team_name, e.zekken_number """) team_zekken_pairs = self.new_cursor.fetchall() for team_name, zekken_number in team_zekken_pairs: if team_name not in self.team_zekken_cache: self.team_zekken_cache[team_name] = zekken_number logger.debug(f"キャッシュ追加: {team_name} → {zekken_number}") logger.info(f"✅ キャッシュ構築完了: {len(self.team_zekken_cache)}件のteam_name → zekken_numberマッピング") # キャッシュ内容の一部をログ出力 if self.team_zekken_cache: sample_items = list(self.team_zekken_cache.items())[:5] logger.info(f"キャッシュサンプル: {sample_items}") return True except Exception as e: logger.error(f"❌ キャッシュ構築エラー: {e}") return False def resolve_zekken_number(self, old_zekken_number, team_name): """zekken_numberを解決(ブランクの場合はteam_nameから取得)""" # zekken_numberが既に設定されている場合はそのまま使用 if old_zekken_number and old_zekken_number.strip(): return old_zekken_number.strip() # team_nameからzekken_numberを検索 if team_name and team_name.strip(): clean_team_name = team_name.strip() if clean_team_name in self.team_zekken_cache: resolved_zekken = self.team_zekken_cache[clean_team_name] logger.debug(f"team_name '{clean_team_name}' → zekken_number '{resolved_zekken}'") return str(resolved_zekken) else: logger.warning(f"team_name '{clean_team_name}' に対応するzekken_numberが見つかりません") # 解決できない場合は空文字列を返す logger.warning(f"zekken_number解決失敗: zekken='{old_zekken_number}', team='{team_name}'") return '' def get_old_goalimages_structure(self): """旧DBのrog_goalimagesテーブル構造を取得""" try: self.old_cursor.execute(""" SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_name = 'rog_goalimages' AND table_schema = 'public' ORDER BY ordinal_position """) columns = self.old_cursor.fetchall() column_names = [col[0] for col in columns] logger.info(f"旧DBのrog_goalimagesカラム: {column_names}") return column_names except Exception as e: logger.error(f"❌ 旧DBテーブル構造取得エラー: {e}") return [] def migrate_rog_goalimages(self): """rog_goalimages テーブルのデータ移行""" logger.info("=" * 60) logger.info("rog_goalimages テーブルデータ移行開始") logger.info("=" * 60) try: # team_name → zekken_number キャッシュ構築 if not self.build_team_zekken_cache(): logger.error("❌ キャッシュ構築に失敗しました") return False # 旧DBテーブル構造確認 old_columns = self.get_old_goalimages_structure() if not old_columns: logger.error("❌ 旧DBのテーブル構造を取得できませんでした") return False # 旧データ取得 logger.info("旧rog_goalimagesデータを取得中...") # カラム存在チェック has_zekken_number = 'zekken_number' in old_columns if has_zekken_number: select_query = """ SELECT id, goalimage, goaltime, team_name, event_code, cp_number, user_id, zekken_number FROM rog_goalimages ORDER BY id """ else: select_query = """ SELECT id, goalimage, goaltime, team_name, event_code, cp_number, user_id, NULL as zekken_number FROM rog_goalimages ORDER BY id """ self.old_cursor.execute(select_query) old_records = self.old_cursor.fetchall() if not old_records: logger.info("✅ 移行対象データがありません") return True logger.info(f"移行対象レコード数: {len(old_records)}件") # 統計情報 inserted_count = 0 updated_count = 0 error_count = 0 zekken_resolved_count = 0 # レコード別処理 for i, old_record in enumerate(old_records): try: # レコードデータの展開 record_id, goalimage, goaltime, team_name, event_code, \ cp_number, user_id, old_zekken_number = old_record # zekken_number解決 resolved_zekken_number = self.resolve_zekken_number(old_zekken_number, team_name) if not old_zekken_number and resolved_zekken_number: zekken_resolved_count += 1 # 新レコードデータ new_record = { 'id': record_id, 'goalimage': goalimage, 'goaltime': goaltime, 'team_name': team_name or '', 'event_code': event_code or '', 'cp_number': cp_number or 0, 'user_id': user_id, 'zekken_number': resolved_zekken_number } # 既存レコード確認 self.new_cursor.execute( "SELECT COUNT(*) FROM rog_goalimages WHERE id = %s", (record_id,) ) exists = self.new_cursor.fetchone()[0] > 0 if exists: # UPDATE処理 update_query = """ UPDATE rog_goalimages SET goalimage = %s, goaltime = %s, team_name = %s, event_code = %s, cp_number = %s, user_id = %s, zekken_number = %s WHERE id = %s """ self.new_cursor.execute(update_query, ( new_record['goalimage'], new_record['goaltime'], new_record['team_name'], new_record['event_code'], new_record['cp_number'], new_record['user_id'], new_record['zekken_number'], record_id )) updated_count += 1 else: # INSERT処理 insert_query = """ INSERT INTO rog_goalimages ( id, goalimage, goaltime, team_name, event_code, cp_number, user_id, zekken_number ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s ) """ self.new_cursor.execute(insert_query, ( new_record['id'], new_record['goalimage'], new_record['goaltime'], new_record['team_name'], new_record['event_code'], new_record['cp_number'], new_record['user_id'], new_record['zekken_number'] )) inserted_count += 1 # 進捗表示とコミット if (i + 1) % 100 == 0: self.new_conn.commit() logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了") except Exception as e: error_count += 1 logger.error(f" レコード処理エラー (ID: {record_id}): {e}") # トランザクションロールバック try: self.new_conn.rollback() except: pass if error_count > 10: logger.error("❌ エラー数が上限を超えました") break # 最終コミット self.new_conn.commit() # 結果サマリー logger.info("=" * 60) logger.info("rog_goalimages 移行完了") logger.info("=" * 60) logger.info(f"挿入: {inserted_count}件") logger.info(f"更新: {updated_count}件") logger.info(f"エラー: {error_count}件") logger.info(f"zekken_number解決: {zekken_resolved_count}件") logger.info(f"総処理: {len(old_records)}件") if error_count == 0: logger.info("✅ rog_goalimages移行が正常に完了しました!") return True else: logger.warning(f"⚠️ {error_count}件のエラーがありました") return False except Exception as e: logger.error(f"❌ rog_goalimages移行エラー: {e}") try: self.new_conn.rollback() except: pass return False def run(self): """移行実行""" try: if not self.connect_databases(): return False return self.migrate_rog_goalimages() finally: self.close_connections() def main(): """メイン処理""" logger.info("rog_goalimages テーブル移行スクリプト (team_name → zekken_number変換対応)") migrator = RogGoalImagesMigrator() success = migrator.run() if success: logger.info("🎉 移行が正常に完了しました!") else: logger.error("💥 移行中にエラーが発生しました") sys.exit(0 if success else 1) if __name__ == "__main__": main()