#!/usr/bin/env python3 """ Location2025対応版移行プログラム 既存のentry、team、memberデータを削除せずに移行データを追加し、 Location2025テーブルとの整合性を確保する """ import os import sys import psycopg2 from datetime import datetime, time, timedelta import pytz def get_event_date(event_code): """イベントコードに基づいてイベント日付を返す""" event_dates = { '美濃加茂': datetime(2024, 5, 19), '岐阜市': datetime(2024, 4, 28), '大垣2': datetime(2024, 4, 20), '各務原': datetime(2024, 3, 24), '下呂': datetime(2024, 3, 10), '中津川': datetime(2024, 3, 2), '揖斐川': datetime(2024, 2, 18), '高山': datetime(2024, 2, 11), '大垣': datetime(2024, 1, 27), '多治見': datetime(2024, 1, 20), '養老ロゲ': datetime(2024, 6, 1), '郡上': datetime(2024, 11, 3), # 2025年新規イベント '岐阜ロゲイニング2025': datetime(2025, 9, 15), } return event_dates.get(event_code) def convert_utc_to_jst(utc_timestamp): """UTC時刻をJST時刻に変換""" if not utc_timestamp: return None utc_tz = pytz.UTC jst_tz = pytz.timezone('Asia/Tokyo') # UTCタイムゾーン情報を付加 if utc_timestamp.tzinfo is None: utc_timestamp = utc_tz.localize(utc_timestamp) # JSTに変換 return utc_timestamp.astimezone(jst_tz).replace(tzinfo=None) def parse_goal_time(goal_time_str, event_date_str): """goal_time文字列を適切なdatetimeに変換""" if not goal_time_str: return None try: # goal_timeが時刻のみの場合(例: "13:45:00") goal_time = datetime.strptime(goal_time_str, "%H:%M:%S").time() # event_date_strからイベント日付を解析 event_date = datetime.strptime(event_date_str, "%Y-%m-%d").date() # 日付と時刻を結合 goal_datetime = datetime.combine(event_date, goal_time) # JSTとして解釈 jst_tz = pytz.timezone('Asia/Tokyo') goal_datetime_jst = jst_tz.localize(goal_datetime) # UTCに変換して返す return goal_datetime_jst.astimezone(pytz.UTC) except (ValueError, TypeError) as e: print(f"goal_time変換エラー: {goal_time_str} - {e}") return None def verify_location2025_compatibility(target_cursor): """Location2025テーブルとの互換性を確認""" print("\n=== Location2025互換性確認 ===") try: # Location2025テーブルの存在確認 target_cursor.execute(""" SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'rog_location2025' """) table_exists = target_cursor.fetchone()[0] > 0 if table_exists: # Location2025のデータ数確認 target_cursor.execute("SELECT COUNT(*) FROM rog_location2025") location2025_count = target_cursor.fetchone()[0] print(f"✅ rog_location2025テーブル存在: {location2025_count}件のチェックポイント") # イベント別チェックポイント数確認 target_cursor.execute(""" SELECT e.event_code, COUNT(l.id) as checkpoint_count FROM rog_location2025 l JOIN rog_newevent2 e ON l.event_id = e.id GROUP BY e.event_code ORDER BY checkpoint_count DESC """) event_checkpoints = target_cursor.fetchall() print("イベント別チェックポイント数:") for event_code, count in event_checkpoints: print(f" {event_code}: {count}件") return True else: print("⚠️ rog_location2025テーブルが見つかりません") print("注意: 移行は可能ですが、チェックポイント管理機能は制限されます") return False except Exception as e: print(f"❌ Location2025互換性確認エラー: {e}") return False def validate_checkpoint_references(target_cursor, source_cursor): """チェックポイント参照の整合性検証""" print("\n=== チェックポイント参照整合性検証 ===") try: # ソースデータのチェックポイント番号を取得 source_cursor.execute(""" SELECT DISTINCT cp_number, colabo_company_memo as event_name FROM gps_information WHERE serial_number < 20000 AND cp_number IS NOT NULL ORDER BY cp_number """) source_checkpoints = source_cursor.fetchall() print(f"ソースデータのチェックポイント: {len(source_checkpoints)}種類") # Location2025のチェックポイント番号を取得 target_cursor.execute(""" SELECT DISTINCT l.cp_number, e.event_code FROM rog_location2025 l JOIN rog_newevent2 e ON l.event_id = e.id ORDER BY l.cp_number """) target_checkpoints = target_cursor.fetchall() print(f"Location2025のチェックポイント: {len(target_checkpoints)}種類") # 不一致のチェックポイントを特定 source_cp_set = set((cp, event) for cp, event in source_checkpoints if cp and event) target_cp_set = set((cp, event) for cp, event in target_checkpoints if cp and event) missing_in_target = source_cp_set - target_cp_set if missing_in_target: print("⚠️ Location2025で不足しているチェックポイント:") for cp, event in sorted(missing_in_target): print(f" CP{cp} ({event})") else: print("✅ すべてのチェックポイント参照が整合しています") return len(missing_in_target) == 0 except Exception as e: print(f"❌ チェックポイント参照検証エラー: {e}") return False def clean_target_database_selective(target_cursor): """ターゲットデータベースの選択的クリーンアップ(既存データを保護)""" print("=== ターゲットデータベースの選択的クリーンアップ ===") # 外部キー制約を一時的に無効化 target_cursor.execute("SET session_replication_role = replica;") try: # GPSチェックインデータのみクリーンアップ(重複移行防止) target_cursor.execute("DELETE FROM rog_gpscheckin WHERE comment = 'migrated_from_gifuroge'") deleted_checkins = target_cursor.rowcount print(f"過去の移行GPSチェックインデータを削除: {deleted_checkins}件") # 注意: rog_entry, rog_team, rog_member, rog_location2025 は削除しない! print("注意: 既存のentry、team、member、location2025データは保護されます") finally: # 外部キー制約を再有効化 target_cursor.execute("SET session_replication_role = DEFAULT;") def backup_existing_data(target_cursor): """既存データのバックアップ状況を確認""" print("\n=== 既存データ保護確認 ===") # 既存データ数を確認 target_cursor.execute("SELECT COUNT(*) FROM rog_entry") entry_count = target_cursor.fetchone()[0] target_cursor.execute("SELECT COUNT(*) FROM rog_team") team_count = target_cursor.fetchone()[0] target_cursor.execute("SELECT COUNT(*) FROM rog_member") member_count = target_cursor.fetchone()[0] # Location2025データ数も確認 try: target_cursor.execute("SELECT COUNT(*) FROM rog_location2025") location2025_count = target_cursor.fetchone()[0] print(f"✅ Location2025チェックポイント: {location2025_count}件") except Exception as e: print(f"⚠️ Location2025確認エラー: {e}") location2025_count = 0 if entry_count > 0 or team_count > 0 or member_count > 0: print("✅ 既存のcore application dataが検出されました。これらは保護されます。") print(f" - エントリー: {entry_count}件") print(f" - チーム: {team_count}件") print(f" - メンバー: {member_count}件") return True else: print("⚠️ 既存のcore application dataが見つかりません。") print("注意: restore_core_data.pyの実行を検討してください。") return False def migrate_gps_data_with_location2025_validation(source_cursor, target_cursor): """Location2025対応版GPSデータ移行""" print("\n=== Location2025対応版GPSデータ移行 ===") # GPS専用データ取得(serial_number < 20000) source_cursor.execute(""" SELECT serial_number, team_name, cp_number, record_time, goal_time, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, colabo_company_memo FROM gps_information WHERE serial_number < 20000 -- GPS専用データのみ ORDER BY serial_number """) gps_records = source_cursor.fetchall() print(f"移行対象GPSデータ: {len(gps_records)}件") migrated_count = 0 error_count = 0 checkpoint_warnings = set() for record in gps_records: try: (serial_number, team_name, cp_number, record_time, goal_time, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, colabo_company_memo) = record # Location2025でのチェックポイント存在確認(警告のみ) if cp_number and colabo_company_memo: target_cursor.execute(""" SELECT COUNT(*) FROM rog_location2025 l JOIN rog_newevent2 e ON l.event_id = e.id WHERE l.cp_number = %s AND e.event_code = %s """, (cp_number, colabo_company_memo)) checkpoint_exists = target_cursor.fetchone()[0] > 0 if not checkpoint_exists: warning_key = (cp_number, colabo_company_memo) if warning_key not in checkpoint_warnings: checkpoint_warnings.add(warning_key) print(f"⚠️ チェックポイント未定義: CP{cp_number} in {colabo_company_memo}") # UTC時刻をJST時刻に変換 record_time_jst = convert_utc_to_jst(record_time) goal_time_utc = None if goal_time: # goal_timeをUTCに変換 if isinstance(goal_time, str): # イベント名からイベント日付を取得 event_name = colabo_company_memo or "不明" event_date = get_event_date(event_name) if event_date: goal_time_utc = parse_goal_time(goal_time, event_date.strftime("%Y-%m-%d")) elif isinstance(goal_time, datetime): goal_time_utc = convert_utc_to_jst(goal_time) # rog_gpscheckinに挿入(Location2025対応マーカー付き) target_cursor.execute(""" INSERT INTO rog_gpscheckin (serial_number, team_name, cp_number, record_time, goal_time, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, comment) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( serial_number, team_name, cp_number, record_time_jst, goal_time_utc, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, 'migrated_from_gifuroge_location2025_compatible' )) migrated_count += 1 if migrated_count % 1000 == 0: print(f"移行進捗: {migrated_count}/{len(gps_records)}件") except Exception as e: error_count += 1 print(f"移行エラー (record {serial_number}): {e}") continue print(f"\n移行完了: {migrated_count}件成功, {error_count}件エラー") if checkpoint_warnings: print(f"チェックポイント警告: {len(checkpoint_warnings)}種類のチェックポイントがLocation2025で未定義") return migrated_count def generate_location2025_migration_report(target_cursor): """Location2025移行レポート生成""" print("\n=== Location2025移行レポート ===") try: # 移行されたGPSデータの統計 target_cursor.execute(""" SELECT COUNT(*) FROM rog_gpscheckin WHERE comment LIKE 'migrated_from_gifuroge%' """) migrated_gps_count = target_cursor.fetchone()[0] # イベント別移行データ統計 target_cursor.execute(""" SELECT COALESCE(update_user, 'unknown') as event_name, COUNT(*) as gps_count, COUNT(DISTINCT cp_number) as unique_checkpoints, MIN(record_time) as first_checkin, MAX(record_time) as last_checkin FROM rog_gpscheckin WHERE comment LIKE 'migrated_from_gifuroge%' GROUP BY update_user ORDER BY gps_count DESC """) event_stats = target_cursor.fetchall() print(f"📊 総移行GPS記録: {migrated_gps_count}件") print("📋 イベント別統計:") for event_name, gps_count, unique_cps, first_time, last_time in event_stats: print(f" {event_name}: {gps_count}件 (CP: {unique_cps}種類)") # Location2025との整合性確認 target_cursor.execute(""" SELECT COUNT(DISTINCT l.cp_number) FROM rog_location2025 l JOIN rog_newevent2 e ON l.event_id = e.id """) defined_checkpoints = target_cursor.fetchone()[0] print(f"🎯 Location2025定義済みチェックポイント: {defined_checkpoints}種類") except Exception as e: print(f"❌ レポート生成エラー: {e}") def main(): """メイン移行処理(Location2025対応版)""" print("=== Location2025対応版移行プログラム開始 ===") print("注意: 既存のentry、team、member、location2025データは削除されません") # データベース接続設定 source_config = { 'host': 'localhost', 'port': '5433', 'database': 'gifuroge', 'user': 'postgres', 'password': 'postgres' } target_config = { 'host': 'localhost', 'port': '5432', 'database': 'rogdb', 'user': 'postgres', 'password': 'postgres' } source_conn = None target_conn = None try: # データベース接続 print("データベースに接続中...") source_conn = psycopg2.connect(**source_config) target_conn = psycopg2.connect(**target_config) source_cursor = source_conn.cursor() target_cursor = target_conn.cursor() # Location2025互換性確認 location2025_available = verify_location2025_compatibility(target_cursor) # 既存データ保護確認 has_existing_data = backup_existing_data(target_cursor) # チェックポイント参照整合性検証(Location2025が利用可能な場合) if location2025_available: validate_checkpoint_references(target_cursor, source_cursor) # 確認プロンプト print(f"\nLocation2025対応: {'✅ 利用可能' if location2025_available else '⚠️ 制限あり'}") print(f"既存データ保護: {'✅ 検出済み' if has_existing_data else '⚠️ 未検出'}") response = input("\n移行を開始しますか? (y/N): ") if response.lower() != 'y': print("移行を中止しました。") return # ターゲットデータベースの選択的クリーンアップ clean_target_database_selective(target_cursor) target_conn.commit() # Location2025対応版GPSデータ移行 migrated_count = migrate_gps_data_with_location2025_validation(source_cursor, target_cursor) if migrated_count > 0: target_conn.commit() print("✅ 移行データをコミットしました") # 移行レポート生成 generate_location2025_migration_report(target_cursor) else: print("❌ 移行データがありません") except Exception as e: print(f"❌ 移行エラー: {e}") if target_conn: target_conn.rollback() finally: # 接続を閉じる if source_conn: source_conn.close() if target_conn: target_conn.close() print("=== Location2025対応版移行プログラム終了 ===") if __name__ == "__main__": main()