#!/usr/bin/env python3 """ バックアップからのコアデータ復元スクリプト(Location2025対応) testdb/rogdb.sqlからentry、team、memberデータを選択的に復元する Location2025テーブルとの整合性を確認し、必要に応じて警告を表示する """ import os import sys import psycopg2 import subprocess def check_existing_data(cursor): """既存データの確認""" print("=== 既存データ確認 ===") tables = ['rog_entry', 'rog_team', 'rog_member', 'rog_entrymember'] counts = {} for table in tables: cursor.execute(f"SELECT COUNT(*) FROM {table}") counts[table] = cursor.fetchone()[0] print(f"{table}: {counts[table]} 件") # Location2025データも確認 try: cursor.execute("SELECT COUNT(*) FROM rog_location2025") location2025_count = cursor.fetchone()[0] print(f"rog_location2025: {location2025_count} 件") counts['rog_location2025'] = location2025_count except Exception as e: print(f"rog_location2025: 確認エラー ({e})") counts['rog_location2025'] = 0 return counts def extract_core_data_from_backup(): """バックアップファイルからコアデータ部分を抽出""" backup_file = '/app/testdb/rogdb.sql' temp_file = '/tmp/core_data_restore.sql' if not os.path.exists(backup_file): print(f"エラー: バックアップファイルが見つかりません: {backup_file}") return None print(f"バックアップファイルからコアデータを抽出中: {backup_file}") with open(backup_file, 'r', encoding='utf-8') as f_in, open(temp_file, 'w', encoding='utf-8') as f_out: in_data_section = False current_table = None for line_num, line in enumerate(f_in, 1): # COPYコマンドの開始を検出 if line.startswith('COPY public.rog_entry '): current_table = 'rog_entry' in_data_section = True f_out.write(line) print(f"rog_entry データセクション開始 (行 {line_num})") elif line.startswith('COPY public.rog_team '): current_table = 'rog_team' in_data_section = True f_out.write(line) print(f"rog_team データセクション開始 (行 {line_num})") elif line.startswith('COPY public.rog_member '): current_table = 'rog_member' in_data_section = True f_out.write(line) print(f"rog_member データセクション開始 (行 {line_num})") elif line.startswith('COPY public.rog_entrymember '): current_table = 'rog_entrymember' in_data_section = True f_out.write(line) print(f"rog_entrymember データセクション開始 (行 {line_num})") elif in_data_section: f_out.write(line) # データセクションの終了を検出 if line.strip() == '\\.': print(f"{current_table} データセクション終了 (行 {line_num})") in_data_section = False current_table = None return temp_file def restore_core_data(cursor, restore_file): """コアデータの復元""" print(f"=== コアデータ復元実行 ===") print(f"復元ファイル: {restore_file}") try: # 外部キー制約を一時的に無効化 cursor.execute("SET session_replication_role = replica;") # 既存のコアデータをクリーンアップ(バックアップ前に実行) print("既存のコアデータをクリーンアップ中...") cursor.execute("DELETE FROM rog_entrymember") cursor.execute("DELETE FROM rog_entry") cursor.execute("DELETE FROM rog_member") cursor.execute("DELETE FROM rog_team") # SQLファイルをCOPYコマンド毎に分割して実行 print("バックアップデータを復元中...") with open(restore_file, 'r', encoding='utf-8') as f: content = f.read() # COPYコマンドごとに分割処理 sections = content.split('COPY ') for section in sections[1:]: # 最初の空セクションをスキップ if not section.strip(): continue copy_command = 'COPY ' + section lines = copy_command.split('\n') # COPYコマンドの最初の行を取得 copy_line = lines[0] if 'FROM stdin;' not in copy_line: continue # データ行を取得(COPYコマンドの次の行から \ . まで) data_lines = [] in_data = False for line in lines[1:]: if in_data: if line.strip() == '\.': break data_lines.append(line) elif not in_data and line.strip(): in_data = True if line.strip() != '\.': data_lines.append(line) if data_lines: # COPYコマンドを実行 print(f"復元中: {copy_line.split('(')[0]}...") cursor.execute(copy_line) # データを挿入 for data_line in data_lines: if data_line.strip(): cursor.execute(f"INSERT INTO {copy_line.split()[1]} VALUES ({data_line})") print("復元完了") except Exception as e: print(f"復元エラー: {e}") raise finally: # 外部キー制約を再有効化 cursor.execute("SET session_replication_role = DEFAULT;") def verify_restoration(cursor): """復元結果の検証""" print("\n=== 復元結果検証 ===") # データ数確認 counts = check_existing_data(cursor) # サンプルデータ確認 print("\n=== サンプルデータ確認 ===") # Entry サンプル cursor.execute(""" SELECT id, zekken_number, team_id, event_id FROM rog_entry ORDER BY id LIMIT 5 """) print("rog_entry サンプル:") for row in cursor.fetchall(): print(f" ID:{row[0]} ゼッケン:{row[1]} チーム:{row[2]} イベント:{row[3]}") # Team サンプル cursor.execute(""" SELECT id, team_name, category_id, owner_id FROM rog_team ORDER BY id LIMIT 5 """) print("rog_team サンプル:") for row in cursor.fetchall(): print(f" ID:{row[0]} チーム名:{row[1]} カテゴリ:{row[2]} オーナー:{row[3]}") # 復元成功の判定 if counts['rog_entry'] > 0 and counts['rog_team'] > 0: print(f"\n✅ 復元成功: Entry {counts['rog_entry']}件, Team {counts['rog_team']}件復元") return True else: print(f"\n❌ 復元失敗: データが復元されませんでした") return False def verify_location2025_post_restore(cursor): """復元後のLocation2025との整合性確認""" print("\n=== Location2025整合性確認 ===") try: # Location2025テーブルの存在確認 cursor.execute(""" SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'rog_location2025' """) table_exists = cursor.fetchone()[0] > 0 if table_exists: cursor.execute("SELECT COUNT(*) FROM rog_location2025") location2025_count = cursor.fetchone()[0] if location2025_count > 0: print(f"✅ Location2025テーブル利用可能: {location2025_count}件のチェックポイント") # イベント連携確認 cursor.execute(""" SELECT COUNT(DISTINCT e.event_code) FROM rog_location2025 l JOIN rog_newevent2 e ON l.event_id = e.id """) linked_events = cursor.fetchone()[0] print(f"✅ イベント連携: {linked_events}個のイベントでチェックポイント定義済み") return True else: print("⚠️ Location2025テーブルは存在しますが、チェックポイントが定義されていません") print(" Django管理画面でCSVアップロード機能を使用してチェックポイントを追加してください") return False else: print("⚠️ Location2025テーブルが見つかりません") print(" Location2025機能を使用するには、Django migrationsを実行してください") return False except Exception as e: print(f"❌ Location2025整合性確認エラー: {e}") return False def main(): """メイン復元処理""" print("=== バックアップからのコアデータ復元開始 ===") # データベース接続 try: conn = psycopg2.connect( host='postgres-db', # Docker環境での接続 port=5432, database='rogdb', user='admin', password='admin123456' ) cursor = conn.cursor() print("データベース接続成功") except Exception as e: print(f"データベース接続エラー: {e}") sys.exit(1) try: # 既存データ確認 existing_counts = check_existing_data(cursor) # 既存データがある場合の確認 if any(count > 0 for count in existing_counts.values()): response = input("既存のコアデータが検出されました。上書きしますか? (y/N): ") if response.lower() != 'y': print("復元を中止しました") return # バックアップからコアデータを抽出 restore_file = extract_core_data_from_backup() if not restore_file: print("コアデータの抽出に失敗しました") sys.exit(1) # コアデータ復元 restore_core_data(cursor, restore_file) conn.commit() # 復元結果検証 success = verify_restoration(cursor) # Location2025整合性確認 location2025_compatible = verify_location2025_post_restore(cursor) # 一時ファイル削除 if os.path.exists(restore_file): os.remove(restore_file) print(f"一時ファイル削除: {restore_file}") if success: print("\n🎉 コアデータ復元完了") print("supervisor画面でゼッケン番号候補が表示されるようになります") if location2025_compatible: print("✅ Location2025との整合性も確認済みです") else: print("⚠️ Location2025の設定が必要です") else: print("\n❌ 復元に失敗しました") sys.exit(1) except Exception as e: print(f"復元処理エラー: {e}") conn.rollback() sys.exit(1) finally: conn.close() if __name__ == "__main__": main()