Files
rogaining_srv/migrate_rog_goalimages_enhanced.py

366 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
rog_goalimages テーブル専用移行スクリプト (team_name → zekken_number変換対応)
old_rogdb の rog_goalimages から rogdb の rog_goalimages へ
zekken_numberがブランクの場合、team_nameからrog_entryを検索してzekken_numberを取得
"""
import os
import sys
import psycopg2
from datetime import datetime, timezone
import logging
# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# データベース設定
OLD_ROGDB_CONFIG = {
'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'),
'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'),
'user': os.getenv('OLD_ROGDB_USER', 'admin'),
'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('OLD_ROGDB_PORT', 5432))
}
ROGDB_CONFIG = {
'host': os.getenv('ROGDB_HOST', 'postgres-db'),
'database': os.getenv('ROGDB_NAME', 'rogdb'),
'user': os.getenv('ROGDB_USER', 'admin'),
'password': os.getenv('ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('ROGDB_PORT', 5432))
}
class RogGoalImagesMigrator:
"""rog_goalimages テーブル専用移行クラス"""
def __init__(self):
self.old_conn = None
self.new_conn = None
self.old_cursor = None
self.new_cursor = None
self.team_zekken_cache = {} # team_name → zekken_number キャッシュ
def connect_databases(self):
"""データベース接続"""
try:
logger.info("データベースに接続中...")
self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG)
self.new_conn = psycopg2.connect(**ROGDB_CONFIG)
self.old_conn.autocommit = True
self.new_conn.autocommit = False
self.old_cursor = self.old_conn.cursor()
self.new_cursor = self.new_conn.cursor()
logger.info("✅ データベース接続成功")
return True
except Exception as e:
logger.error(f"❌ データベース接続エラー: {e}")
return False
def close_connections(self):
"""データベース接続クローズ"""
try:
if self.old_cursor:
self.old_cursor.close()
if self.new_cursor:
self.new_cursor.close()
if self.old_conn:
self.old_conn.close()
if self.new_conn:
self.new_conn.close()
logger.info("データベース接続をクローズしました")
except Exception as e:
logger.warning(f"接続クローズ時の警告: {e}")
def build_team_zekken_cache(self):
"""team_name → zekken_number のキャッシュを構築"""
logger.info("team_name → zekken_number キャッシュを構築中...")
try:
# 新DBのrog_entryから team_name → zekken_number マッピングを取得
self.new_cursor.execute("""
SELECT DISTINCT t.team_name, e.zekken_number
FROM rog_entry e
JOIN rog_team t ON e.team_id = t.id
WHERE t.team_name IS NOT NULL
AND e.zekken_number IS NOT NULL
ORDER BY t.team_name, e.zekken_number
""")
team_zekken_pairs = self.new_cursor.fetchall()
for team_name, zekken_number in team_zekken_pairs:
if team_name not in self.team_zekken_cache:
self.team_zekken_cache[team_name] = zekken_number
logger.debug(f"キャッシュ追加: {team_name}{zekken_number}")
logger.info(f"✅ キャッシュ構築完了: {len(self.team_zekken_cache)}件のteam_name → zekken_numberマッピング")
# キャッシュ内容の一部をログ出力
if self.team_zekken_cache:
sample_items = list(self.team_zekken_cache.items())[:5]
logger.info(f"キャッシュサンプル: {sample_items}")
return True
except Exception as e:
logger.error(f"❌ キャッシュ構築エラー: {e}")
return False
def resolve_zekken_number(self, old_zekken_number, team_name):
"""zekken_numberを解決ブランクの場合はteam_nameから取得"""
# zekken_numberが既に設定されている場合はそのまま使用
if old_zekken_number and old_zekken_number.strip():
return old_zekken_number.strip()
# team_nameからzekken_numberを検索
if team_name and team_name.strip():
clean_team_name = team_name.strip()
if clean_team_name in self.team_zekken_cache:
resolved_zekken = self.team_zekken_cache[clean_team_name]
logger.debug(f"team_name '{clean_team_name}' → zekken_number '{resolved_zekken}'")
return str(resolved_zekken)
else:
logger.warning(f"team_name '{clean_team_name}' に対応するzekken_numberが見つかりません")
# 解決できない場合は空文字列を返す
logger.warning(f"zekken_number解決失敗: zekken='{old_zekken_number}', team='{team_name}'")
return ''
def get_old_goalimages_structure(self):
"""旧DBのrog_goalimagesテーブル構造を取得"""
try:
self.old_cursor.execute("""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'rog_goalimages'
AND table_schema = 'public'
ORDER BY ordinal_position
""")
columns = self.old_cursor.fetchall()
column_names = [col[0] for col in columns]
logger.info(f"旧DBのrog_goalimagesカラム: {column_names}")
return column_names
except Exception as e:
logger.error(f"❌ 旧DBテーブル構造取得エラー: {e}")
return []
def migrate_rog_goalimages(self):
"""rog_goalimages テーブルのデータ移行"""
logger.info("=" * 60)
logger.info("rog_goalimages テーブルデータ移行開始")
logger.info("=" * 60)
try:
# team_name → zekken_number キャッシュ構築
if not self.build_team_zekken_cache():
logger.error("❌ キャッシュ構築に失敗しました")
return False
# 旧DBテーブル構造確認
old_columns = self.get_old_goalimages_structure()
if not old_columns:
logger.error("❌ 旧DBのテーブル構造を取得できませんでした")
return False
# 旧データ取得
logger.info("旧rog_goalimagesデータを取得中...")
# カラム存在チェック
has_zekken_number = 'zekken_number' in old_columns
if has_zekken_number:
select_query = """
SELECT id, goalimage, goaltime, team_name, event_code,
cp_number, user_id, zekken_number
FROM rog_goalimages
ORDER BY id
"""
else:
select_query = """
SELECT id, goalimage, goaltime, team_name, event_code,
cp_number, user_id, NULL as zekken_number
FROM rog_goalimages
ORDER BY id
"""
self.old_cursor.execute(select_query)
old_records = self.old_cursor.fetchall()
if not old_records:
logger.info("✅ 移行対象データがありません")
return True
logger.info(f"移行対象レコード数: {len(old_records)}")
# 統計情報
inserted_count = 0
updated_count = 0
error_count = 0
zekken_resolved_count = 0
# レコード別処理
for i, old_record in enumerate(old_records):
try:
# レコードデータの展開
record_id, goalimage, goaltime, team_name, event_code, \
cp_number, user_id, old_zekken_number = old_record
# zekken_number解決
resolved_zekken_number = self.resolve_zekken_number(old_zekken_number, team_name)
if not old_zekken_number and resolved_zekken_number:
zekken_resolved_count += 1
# 新レコードデータ
new_record = {
'id': record_id,
'goalimage': goalimage,
'goaltime': goaltime,
'team_name': team_name or '',
'event_code': event_code or '',
'cp_number': cp_number or 0,
'user_id': user_id,
'zekken_number': resolved_zekken_number
}
# 既存レコード確認
self.new_cursor.execute(
"SELECT COUNT(*) FROM rog_goalimages WHERE id = %s",
(record_id,)
)
exists = self.new_cursor.fetchone()[0] > 0
if exists:
# UPDATE処理
update_query = """
UPDATE rog_goalimages SET
goalimage = %s,
goaltime = %s,
team_name = %s,
event_code = %s,
cp_number = %s,
user_id = %s,
zekken_number = %s
WHERE id = %s
"""
self.new_cursor.execute(update_query, (
new_record['goalimage'],
new_record['goaltime'],
new_record['team_name'],
new_record['event_code'],
new_record['cp_number'],
new_record['user_id'],
new_record['zekken_number'],
record_id
))
updated_count += 1
else:
# INSERT処理
insert_query = """
INSERT INTO rog_goalimages (
id, goalimage, goaltime, team_name, event_code,
cp_number, user_id, zekken_number
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s
)
"""
self.new_cursor.execute(insert_query, (
new_record['id'],
new_record['goalimage'],
new_record['goaltime'],
new_record['team_name'],
new_record['event_code'],
new_record['cp_number'],
new_record['user_id'],
new_record['zekken_number']
))
inserted_count += 1
# 進捗表示とコミット
if (i + 1) % 100 == 0:
self.new_conn.commit()
logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了")
except Exception as e:
error_count += 1
logger.error(f" レコード処理エラー (ID: {record_id}): {e}")
# トランザクションロールバック
try:
self.new_conn.rollback()
except:
pass
if error_count > 10:
logger.error("❌ エラー数が上限を超えました")
break
# 最終コミット
self.new_conn.commit()
# 結果サマリー
logger.info("=" * 60)
logger.info("rog_goalimages 移行完了")
logger.info("=" * 60)
logger.info(f"挿入: {inserted_count}")
logger.info(f"更新: {updated_count}")
logger.info(f"エラー: {error_count}")
logger.info(f"zekken_number解決: {zekken_resolved_count}")
logger.info(f"総処理: {len(old_records)}")
if error_count == 0:
logger.info("✅ rog_goalimages移行が正常に完了しました")
return True
else:
logger.warning(f"⚠️ {error_count}件のエラーがありました")
return False
except Exception as e:
logger.error(f"❌ rog_goalimages移行エラー: {e}")
try:
self.new_conn.rollback()
except:
pass
return False
def run(self):
"""移行実行"""
try:
if not self.connect_databases():
return False
return self.migrate_rog_goalimages()
finally:
self.close_connections()
def main():
"""メイン処理"""
logger.info("rog_goalimages テーブル移行スクリプト (team_name → zekken_number変換対応)")
migrator = RogGoalImagesMigrator()
success = migrator.run()
if success:
logger.info("🎉 移行が正常に完了しました!")
else:
logger.error("💥 移行中にエラーが発生しました")
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()