#!/usr/bin/env python3 """ 既存データ保護版移行プログラム(Location2025対応) 既存のentry、team、memberデータを削除せずに移行データを追加する Location2025テーブルとの整合性を確認し、チェックポイント参照の妥当性を検証する """ import os import sys import psycopg2 from datetime import datetime, time, timedelta import pytz def get_event_date(event_code): """イベントコードに基づいてイベント日付を返す""" event_dates = { '美濃加茂': datetime(2024, 5, 19), # 修正済み '岐阜市': datetime(2024, 4, 28), '大垣2': datetime(2024, 4, 20), '各務原': datetime(2024, 3, 24), '下呂': datetime(2024, 3, 10), '中津川': datetime(2024, 3, 2), '揖斐川': datetime(2024, 2, 18), '高山': datetime(2024, 2, 11), '大垣': datetime(2024, 1, 27), '多治見': datetime(2024, 1, 20), # 2024年のその他のイベント '養老ロゲ': datetime(2024, 6, 1), '郡上': datetime(2024, 11, 3), # 郡上イベント追加 # 2025年新規イベント '岐阜ロゲイニング2025': datetime(2025, 9, 15), } return event_dates.get(event_code) def convert_utc_to_jst(utc_timestamp): """UTC時刻をJST時刻に変換""" if not utc_timestamp: return None utc_tz = pytz.UTC jst_tz = pytz.timezone('Asia/Tokyo') # UTCタイムゾーン情報を付加 if utc_timestamp.tzinfo is None: utc_timestamp = utc_tz.localize(utc_timestamp) # JSTに変換 return utc_timestamp.astimezone(jst_tz).replace(tzinfo=None) def parse_goal_time(goal_time_str, event_date_str): """goal_time文字列を適切なdatetimeに変換""" if not goal_time_str: return None try: # goal_timeが時刻のみの場合(例: "13:45:00") goal_time = datetime.strptime(goal_time_str, "%H:%M:%S").time() # event_date_strからイベント日付を解析 event_date = datetime.strptime(event_date_str, "%Y-%m-%d").date() # 日付と時刻を結合 goal_datetime = datetime.combine(event_date, goal_time) # JSTとして解釈 jst_tz = pytz.timezone('Asia/Tokyo') goal_datetime_jst = jst_tz.localize(goal_datetime) # UTCに変換して返す return goal_datetime_jst.astimezone(pytz.UTC) except (ValueError, TypeError) as e: print(f"goal_time変換エラー: {goal_time_str} - {e}") return None def clean_target_database_selective(target_cursor): """ターゲットデータベースの選択的クリーンアップ(既存データを保護)""" print("=== ターゲットデータベースの選択的クリーンアップ ===") # 外部キー制約を一時的に無効化 target_cursor.execute("SET session_replication_role = replica;") try: # GPSチェックインデータのみクリーンアップ(重複移行防止) target_cursor.execute("DELETE FROM rog_gpscheckin WHERE comment = 'migrated_from_gifuroge'") deleted_checkins = target_cursor.rowcount print(f"過去の移行GPSチェックインデータを削除: {deleted_checkins}件") # 注意: rog_entry, rog_team, rog_member, rog_location2025 は削除しない! print("注意: 既存のentry、team、member、location2025データは保護されます") finally: # 外部キー制約を再有効化 target_cursor.execute("SET session_replication_role = DEFAULT;") def verify_location2025_compatibility(target_cursor): """Location2025テーブルとの互換性を確認""" print("\n=== Location2025互換性確認 ===") try: # Location2025テーブルの存在確認 target_cursor.execute(""" SELECT COUNT(*) FROM information_schema.tables WHERE table_name = 'rog_location2025' """) table_exists = target_cursor.fetchone()[0] > 0 if table_exists: # Location2025のデータ数確認 target_cursor.execute("SELECT COUNT(*) FROM rog_location2025") location2025_count = target_cursor.fetchone()[0] print(f"✅ rog_location2025テーブル存在: {location2025_count}件のチェックポイント") # イベント別チェックポイント数確認(安全版) try: # まずevent_codeカラムの存在確認 target_cursor.execute(""" SELECT column_name FROM information_schema.columns WHERE table_name = 'rog_newevent2' AND column_name IN ('event_code', 'event_name') """) event_columns = [row[0] for row in target_cursor.fetchall()] if 'event_code' in event_columns: event_field = 'e.event_code' elif 'event_name' in event_columns: event_field = 'e.event_name' else: event_field = 'e.id' target_cursor.execute(f""" SELECT {event_field}, COUNT(l.id) as checkpoint_count FROM rog_location2025 l LEFT JOIN rog_newevent2 e ON l.event_id = e.id GROUP BY {event_field} ORDER BY checkpoint_count DESC LIMIT 10 """) results = target_cursor.fetchall() if results: print("イベント別チェックポイント数(上位10件):") for event_identifier, count in results: print(f" {event_identifier}: {count}件") except Exception as e: print(f"⚠️ イベント別集計でエラー: {e}") # エラーでも続行 ORDER BY checkpoint_count DESC LIMIT 10 """) event_checkpoints = target_cursor.fetchall() if event_checkpoints: print("イベント別チェックポイント数(上位10件):") for event_code, count in event_checkpoints: print(f" {event_code}: {count}件") return True else: print("⚠️ rog_location2025テーブルが見つかりません") print("注意: 移行は可能ですが、チェックポイント管理機能は制限されます") return False except Exception as e: print(f"❌ Location2025互換性確認エラー: {e}") # トランザクションエラーの場合はロールバック try: target_cursor.connection.rollback() except: pass return False def backup_existing_data(target_cursor): """既存データのバックアップ状況を確認""" print("\n=== 既存データ保護確認 ===") try: # 既存データ数を確認 target_cursor.execute("SELECT COUNT(*) FROM rog_entry") entry_count = target_cursor.fetchone()[0] target_cursor.execute("SELECT COUNT(*) FROM rog_team") team_count = target_cursor.fetchone()[0] target_cursor.execute("SELECT COUNT(*) FROM rog_member") member_count = target_cursor.fetchone()[0] target_cursor.execute("SELECT COUNT(*) FROM rog_gpscheckin") checkin_count = target_cursor.fetchone()[0] # Location2025データ数も確認 try: target_cursor.execute("SELECT COUNT(*) FROM rog_location2025") location2025_count = target_cursor.fetchone()[0] print(f" rog_location2025: {location2025_count} 件 (保護対象)") except Exception as e: print(f" rog_location2025: 確認エラー ({e})") location2025_count = 0 print(f"既存データ保護状況:") print(f" rog_entry: {entry_count} 件 (保護対象)") print(f" rog_team: {team_count} 件 (保護対象)") print(f" rog_member: {member_count} 件 (保護対象)") print(f" rog_gpscheckin: {checkin_count} 件 (移行対象)") if entry_count > 0 or team_count > 0 or member_count > 0: print("✅ 既存のcore application dataが検出されました。これらは保護されます。") return True else: print("⚠️ 既存のcore application dataが見つかりません。") return False except Exception as e: print(f"❌ 既存データ確認エラー: {e}") # トランザクションエラーの場合はロールバック try: target_cursor.connection.rollback() except: pass return False def migrate_gps_data(source_cursor, target_cursor): """GPS記録データのみを移行(写真記録データは除外)""" print("\n=== GPS記録データの移行 ===") # GPS記録のみを取得(不正な写真記録データを除外) source_cursor.execute(""" SELECT serial_number, team_name, cp_number, record_time, goal_time, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, colabo_company_memo FROM gps_information WHERE serial_number < 20000 -- GPS専用データのみ ORDER BY serial_number """) gps_records = source_cursor.fetchall() print(f"移行対象GPS記録数: {len(gps_records)}件") migrated_count = 0 error_count = 0 for record in gps_records: try: (serial_number, team_name, cp_number, record_time, goal_time, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, colabo_company_memo) = record # UTC時刻をJST時刻に変換 record_time_jst = convert_utc_to_jst(record_time) goal_time_utc = None if goal_time: # goal_timeをUTCに変換 if isinstance(goal_time, str): # イベント名からイベント日付を取得 event_name = colabo_company_memo or "不明" event_date = get_event_date(event_name) if event_date: goal_time_utc = parse_goal_time(goal_time, event_date.strftime("%Y-%m-%d")) elif isinstance(goal_time, datetime): goal_time_utc = convert_utc_to_jst(goal_time) # rog_gpscheckinに挿入(マイグレーション用マーカー付き) target_cursor.execute(""" INSERT INTO rog_gpscheckin (serial_number, team_name, cp_number, record_time, goal_time, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, comment) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( serial_number, team_name, cp_number, record_time_jst, goal_time_utc, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, 'migrated_from_gifuroge' )) migrated_count += 1 if migrated_count % 1000 == 0: print(f"移行進捗: {migrated_count}/{len(gps_records)}件") except Exception as e: error_count += 1 print(f"移行エラー (record {serial_number}): {e}") continue print(f"\n移行完了: {migrated_count}件成功, {error_count}件エラー") return migrated_count def main(): """メイン移行処理(既存データ保護版)""" print("=== 既存データ保護版移行プログラム開始 ===") print("注意: 既存のentry、team、memberデータは削除されません") # データベース接続設定 source_config = { 'host': 'postgres-db', 'port': 5432, 'database': 'gifuroge', 'user': 'admin', 'password': 'admin123456' } target_config = { 'host': 'postgres-db', 'port': 5432, 'database': 'rogdb', 'user': 'admin', 'password': 'admin123456' } source_conn = None target_conn = None try: # データベース接続 print("データベースに接続中...") source_conn = psycopg2.connect(**source_config) target_conn = psycopg2.connect(**target_config) source_cursor = source_conn.cursor() target_cursor = target_conn.cursor() # Location2025互換性確認 location2025_available = verify_location2025_compatibility(target_cursor) # 既存データ保護確認 has_existing_data = backup_existing_data(target_cursor) # 確認プロンプト print(f"\nLocation2025対応: {'✅ 利用可能' if location2025_available else '⚠️ 制限あり'}") print(f"既存データ保護: {'✅ 検出済み' if has_existing_data else '⚠️ 未検出'}") response = input("\n移行を開始しますか? (y/N): ") if response.lower() != 'y': print("移行を中止しました。") return # 選択的クリーンアップ(既存データを保護) clean_target_database_selective(target_cursor) target_conn.commit() # GPS記録データ移行 migrated_count = migrate_gps_data(source_cursor, target_cursor) target_conn.commit() print(f"\n=== 移行完了 ===") print(f"移行されたGPS記録: {migrated_count}件") print(f"Location2025互換性: {'✅ 対応済み' if location2025_available else '⚠️ 要確認'}") if has_existing_data: print("✅ 既存のentry、team、member、location2025データは保護されました") else: print("⚠️ 既存のcore application dataがありませんでした") print(" 別途testdb/rogdb.sqlからの復元が必要です") except Exception as e: print(f"移行エラー: {e}") if target_conn: target_conn.rollback() sys.exit(1) finally: if source_conn: source_conn.close() if target_conn: target_conn.close() if __name__ == "__main__": main()