#!/usr/bin/env python """ 最終クリーン移行プログラム(シンプル版) - GPS記録のみ移行 - 写真記録由来のデータは除外 - トランザクション管理を簡素化 - エラーハンドリングを強化 """ import psycopg2 from datetime import datetime, timedelta import pytz import os from collections import defaultdict def get_event_date(event_code): """イベントコードに基づいてイベント日付を返す""" event_dates = { '美濃加茂': datetime(2024, 5, 19), '岐阜市': datetime(2024, 4, 28), '大垣2': datetime(2024, 4, 20), '各務原': datetime(2024, 3, 24), '下呂': datetime(2024, 3, 10), '中津川': datetime(2024, 3, 2), '揖斐川': datetime(2024, 2, 18), '高山': datetime(2024, 2, 11), '大垣': datetime(2024, 1, 27), '多治見': datetime(2024, 1, 20), '養老ロゲ': datetime(2024, 6, 1), '郡上': datetime(2024, 11, 3), } return event_dates.get(event_code) def parse_goal_time(goal_time_str, event_date): """goal_time文字列をパースしてdatetimeに変換""" if not goal_time_str: return None try: # HH:MM:SS形式の場合 if len(goal_time_str) <= 8: time_parts = goal_time_str.split(':') if len(time_parts) >= 2: hour = int(time_parts[0]) minute = int(time_parts[1]) second = int(time_parts[2]) if len(time_parts) > 2 else 0 # イベント日の時刻として設定 goal_datetime = event_date.replace(hour=hour, minute=minute, second=second) return goal_datetime else: # フルdatetime形式の場合 goal_datetime = datetime.strptime(goal_time_str, '%Y-%m-%d %H:%M:%S') return goal_datetime except Exception as e: print(f"goal_time解析エラー: {goal_time_str} - {e}") return None def convert_utc_to_jst(utc_datetime): """UTC時刻をJST時刻に変換""" try: if not utc_datetime: return None utc_tz = pytz.UTC jst_tz = pytz.timezone('Asia/Tokyo') if isinstance(utc_datetime, str): utc_datetime = datetime.strptime(utc_datetime, '%Y-%m-%d %H:%M:%S') if utc_datetime.tzinfo is None: utc_datetime = utc_tz.localize(utc_datetime) jst_datetime = utc_datetime.astimezone(jst_tz) return jst_datetime.replace(tzinfo=None) except Exception as e: print(f"時刻変換エラー: {utc_datetime} - {e}") return None def clean_target_database(target_cursor): """ターゲットデータベースのクリーンアップ""" print("ターゲットデータベースをクリーンアップ中...") try: # 外部キー制約を一時的に無効化 target_cursor.execute("SET session_replication_role = replica;") # テーブルをクリア target_cursor.execute("DELETE FROM rog_gpscheckin;") target_cursor.execute("DELETE FROM rog_member;") target_cursor.execute("DELETE FROM rog_entry;") target_cursor.execute("DELETE FROM rog_team;") target_cursor.execute("DELETE FROM rog_event;") # 外部キー制約を再有効化 target_cursor.execute("SET session_replication_role = DEFAULT;") print("ターゲットデータベースのクリーンアップ完了") return True except Exception as e: print(f"クリーンアップエラー: {e}") return False def create_events_and_teams(target_cursor, event_stats): """イベントとチームを作成""" print("イベントとチームを作成中...") created_events = set() created_teams = set() for event_code, teams in event_stats.items(): event_date = get_event_date(event_code) if not event_date: continue # イベント作成 if event_code not in created_events: try: target_cursor.execute(""" INSERT INTO rog_event (event_code, event_name, event_date, created_at, updated_at) VALUES (%s, %s, %s, %s, %s) """, (event_code, event_code, event_date.date(), datetime.now(), datetime.now())) created_events.add(event_code) print(f"イベント作成: {event_code}") except Exception as e: print(f"イベント作成エラー: {event_code} - {e}") # チーム作成 for team_zekken in teams: team_key = (event_code, team_zekken) if team_key not in created_teams: try: target_cursor.execute(""" INSERT INTO rog_team (zekken, event_code, created_at, updated_at) VALUES (%s, %s, %s, %s) """, (team_zekken, event_code, datetime.now(), datetime.now())) created_teams.add(team_key) except Exception as e: print(f"チーム作成エラー: {team_key} - {e}") print(f"作成完了: {len(created_events)}イベント, {len(created_teams)}チーム") def migrate_gps_data(source_cursor, target_cursor): """GPS記録のみを移行""" print("GPS記録の移行を開始...") # GPS記録のみ取得(serial_number < 20000) source_cursor.execute(""" SELECT serial_number, zekken_number, event_code, cp_number, create_at, goal_time FROM gps_information WHERE serial_number < 20000 ORDER BY serial_number """) gps_records = source_cursor.fetchall() print(f"GPS記録数: {len(gps_records)}件") success_count = 0 skip_count = 0 error_count = 0 event_stats = defaultdict(set) for record in gps_records: serial_number, zekken, event_code, cp_number, create_at, goal_time = record try: # イベント日付取得 event_date = get_event_date(event_code) if not event_date: print(f"未知のイベントコード: {event_code}") skip_count += 1 continue # 時刻変換 jst_create_at = convert_utc_to_jst(create_at) jst_goal_time = parse_goal_time(goal_time, event_date) if goal_time else None if not jst_create_at: print(f"時刻変換失敗: {serial_number}") error_count += 1 continue # チェックイン記録挿入 target_cursor.execute(""" INSERT INTO rog_gpscheckin ( zekken, event_code, cp_number, checkin_time, record_time, serial_number ) VALUES (%s, %s, %s, %s, %s, %s) """, (zekken, event_code, cp_number, jst_create_at, jst_create_at, str(serial_number))) event_stats[event_code].add(zekken) success_count += 1 if success_count % 100 == 0: print(f"移行進捗: {success_count}件完了") except Exception as e: print(f"移行エラー (Serial: {serial_number}): {e}") error_count += 1 print(f"GPS移行完了: 成功 {success_count}件, スキップ {skip_count}件, エラー {error_count}件") return event_stats, success_count, skip_count, error_count def generate_statistics(target_cursor, success_count): """統計情報を生成""" print("\n" + "="*60) print("📊 移行統計情報") print("="*60) if success_count == 0: print("移行されたデータがありません") return # イベント別統計 target_cursor.execute(""" SELECT event_code, COUNT(*) as record_count, COUNT(DISTINCT zekken) as team_count, MIN(checkin_time) as start_time, MAX(checkin_time) as end_time FROM rog_gpscheckin GROUP BY event_code ORDER BY record_count DESC """) stats = target_cursor.fetchall() print("\n📋 イベント別統計:") print("イベント名 記録数 チーム数 開始時刻 終了時刻") print("-" * 75) total_records = 0 total_teams = 0 for stat in stats: event_code, record_count, team_count, start_time, end_time = stat total_records += record_count total_teams += team_count start_str = start_time.strftime("%Y-%m-%d %H:%M") if start_time else "N/A" end_str = end_time.strftime("%Y-%m-%d %H:%M") if end_time else "N/A" print(f"{event_code:<12} {record_count:>6}件 {team_count:>4}チーム {start_str} {end_str}") print(f"\n✅ 合計: {total_records}件のチェックイン記録, {total_teams}チーム") # 0時台データチェック target_cursor.execute(""" SELECT COUNT(*) FROM rog_gpscheckin WHERE EXTRACT(hour FROM checkin_time) = 0 """) zero_hour_count = target_cursor.fetchone()[0] print(f"\n🔍 データ品質確認:") print(f"0時台データ: {zero_hour_count}件") if zero_hour_count == 0: print("✅ 0時台データは正常に除外されました") else: print("⚠️ 0時台データが残っています") def main(): """メイン処理""" print("最終クリーン移行プログラム(シンプル版)を開始...") try: # データベース接続 print("データベースに接続中...") source_conn = psycopg2.connect( host='postgres-db', database='gifuroge', user=os.environ.get('POSTGRES_USER'), password=os.environ.get('POSTGRES_PASS') ) source_conn.autocommit = True target_conn = psycopg2.connect( host='postgres-db', database='rogdb', user=os.environ.get('POSTGRES_USER'), password=os.environ.get('POSTGRES_PASS') ) target_conn.autocommit = True source_cursor = source_conn.cursor() target_cursor = target_conn.cursor() # 1. ターゲットデータベースのクリーンアップ if not clean_target_database(target_cursor): print("クリーンアップに失敗しました") return # 2. GPS記録の移行 event_stats, success_count, skip_count, error_count = migrate_gps_data(source_cursor, target_cursor) # 3. イベントとチームの作成 create_events_and_teams(target_cursor, event_stats) # 4. 統計情報の生成 generate_statistics(target_cursor, success_count) print("\n✅ 移行処理が完了しました") except Exception as e: print(f"❌ 移行処理中にエラーが発生しました: {e}") import traceback traceback.print_exc() finally: try: source_cursor.close() target_cursor.close() source_conn.close() target_conn.close() except: pass if __name__ == "__main__": main()