#!/usr/bin/env python3 """ GPS記録データマイグレーション (既存データ保護版) gifurogeからrogdbへ12,665件のGPSチェックイン記録を移行 既存のアプリケーションデータ(188エントリ、226チーム、388メンバー)は保護 """ import os import sys import psycopg2 from datetime import datetime, timezone from typing import Optional, Dict, List, Tuple import pytz # 設定 GIFUROGE_DB = { 'host': 'postgres-db', 'database': 'gifuroge', 'user': 'rogadmin', 'password': 'rogpass', 'port': 5432 } ROGDB_DB = { 'host': 'postgres-db', 'database': 'rogdb', 'user': 'rogadmin', 'password': 'rogpass', 'port': 5432 } def convert_utc_to_jst(utc_time): """UTC時刻をJST時刻に変換""" if utc_time is None: return None if isinstance(utc_time, str): utc_time = datetime.fromisoformat(utc_time.replace('Z', '+00:00')) if utc_time.tzinfo is None: utc_time = utc_time.replace(tzinfo=timezone.utc) jst = pytz.timezone('Asia/Tokyo') return utc_time.astimezone(jst) def get_event_date(event_name: str) -> Optional[datetime]: """イベント名から開催日を推定""" event_dates = { '岐阜県ロゲイニング大会': datetime(2024, 11, 23), '高山市ロゲイニング大会': datetime(2024, 11, 23), 'ロゲイニング大会2024': datetime(2024, 11, 23), 'default': datetime(2024, 11, 23) } for key, date in event_dates.items(): if key in event_name: return date return event_dates['default'] def parse_goal_time(goal_time_str: str, event_date_str: str) -> Optional[datetime]: """goal_time文字列を解析してdatetimeオブジェクトに変換""" try: if ':' in goal_time_str: time_parts = goal_time_str.split(':') hour = int(time_parts[0]) minute = int(time_parts[1]) event_date = datetime.strptime(event_date_str, "%Y-%m-%d") goal_datetime = event_date.replace(hour=hour, minute=minute) jst = pytz.timezone('Asia/Tokyo') goal_datetime_jst = jst.localize(goal_datetime) return goal_datetime_jst except Exception as e: print(f"goal_time解析エラー: {goal_time_str} - {e}") return None def check_database_connectivity(): """データベース接続確認""" print("=== データベース接続確認 ===") try: # gifuroge DB接続確認 source_conn = psycopg2.connect(**GIFUROGE_DB) source_cursor = source_conn.cursor() source_cursor.execute("SELECT COUNT(*) FROM gps_information") source_count = source_cursor.fetchone()[0] print(f"✅ gifuroge DB接続成功: gps_information {source_count}件") source_conn.close() # rogdb DB接続確認 target_conn = psycopg2.connect(**ROGDB_DB) target_cursor = target_conn.cursor() target_cursor.execute("SELECT COUNT(*) FROM rog_gpscheckin") target_count = target_cursor.fetchone()[0] print(f"✅ rogdb DB接続成功: rog_gpscheckin {target_count}件") target_conn.close() return True except Exception as e: print(f"❌ データベース接続エラー: {e}") return False def verify_location2025_compatibility(target_cursor): """rog_location2025テーブルとの互換性確認""" print("\n=== Location2025互換性確認 ===") try: # テーブル存在確認 target_cursor.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'rog_location2025' ) """) table_exists = target_cursor.fetchone()[0] if not table_exists: print("⚠️ rog_location2025テーブルが存在しません") return True # テーブルが存在しない場合は互換性チェックをスキップ # カラム構造を動的に確認 target_cursor.execute(""" SELECT column_name FROM information_schema.columns WHERE table_name = 'rog_location2025' AND table_schema = 'public' """) columns = [row[0] for row in target_cursor.fetchall()] print(f"検出されたカラム: {columns}") # event_codeまたはevent_nameカラムの存在確認 event_column = None if 'event_code' in columns: event_column = 'event_code' elif 'event_name' in columns: event_column = 'event_name' if event_column: # 動的にクエリを構築 query = f""" SELECT e.{event_column}, COUNT(l.id) as location_count FROM rog_entry e LEFT JOIN rog_location2025 l ON e.id = l.entry_id GROUP BY e.{event_column} HAVING COUNT(l.id) > 0 """ target_cursor.execute(query) location_data = target_cursor.fetchall() print(f"既存のLocation2025データ:") for event_id, count in location_data: print(f" {event_column} {event_id}: {count}件") print("✅ Location2025互換性確認完了") return True else: print("⚠️ event_codeもevent_nameも見つかりません") return True except Exception as e: print(f"❌ Location2025互換性確認エラー: {e}") # トランザクションエラーの場合はロールバック try: target_cursor.connection.rollback() except: pass return False def backup_existing_data(target_cursor): """既存データのバックアップ状況を確認""" print("\n=== 既存データ保護確認 ===") try: # 既存データ数を確認 target_cursor.execute("SELECT COUNT(*) FROM rog_entry") entry_count = target_cursor.fetchone()[0] target_cursor.execute("SELECT COUNT(*) FROM rog_team") team_count = target_cursor.fetchone()[0] target_cursor.execute("SELECT COUNT(*) FROM rog_member") member_count = target_cursor.fetchone()[0] target_cursor.execute("SELECT COUNT(*) FROM rog_gpscheckin") checkin_count = target_cursor.fetchone()[0] # Location2025データ数も確認 try: target_cursor.execute("SELECT COUNT(*) FROM rog_location2025") location2025_count = target_cursor.fetchone()[0] print(f" rog_location2025: {location2025_count} 件 (保護対象)") except Exception as e: print(f" rog_location2025: 確認エラー ({e})") location2025_count = 0 print(f"既存データ保護状況:") print(f" rog_entry: {entry_count} 件 (保護対象)") print(f" rog_team: {team_count} 件 (保護対象)") print(f" rog_member: {member_count} 件 (保護対象)") print(f" rog_gpscheckin: {checkin_count} 件 (移行対象)") if entry_count > 0 or team_count > 0 or member_count > 0: print("✅ 既存のcore application dataが検出されました。これらは保護されます。") return True else: print("⚠️ 既存のcore application dataが見つかりません。") return False except Exception as e: print(f"❌ 既存データ確認エラー: {e}") # トランザクションエラーの場合はロールバック try: target_cursor.connection.rollback() except: pass return False def migrate_gps_data(source_cursor, target_cursor): """GPS記録データのみを移行(写真記録データは除外)""" print("\n=== GPS記録データの移行 ===") try: # GPS記録のみを取得(不正な写真記録データを除外) source_cursor.execute(""" SELECT serial_number, team_name, cp_number, record_time, goal_time, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, colabo_company_memo FROM gps_information WHERE serial_number < 20000 -- GPS専用データのみ ORDER BY serial_number """) gps_records = source_cursor.fetchall() print(f"移行対象GPS記録数: {len(gps_records)}件") migrated_count = 0 error_count = 0 for record in gps_records: try: (serial_number, team_name, cp_number, record_time, goal_time, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, colabo_company_memo) = record # UTC時刻をJST時刻に変換 record_time_jst = convert_utc_to_jst(record_time) goal_time_utc = None if goal_time: # goal_timeをUTCに変換 if isinstance(goal_time, str): # イベント名からイベント日付を取得 event_name = colabo_company_memo or "不明" event_date = get_event_date(event_name) if event_date: goal_time_utc = parse_goal_time(goal_time, event_date.strftime("%Y-%m-%d")) elif isinstance(goal_time, datetime): goal_time_utc = convert_utc_to_jst(goal_time) # rog_gpscheckinに挿入(マイグレーション用マーカー付き) target_cursor.execute(""" INSERT INTO rog_gpscheckin (serial_number, team_name, cp_number, record_time, goal_time, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, comment) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( serial_number, team_name, cp_number, record_time_jst, goal_time_utc, late_point, buy_flag, image_address, minus_photo_flag, create_user, update_user, 'migrated_from_gifuroge' )) migrated_count += 1 if migrated_count % 1000 == 0: print(f" 移行進捗: {migrated_count}/{len(gps_records)} 件") target_cursor.connection.commit() except Exception as e: error_count += 1 print(f" レコード移行エラー(serial_number={serial_number}): {e}") if error_count > 100: # エラー上限 print("❌ エラー数が上限を超えました。移行を中止します。") raise target_cursor.connection.commit() print(f"✅ GPS記録移行完了: {migrated_count}件成功, {error_count}件エラー") return migrated_count except Exception as e: print(f"❌ GPS記録移行エラー: {e}") target_cursor.connection.rollback() raise def main(): """メイン移行処理(既存データ保護版)""" print("=" * 60) print("GPS記録データ移行スクリプト (既存データ保護版)") print("=" * 60) print("移行対象: gifuroge.gps_information → rogdb.rog_gpscheckin") print("既存データ保護: rog_entry, rog_team, rog_member, rog_location2025") print("=" * 60) try: # 1. データベース接続確認 if not check_database_connectivity(): return False # 2. 実際の移行処理 source_conn = psycopg2.connect(**GIFUROGE_DB) target_conn = psycopg2.connect(**ROGDB_DB) source_cursor = source_conn.cursor() target_cursor = target_conn.cursor() # 3. 既存データ確認 has_existing_data = backup_existing_data(target_cursor) # 4. Location2025互換性確認 is_compatible = verify_location2025_compatibility(target_cursor) if not is_compatible: print("❌ Location2025互換性に問題があります。") return False # 5. 安全確認 if has_existing_data: print("\n⚠️ 既存のアプリケーションデータが検出されました。") print("この移行操作は既存データを保護しながらGPS記録のみを移行します。") confirm = input("続行しますか? (yes/no): ") if confirm.lower() != 'yes': print("移行を中止しました。") return False # 6. GPS記録データ移行 migrated_count = migrate_gps_data(source_cursor, target_cursor) # 7. 完了確認 print("\n" + "=" * 60) print("移行完了サマリー") print("=" * 60) print(f"移行されたGPS記録: {migrated_count}件") print("保護された既存データ: rog_entry, rog_team, rog_member, rog_location2025") print("✅ データ移行が完了しました!") return True except Exception as e: print(f"\n❌ 移行処理エラー: {e}") return False finally: try: source_conn.close() target_conn.close() except: pass if __name__ == "__main__": success = main() sys.exit(0 if success else 1)