Files
rogaining_srv/migrate_rog_team_enhanced.py
2025-08-25 20:04:28 +09:00

408 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
rog_team テーブル専用移行スクリプト (構造変換対応)
old_rogdb の rog_team から rogdb の rog_team へ構造変換を行いながらデータ移行
"""
import os
import sys
import psycopg2
from datetime import datetime, timezone
import logging
# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# データベース設定
OLD_ROGDB_CONFIG = {
'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'),
'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'),
'user': os.getenv('OLD_ROGDB_USER', 'admin'),
'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('OLD_ROGDB_PORT', 5432))
}
ROGDB_CONFIG = {
'host': os.getenv('ROGDB_HOST', 'postgres-db'),
'database': os.getenv('ROGDB_NAME', 'rogdb'),
'user': os.getenv('ROGDB_USER', 'admin'),
'password': os.getenv('ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('ROGDB_PORT', 5432))
}
class RogTeamMigrator:
"""rog_team テーブル専用移行クラス"""
def __init__(self):
self.old_conn = None
self.new_conn = None
self.old_cursor = None
self.new_cursor = None
self.default_event_id = None
def connect_databases(self):
"""データベース接続"""
try:
logger.info("データベースに接続中...")
self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG)
self.new_conn = psycopg2.connect(**ROGDB_CONFIG)
self.old_conn.autocommit = True
self.new_conn.autocommit = False
self.old_cursor = self.old_conn.cursor()
self.new_cursor = self.new_conn.cursor()
logger.info("✅ データベース接続成功")
return True
except Exception as e:
logger.error(f"❌ データベース接続エラー: {e}")
return False
def close_connections(self):
"""データベース接続クローズ"""
try:
if self.old_cursor:
self.old_cursor.close()
if self.new_cursor:
self.new_cursor.close()
if self.old_conn:
self.old_conn.close()
if self.new_conn:
self.new_conn.close()
logger.info("データベース接続をクローズしました")
except Exception as e:
logger.warning(f"接続クローズ時の警告: {e}")
def get_team_zekken_and_event(self, team_id):
"""team_idから最新のrog_entryのzekken_numberとevent_idを取得"""
try:
# 旧DBのrog_entryから該当team_idの最新レコードを取得
self.old_cursor.execute("""
SELECT zekken_number, event_id, date
FROM rog_entry
WHERE team_id = %s
AND zekken_number IS NOT NULL
AND event_id IS NOT NULL
ORDER BY date DESC, id DESC
LIMIT 1
""", (team_id,))
result = self.old_cursor.fetchone()
if result:
zekken_number, event_id, entry_date = result
logger.debug(f"team_id {team_id}: zekken_number={zekken_number}, event_id={event_id}, date={entry_date}")
return str(zekken_number), event_id
else:
# rog_entryにレコードがない場合はデフォルト値を返す
logger.warning(f"team_id {team_id}: rog_entryにレコードが見つかりません")
return '', self.default_event_id
except Exception as e:
logger.error(f"team_id {team_id} のzekken_number/event_id取得エラー: {e}")
return '', self.default_event_id
def get_default_event_id(self):
"""デフォルトのevent_idを取得または作成"""
try:
# 既存のイベントを探す
self.new_cursor.execute("""
SELECT id FROM rog_newevent2
WHERE event_name LIKE '%移行%' OR event_name LIKE '%default%'
ORDER BY id LIMIT 1
""")
result = self.new_cursor.fetchone()
if result:
event_id = result[0]
logger.info(f"既存のデフォルトイベントを使用: event_id = {event_id}")
return event_id
# なければ最初のイベントを使用
self.new_cursor.execute("""
SELECT id FROM rog_newevent2
ORDER BY id LIMIT 1
""")
result = self.new_cursor.fetchone()
if result:
event_id = result[0]
logger.info(f"最初のイベントをデフォルトとして使用: event_id = {event_id}")
return event_id
# イベントがない場合はエラー
logger.error("❌ rog_newevent2 テーブルにイベントが存在しません")
return None
except Exception as e:
logger.error(f"❌ デフォルトevent_id取得エラー: {e}")
return None
def get_category_mapping(self):
"""カテゴリIDのマッピングを確認"""
try:
# 旧DBのカテゴリ
self.old_cursor.execute("SELECT id, category_name FROM rog_newcategory ORDER BY id")
old_categories = dict(self.old_cursor.fetchall())
# 新DBのカテゴリ
self.new_cursor.execute("SELECT id, category_name FROM rog_newcategory ORDER BY id")
new_categories = dict(self.new_cursor.fetchall())
logger.info(f"旧DB カテゴリ: {old_categories}")
logger.info(f"新DB カテゴリ: {new_categories}")
# 名前ベースでマッピング作成
category_mapping = {}
for old_id, old_name in old_categories.items():
for new_id, new_name in new_categories.items():
if old_name == new_name:
category_mapping[old_id] = new_id
break
else:
# マッチしない場合は最初のカテゴリを使用
if new_categories:
category_mapping[old_id] = min(new_categories.keys())
logger.warning(f"カテゴリマッピング失敗 - デフォルト使用: {old_id} -> {category_mapping[old_id]}")
return category_mapping
except Exception as e:
logger.error(f"❌ カテゴリマッピング取得エラー: {e}")
return {}
def convert_team_record(self, old_record, category_mapping):
"""旧レコードを新レコード形式に変換"""
old_id, old_team_name, old_category_id, old_owner_id = old_record
# team_idから最新のzekken_numberとevent_idを取得
zekken_number, event_id = self.get_team_zekken_and_event(old_id)
# 新しいレコード作成
new_record = {
'id': old_id,
'team_name': old_team_name,
'category_id': category_mapping.get(old_category_id, old_category_id),
'owner_id': old_owner_id,
# 新しいフィールドにデフォルト値を設定
'class_name': '', # 空文字列
'event_id': event_id, # rog_entryから取得したevent_id
'location': None, # PostGIS座標は後で設定可能
'password': '', # パスワードなし
'trial': False, # 本番チーム
'zekken_number': zekken_number, # rog_entryから取得したzekken_number
'created_at': datetime.now(timezone.utc),
'updated_at': datetime.now(timezone.utc)
}
return new_record
def migrate_rog_team(self):
"""rog_team テーブルのデータ移行"""
logger.info("=" * 60)
logger.info("rog_team テーブル構造変換移行開始")
logger.info("=" * 60)
try:
# デフォルトevent_id取得
self.default_event_id = self.get_default_event_id()
if not self.default_event_id:
return False
# カテゴリマッピング取得
category_mapping = self.get_category_mapping()
# 旧データ取得
logger.info("旧rog_teamデータを取得中...")
self.old_cursor.execute("""
SELECT id, team_name, category_id, owner_id
FROM rog_team
ORDER BY id
""")
old_records = self.old_cursor.fetchall()
if not old_records:
logger.info("✅ 移行対象データがありません")
return True
logger.info(f"移行対象レコード数: {len(old_records)}")
# 統計情報
inserted_count = 0
updated_count = 0
error_count = 0
zekken_resolved_count = 0
constraint_avoided_count = 0
# レコード別処理
for i, old_record in enumerate(old_records):
try:
# レコード変換
new_record = self.convert_team_record(old_record, category_mapping)
team_id = new_record['id']
# zekken_number解決統計
if new_record['zekken_number']:
zekken_resolved_count += 1
# 既存レコード確認IDベース
self.new_cursor.execute(
"SELECT COUNT(*) FROM rog_team WHERE id = %s",
(team_id,)
)
exists_by_id = self.new_cursor.fetchone()[0] > 0
# 重複制約確認zekken_number + event_id の組み合わせ)
if new_record['zekken_number']: # zekken_numberが空でない場合のみチェック
self.new_cursor.execute(
"SELECT COUNT(*) FROM rog_team WHERE zekken_number = %s AND event_id = %s",
(new_record['zekken_number'], new_record['event_id'])
)
exists_by_constraint = self.new_cursor.fetchone()[0] > 0
if exists_by_constraint and not exists_by_id:
# 制約違反が発生する場合は、zekken_numberを空にしてデフォルトevent_idを使用
logger.warning(f"Team ID {team_id}: zekken_number制約回避のため空文字に変更")
new_record['zekken_number'] = ''
new_record['event_id'] = self.default_event_id
constraint_avoided_count += 1
if exists_by_id:
# UPDATE処理
update_query = """
UPDATE rog_team SET
team_name = %s,
category_id = %s,
owner_id = %s,
class_name = %s,
event_id = %s,
location = %s,
password = %s,
trial = %s,
zekken_number = %s,
updated_at = %s
WHERE id = %s
"""
self.new_cursor.execute(update_query, (
new_record['team_name'],
new_record['category_id'],
new_record['owner_id'],
new_record['class_name'],
new_record['event_id'],
new_record['location'],
new_record['password'],
new_record['trial'],
new_record['zekken_number'],
new_record['updated_at'],
team_id
))
updated_count += 1
else:
# INSERT処理
insert_query = """
INSERT INTO rog_team (
id, team_name, category_id, owner_id,
class_name, event_id, location, password,
trial, zekken_number, created_at, updated_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
"""
self.new_cursor.execute(insert_query, (
new_record['id'],
new_record['team_name'],
new_record['category_id'],
new_record['owner_id'],
new_record['class_name'],
new_record['event_id'],
new_record['location'],
new_record['password'],
new_record['trial'],
new_record['zekken_number'],
new_record['created_at'],
new_record['updated_at']
))
inserted_count += 1
# 進捗表示
if (i + 1) % 50 == 0:
self.new_conn.commit()
logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了")
except Exception as e:
error_count += 1
logger.error(f" レコード処理エラー (ID: {old_record[0]}): {e}")
# トランザクションロールバック
try:
self.new_conn.rollback()
except:
pass
if error_count > 10:
logger.error("❌ エラー数が上限を超えました")
break
# 最終コミット
self.new_conn.commit()
# 結果サマリー
logger.info("=" * 60)
logger.info("rog_team 移行完了")
logger.info("=" * 60)
logger.info(f"挿入: {inserted_count}")
logger.info(f"更新: {updated_count}")
logger.info(f"エラー: {error_count}")
logger.info(f"zekken_number解決: {zekken_resolved_count}")
logger.info(f"制約回避: {constraint_avoided_count}")
logger.info(f"総処理: {len(old_records)}")
if error_count == 0:
logger.info("✅ rog_team移行が正常に完了しました")
return True
else:
logger.warning(f"⚠️ {error_count}件のエラーがありました")
return False
except Exception as e:
logger.error(f"❌ rog_team移行エラー: {e}")
try:
self.new_conn.rollback()
except:
pass
return False
def run(self):
"""移行実行"""
try:
if not self.connect_databases():
return False
return self.migrate_rog_team()
finally:
self.close_connections()
def main():
"""メイン処理"""
logger.info("rog_team テーブル構造変換移行スクリプト")
migrator = RogTeamMigrator()
success = migrator.run()
if success:
logger.info("🎉 移行が正常に完了しました!")
else:
logger.error("💥 移行中にエラーが発生しました")
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()