Files
rogaining_srv/restore_core_data.py

315 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
バックアップからのコアデータ復元スクリプトLocation2025対応
testdb/rogdb.sqlからentry、team、memberデータを選択的に復元する
Location2025テーブルとの整合性を確認し、必要に応じて警告を表示する
"""
import os
import sys
import psycopg2
import subprocess
def check_existing_data(cursor):
"""既存データの確認"""
print("=== 既存データ確認 ===")
tables = ['rog_entry', 'rog_team', 'rog_member', 'rog_entrymember']
counts = {}
for table in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
counts[table] = cursor.fetchone()[0]
print(f"{table}: {counts[table]}")
# Location2025データも確認
try:
cursor.execute("SELECT COUNT(*) FROM rog_location2025")
location2025_count = cursor.fetchone()[0]
print(f"rog_location2025: {location2025_count}")
counts['rog_location2025'] = location2025_count
except Exception as e:
print(f"rog_location2025: 確認エラー ({e})")
counts['rog_location2025'] = 0
return counts
def extract_core_data_from_backup():
"""バックアップファイルからコアデータ部分を抽出"""
backup_file = '/app/testdb/rogdb.sql'
temp_file = '/tmp/core_data_restore.sql'
if not os.path.exists(backup_file):
print(f"エラー: バックアップファイルが見つかりません: {backup_file}")
return None
print(f"バックアップファイルからコアデータを抽出中: {backup_file}")
with open(backup_file, 'r', encoding='utf-8') as f_in, open(temp_file, 'w', encoding='utf-8') as f_out:
in_data_section = False
current_table = None
for line_num, line in enumerate(f_in, 1):
# COPYコマンドの開始を検出
if line.startswith('COPY public.rog_entry '):
current_table = 'rog_entry'
in_data_section = True
f_out.write(line)
print(f"rog_entry データセクション開始 (行 {line_num})")
elif line.startswith('COPY public.rog_team '):
current_table = 'rog_team'
in_data_section = True
f_out.write(line)
print(f"rog_team データセクション開始 (行 {line_num})")
elif line.startswith('COPY public.rog_member '):
current_table = 'rog_member'
in_data_section = True
f_out.write(line)
print(f"rog_member データセクション開始 (行 {line_num})")
elif line.startswith('COPY public.rog_entrymember '):
current_table = 'rog_entrymember'
in_data_section = True
f_out.write(line)
print(f"rog_entrymember データセクション開始 (行 {line_num})")
elif in_data_section:
f_out.write(line)
# データセクションの終了を検出
if line.strip() == '\\.':
print(f"{current_table} データセクション終了 (行 {line_num})")
in_data_section = False
current_table = None
return temp_file
def restore_core_data(cursor, restore_file):
"""コアデータの復元"""
print(f"=== コアデータ復元実行 ===")
print(f"復元ファイル: {restore_file}")
try:
# 外部キー制約を一時的に無効化
cursor.execute("SET session_replication_role = replica;")
# 既存のコアデータをクリーンアップ(バックアップ前に実行)
print("既存のコアデータをクリーンアップ中...")
cursor.execute("DELETE FROM rog_entrymember")
cursor.execute("DELETE FROM rog_entry")
cursor.execute("DELETE FROM rog_member")
cursor.execute("DELETE FROM rog_team")
# SQLファイルをCOPYコマンド毎に分割して実行
print("バックアップデータを復元中...")
with open(restore_file, 'r', encoding='utf-8') as f:
content = f.read()
# COPYコマンドごとに分割処理
sections = content.split('COPY ')
for section in sections[1:]: # 最初の空セクションをスキップ
if not section.strip():
continue
copy_command = 'COPY ' + section
lines = copy_command.split('\n')
# COPYコマンドの最初の行を取得
copy_line = lines[0]
if 'FROM stdin;' not in copy_line:
continue
# データ行を取得COPYコマンドの次の行から \ . まで)
data_lines = []
in_data = False
for line in lines[1:]:
if in_data:
if line.strip() == '\.':
break
data_lines.append(line)
elif not in_data and line.strip():
in_data = True
if line.strip() != '\.':
data_lines.append(line)
if data_lines:
# COPYコマンドを実行
print(f"復元中: {copy_line.split('(')[0]}...")
cursor.execute(copy_line)
# データを挿入
for data_line in data_lines:
if data_line.strip():
cursor.execute(f"INSERT INTO {copy_line.split()[1]} VALUES ({data_line})")
print("復元完了")
except Exception as e:
print(f"復元エラー: {e}")
raise
finally:
# 外部キー制約を再有効化
cursor.execute("SET session_replication_role = DEFAULT;")
def verify_restoration(cursor):
"""復元結果の検証"""
print("\n=== 復元結果検証 ===")
# データ数確認
counts = check_existing_data(cursor)
# サンプルデータ確認
print("\n=== サンプルデータ確認 ===")
# Entry サンプル
cursor.execute("""
SELECT id, zekken_number, team_id, event_id
FROM rog_entry
ORDER BY id
LIMIT 5
""")
print("rog_entry サンプル:")
for row in cursor.fetchall():
print(f" ID:{row[0]} ゼッケン:{row[1]} チーム:{row[2]} イベント:{row[3]}")
# Team サンプル
cursor.execute("""
SELECT id, team_name, category_id, owner_id
FROM rog_team
ORDER BY id
LIMIT 5
""")
print("rog_team サンプル:")
for row in cursor.fetchall():
print(f" ID:{row[0]} チーム名:{row[1]} カテゴリ:{row[2]} オーナー:{row[3]}")
# 復元成功の判定
if counts['rog_entry'] > 0 and counts['rog_team'] > 0:
print(f"\n✅ 復元成功: Entry {counts['rog_entry']}件, Team {counts['rog_team']}件復元")
return True
else:
print(f"\n❌ 復元失敗: データが復元されませんでした")
return False
def verify_location2025_post_restore(cursor):
"""復元後のLocation2025との整合性確認"""
print("\n=== Location2025整合性確認 ===")
try:
# Location2025テーブルの存在確認
cursor.execute("""
SELECT COUNT(*) FROM information_schema.tables
WHERE table_name = 'rog_location2025'
""")
table_exists = cursor.fetchone()[0] > 0
if table_exists:
cursor.execute("SELECT COUNT(*) FROM rog_location2025")
location2025_count = cursor.fetchone()[0]
if location2025_count > 0:
print(f"✅ Location2025テーブル利用可能: {location2025_count}件のチェックポイント")
# イベント連携確認
cursor.execute("""
SELECT COUNT(DISTINCT e.event_code)
FROM rog_location2025 l
JOIN rog_newevent2 e ON l.event_id = e.id
""")
linked_events = cursor.fetchone()[0]
print(f"✅ イベント連携: {linked_events}個のイベントでチェックポイント定義済み")
return True
else:
print("⚠️ Location2025テーブルは存在しますが、チェックポイントが定義されていません")
print(" Django管理画面でCSVアップロード機能を使用してチェックポイントを追加してください")
return False
else:
print("⚠️ Location2025テーブルが見つかりません")
print(" Location2025機能を使用するには、Django migrationsを実行してください")
return False
except Exception as e:
print(f"❌ Location2025整合性確認エラー: {e}")
return False
def main():
"""メイン復元処理"""
print("=== バックアップからのコアデータ復元開始 ===")
# データベース接続
try:
conn = psycopg2.connect(
host='postgres-db', # Docker環境での接続
port=5432,
database='rogdb',
user='admin',
password='admin123456'
)
cursor = conn.cursor()
print("データベース接続成功")
except Exception as e:
print(f"データベース接続エラー: {e}")
sys.exit(1)
try:
# 既存データ確認
existing_counts = check_existing_data(cursor)
# 既存データがある場合の確認
if any(count > 0 for count in existing_counts.values()):
response = input("既存のコアデータが検出されました。上書きしますか? (y/N): ")
if response.lower() != 'y':
print("復元を中止しました")
return
# バックアップからコアデータを抽出
restore_file = extract_core_data_from_backup()
if not restore_file:
print("コアデータの抽出に失敗しました")
sys.exit(1)
# コアデータ復元
restore_core_data(cursor, restore_file)
conn.commit()
# 復元結果検証
success = verify_restoration(cursor)
# Location2025整合性確認
location2025_compatible = verify_location2025_post_restore(cursor)
# 一時ファイル削除
if os.path.exists(restore_file):
os.remove(restore_file)
print(f"一時ファイル削除: {restore_file}")
if success:
print("\n🎉 コアデータ復元完了")
print("supervisor画面でゼッケン番号候補が表示されるようになります")
if location2025_compatible:
print("✅ Location2025との整合性も確認済みです")
else:
print("⚠️ Location2025の設定が必要です")
else:
print("\n❌ 復元に失敗しました")
sys.exit(1)
except Exception as e:
print(f"復元処理エラー: {e}")
conn.rollback()
sys.exit(1)
finally:
conn.close()
if __name__ == "__main__":
main()