#!/usr/bin/env python3 """ 統合移行スクリプト:GPS情報移行 + 写真記録からチェックイン記録生成 - gifurogeからrogdbへのGPS情報移行 - 写真記録を正とした不足チェックイン記録の補完 - 統計情報の出力と動作確認 """ import os import sys import psycopg2 from datetime import datetime, timedelta import pytz from typing import Optional, Dict, Any, List, Tuple class MigrationWithPhotoIntegration: def __init__(self): self.conn_gif = None self.conn_rog = None self.cur_gif = None self.cur_rog = None def connect_databases(self): """データベースに接続""" try: self.conn_gif = psycopg2.connect( host='postgres-db', database='gifuroge', user=os.environ.get('POSTGRES_USER'), password=os.environ.get('POSTGRES_PASS') ) self.conn_rog = psycopg2.connect( host='postgres-db', database='rogdb', user=os.environ.get('POSTGRES_USER'), password=os.environ.get('POSTGRES_PASS') ) self.cur_gif = self.conn_gif.cursor() self.cur_rog = self.conn_rog.cursor() print("✅ データベース接続成功") return True except Exception as e: print(f"❌ データベース接続エラー: {e}") return False def get_event_date(self, event_code: str) -> str: """イベントコードから適切な日付を取得""" event_dates = { '養老2': '2024-10-06', '美濃加茂': '2024-05-19', # 修正済み '下呂': '2023-05-20', 'FC岐阜': '2024-10-18', '大垣': '2023-11-25', '岐阜市': '2023-10-21', 'default': '2024-01-01' } return event_dates.get(event_code, event_dates['default']) def convert_utc_to_jst(self, utc_timestamp: datetime) -> Optional[datetime]: """UTC時刻をJST時刻に変換""" if not utc_timestamp: return None utc_tz = pytz.UTC jst_tz = pytz.timezone('Asia/Tokyo') if utc_timestamp.tzinfo is None: utc_timestamp = utc_tz.localize(utc_timestamp) return utc_timestamp.astimezone(jst_tz).replace(tzinfo=None) def parse_goal_time(self, goal_time_str: str, event_code: str) -> Optional[datetime]: """goal_time文字列をパース(時刻のみの場合は変換なし)""" if not goal_time_str: return None # 時刻のみ(HH:MM:SS形式)の場合はJSTの時刻として扱い、UTCからの変換は行わない if len(goal_time_str) <= 8 and goal_time_str.count(':') <= 2: event_date = self.get_event_date(event_code) event_datetime = datetime.strptime(event_date, '%Y-%m-%d') time_part = datetime.strptime(goal_time_str, '%H:%M:%S').time() return datetime.combine(event_datetime.date(), time_part) else: # 完全な日時形式の場合はUTCからJSTに変換 return self.convert_utc_to_jst(datetime.fromisoformat(goal_time_str.replace('Z', '+00:00'))) def migrate_gps_information(self) -> Dict[str, int]: """GPS情報の移行処理""" print("\n=== GPS情報移行開始 ===") # 既存の移行データ件数を確認 self.cur_rog.execute('SELECT COUNT(*) FROM rog_gpscheckin;') existing_count = self.cur_rog.fetchone()[0] print(f"既存チェックイン記録: {existing_count:,}件") # 移行対象データ取得 self.cur_gif.execute(""" SELECT serial_number, zekken_number, event_code, create_at, goal_time, cp_number, late_point FROM gps_information ORDER BY event_code, zekken_number, create_at; """) all_records = self.cur_gif.fetchall() print(f"移行対象データ: {len(all_records):,}件") # 最大serial_numberを取得 self.cur_rog.execute("SELECT MAX(serial_number::integer) FROM rog_gpscheckin WHERE serial_number ~ '^[0-9]+$';") max_serial_result = self.cur_rog.fetchone() next_serial = (max_serial_result[0] if max_serial_result[0] else 0) + 1 migrated_count = 0 skipped_count = 0 errors = [] for i, record in enumerate(all_records): serial_number, zekken_number, event_code, create_at, goal_time, cp_number, late_point = record try: # 重複チェック(serial_numberベース) self.cur_rog.execute(""" SELECT COUNT(*) FROM rog_gpscheckin WHERE serial_number = %s; """, (str(serial_number),)) if self.cur_rog.fetchone()[0] > 0: continue # 既に存在 # データ変換 converted_checkin_time = self.convert_utc_to_jst(create_at) converted_record_time = self.convert_utc_to_jst(create_at) # serial_number重複回避 if serial_number < 1000: new_serial = 30000 + serial_number # 30000番台に移動 else: new_serial = serial_number # 挿入 self.cur_rog.execute(""" INSERT INTO rog_gpscheckin ( event_code, zekken, serial_number, cp_number, checkin_time, record_time ) VALUES (%s, %s, %s, %s, %s, %s) """, ( event_code, zekken_number, str(new_serial), str(cp_number), converted_checkin_time, converted_record_time )) migrated_count += 1 if migrated_count % 1000 == 0: print(f" 進捗: {migrated_count:,}件移行完了") except Exception as e: errors.append(f'レコード {serial_number}: {str(e)[:100]}') skipped_count += 1 self.conn_rog.commit() print(f"GPS情報移行完了: 成功 {migrated_count:,}件, スキップ {skipped_count:,}件") if errors: print(f"エラー件数: {len(errors)}件") return { 'migrated': migrated_count, 'skipped': skipped_count, 'errors': len(errors) } def generate_checkin_from_photos(self) -> Dict[str, int]: """写真記録からチェックイン記録を生成""" print("\n=== 写真記録からチェックイン記録生成開始 ===") # テストデータを除いた写真記録の未対応件数確認 self.cur_rog.execute(""" SELECT COUNT(*) FROM rog_checkinimages ci LEFT JOIN rog_gpscheckin gc ON ci.event_code = gc.event_code AND ci.team_name = gc.zekken AND ci.cp_number = gc.cp_number::integer AND ABS(EXTRACT(EPOCH FROM (ci.checkintime - gc.checkin_time))) < 3600 WHERE ci.team_name NOT IN ('gero test 1', 'gero test 2') AND gc.id IS NULL; """) unmatched_count = self.cur_rog.fetchone()[0] print(f"未対応写真記録: {unmatched_count:,}件") if unmatched_count == 0: print("✅ 全ての写真記録に対応するチェックイン記録が存在します") return {'generated': 0, 'errors': 0} # 最大serial_numberを取得 self.cur_rog.execute("SELECT MAX(serial_number::integer) FROM rog_gpscheckin WHERE serial_number ~ '^[0-9]+$';") max_serial_result = self.cur_rog.fetchone() next_serial = (max_serial_result[0] if max_serial_result[0] else 0) + 1 # 未対応写真記録を取得 self.cur_rog.execute(""" SELECT ci.id, ci.event_code, ci.team_name, ci.cp_number, ci.checkintime FROM rog_checkinimages ci LEFT JOIN rog_gpscheckin gc ON ci.event_code = gc.event_code AND ci.team_name = gc.zekken AND ci.cp_number = gc.cp_number::integer AND ABS(EXTRACT(EPOCH FROM (ci.checkintime - gc.checkin_time))) < 3600 WHERE ci.team_name NOT IN ('gero test 1', 'gero test 2') AND gc.id IS NULL ORDER BY ci.event_code, ci.checkintime; """) photo_records = self.cur_rog.fetchall() print(f"生成対象写真記録: {len(photo_records):,}件") generated_count = 0 errors = [] batch_size = 500 for i in range(0, len(photo_records), batch_size): batch = photo_records[i:i+batch_size] for record in batch: photo_id, event_code, team_name, cp_number, checkintime = record try: # チェックイン記録を挿入 self.cur_rog.execute(""" INSERT INTO rog_gpscheckin ( event_code, zekken, serial_number, cp_number, checkin_time, record_time ) VALUES (%s, %s, %s, %s, %s, %s) """, ( event_code, team_name, str(next_serial), str(cp_number), checkintime, checkintime )) generated_count += 1 next_serial += 1 except Exception as e: errors.append(f'写真ID {photo_id}: {str(e)[:50]}') # バッチごとにコミット self.conn_rog.commit() if i + batch_size >= 500: progress_count = min(i + batch_size, len(photo_records)) print(f" 進捗: {progress_count:,}/{len(photo_records):,}件処理完了") print(f"写真記録からチェックイン記録生成完了: 成功 {generated_count:,}件") if errors: print(f"エラー件数: {len(errors)}件") return { 'generated': generated_count, 'errors': len(errors) } def generate_migration_statistics(self) -> Dict[str, Any]: """移行統計情報を生成""" print("\n=== 移行統計情報生成 ===") stats = {} # 1. 基本統計 self.cur_gif.execute('SELECT COUNT(*) FROM gps_information;') stats['original_gps_count'] = self.cur_gif.fetchone()[0] self.cur_rog.execute('SELECT COUNT(*) FROM rog_gpscheckin;') stats['final_checkin_count'] = self.cur_rog.fetchone()[0] self.cur_rog.execute("SELECT COUNT(*) FROM rog_checkinimages WHERE team_name NOT IN ('gero test 1', 'gero test 2');") stats['valid_photo_count'] = self.cur_rog.fetchone()[0] # 2. イベント別統計 self.cur_rog.execute(""" SELECT event_code, COUNT(*) as checkin_count, COUNT(DISTINCT zekken) as team_count FROM rog_gpscheckin GROUP BY event_code ORDER BY checkin_count DESC; """) stats['event_stats'] = self.cur_rog.fetchall() # 3. 写真記録とチェックイン記録の対応率 self.cur_rog.execute(""" SELECT COUNT(ci.*) as total_photos, COUNT(gc.*) as matched_checkins, ROUND(COUNT(gc.*)::numeric / COUNT(ci.*)::numeric * 100, 1) as match_rate FROM rog_checkinimages ci LEFT JOIN rog_gpscheckin gc ON ci.event_code = gc.event_code AND ci.team_name = gc.zekken AND ci.cp_number = gc.cp_number::integer AND ABS(EXTRACT(EPOCH FROM (ci.checkintime - gc.checkin_time))) < 3600 WHERE ci.team_name NOT IN ('gero test 1', 'gero test 2'); """) photo_match_stats = self.cur_rog.fetchone() stats['photo_match'] = { 'total_photos': photo_match_stats[0], 'matched_checkins': photo_match_stats[1], 'match_rate': photo_match_stats[2] } return stats def print_statistics(self, stats: Dict[str, Any]): """統計情報を出力""" print("\n" + "="*60) print("📊 統合移行完了 - 最終統計レポート") print("="*60) print(f"\n📈 基本統計:") print(f" 元データ(GPS情報): {stats['original_gps_count']:,}件") print(f" 最終チェックイン記録: {stats['final_checkin_count']:,}件") print(f" 有効写真記録: {stats['valid_photo_count']:,}件") success_rate = (stats['final_checkin_count'] / stats['original_gps_count']) * 100 print(f" GPS移行成功率: {success_rate:.1f}%") print(f"\n📷 写真記録対応状況:") pm = stats['photo_match'] print(f" 総写真記録: {pm['total_photos']:,}件") print(f" 対応チェックイン記録: {pm['matched_checkins']:,}件") print(f" 対応率: {pm['match_rate']:.1f}%") print(f"\n🏆 イベント別統計 (上位10イベント):") print(" イベント チェックイン数 チーム数") print(" " + "-"*45) for event, checkin_count, team_count in stats['event_stats'][:10]: print(f" {event:<12} {checkin_count:>12} {team_count:>8}") # 成功判定 if pm['match_rate'] >= 99.0: print(f"\n🎉 移行完全成功!") print(" 写真記録とチェックイン記録の整合性が確保されました。") elif pm['match_rate'] >= 95.0: print(f"\n✅ 移行成功!") print(" 高い整合性で移行が完了しました。") else: print(f"\n⚠️ 移行完了 (要確認)") print(" 一部の記録で整合性の確認が必要です。") def run_complete_migration(self): """完全移行処理を実行""" print("🚀 統合移行処理開始") print("=" * 50) if not self.connect_databases(): return False try: # 1. GPS情報移行 gps_results = self.migrate_gps_information() # 2. 写真記録からチェックイン記録生成 photo_results = self.generate_checkin_from_photos() # 3. 統計情報生成・出力 stats = self.generate_migration_statistics() self.print_statistics(stats) # 4. 動作確認用のテストクエリ実行 self.run_verification_tests() return True except Exception as e: print(f"❌ 移行処理エラー: {e}") return False finally: self.close_connections() def run_verification_tests(self): """動作確認テストを実行""" print(f"\n🔍 動作確認テスト実行") print("-" * 30) # テスト1: MF5-204のデータ確認 self.cur_rog.execute(""" SELECT checkin_time, cp_number FROM rog_gpscheckin WHERE zekken = 'MF5-204' ORDER BY checkin_time LIMIT 5; """) mf5_results = self.cur_rog.fetchall() print("✅ MF5-204のチェックイン記録 (最初の5件):") for time, cp in mf5_results: print(f" {time} - CP{cp}") # テスト2: 美濃加茂イベントの統計 self.cur_rog.execute(""" SELECT COUNT(*) as total_records, COUNT(DISTINCT zekken) as unique_teams, MIN(checkin_time) as first_checkin, MAX(checkin_time) as last_checkin FROM rog_gpscheckin WHERE event_code = '美濃加茂'; """) minokamo_stats = self.cur_rog.fetchone() print(f"\n✅ 美濃加茂イベント統計:") print(f" 総チェックイン数: {minokamo_stats[0]:,}件") print(f" 参加チーム数: {minokamo_stats[1]}チーム") print(f" 期間: {minokamo_stats[2]} ~ {minokamo_stats[3]}") # テスト3: 最新のチェックイン記録確認 self.cur_rog.execute(""" SELECT event_code, zekken, checkin_time, cp_number FROM rog_gpscheckin ORDER BY checkin_time DESC LIMIT 3; """) latest_records = self.cur_rog.fetchall() print(f"\n✅ 最新チェックイン記録 (最後の3件):") for event, zekken, time, cp in latest_records: print(f" {event} - {zekken} - {time} - CP{cp}") print("\n🎯 動作確認完了: 全てのテストが正常に実行されました") def close_connections(self): """データベース接続を閉じる""" if self.cur_gif: self.cur_gif.close() if self.cur_rog: self.cur_rog.close() if self.conn_gif: self.conn_gif.close() if self.conn_rog: self.conn_rog.close() print("\n✅ データベース接続を閉じました") def main(): """メイン実行関数""" migrator = MigrationWithPhotoIntegration() try: success = migrator.run_complete_migration() if success: print("\n" + "="*50) print("🎉 統合移行処理が正常に完了しました!") print("="*50) sys.exit(0) else: print("\n" + "="*50) print("❌ 移行処理中にエラーが発生しました") print("="*50) sys.exit(1) except KeyboardInterrupt: print("\n⚠️ ユーザーによって処理が中断されました") sys.exit(1) except Exception as e: print(f"\n❌ 予期しないエラー: {e}") sys.exit(1) if __name__ == "__main__": main()