Files
rogaining_srv/migration_with_photo_integration.py

482 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
統合移行スクリプトGPS情報移行 + 写真記録からチェックイン記録生成
- gifurogeからrogdbへのGPS情報移行
- 写真記録を正とした不足チェックイン記録の補完
- 統計情報の出力と動作確認
"""
import os
import sys
import psycopg2
from datetime import datetime, timedelta
import pytz
from typing import Optional, Dict, Any, List, Tuple
class MigrationWithPhotoIntegration:
def __init__(self):
self.conn_gif = None
self.conn_rog = None
self.cur_gif = None
self.cur_rog = None
def connect_databases(self):
"""データベースに接続"""
try:
self.conn_gif = psycopg2.connect(
host='postgres-db',
database='gifuroge',
user=os.environ.get('POSTGRES_USER'),
password=os.environ.get('POSTGRES_PASS')
)
self.conn_rog = psycopg2.connect(
host='postgres-db',
database='rogdb',
user=os.environ.get('POSTGRES_USER'),
password=os.environ.get('POSTGRES_PASS')
)
self.cur_gif = self.conn_gif.cursor()
self.cur_rog = self.conn_rog.cursor()
print("✅ データベース接続成功")
return True
except Exception as e:
print(f"❌ データベース接続エラー: {e}")
return False
def get_event_date(self, event_code: str) -> str:
"""イベントコードから適切な日付を取得"""
event_dates = {
'養老2': '2024-10-06',
'美濃加茂': '2024-05-19', # 修正済み
'下呂': '2023-05-20',
'FC岐阜': '2024-10-18',
'大垣': '2023-11-25',
'岐阜市': '2023-10-21',
'default': '2024-01-01'
}
return event_dates.get(event_code, event_dates['default'])
def convert_utc_to_jst(self, utc_timestamp: datetime) -> Optional[datetime]:
"""UTC時刻をJST時刻に変換"""
if not utc_timestamp:
return None
utc_tz = pytz.UTC
jst_tz = pytz.timezone('Asia/Tokyo')
if utc_timestamp.tzinfo is None:
utc_timestamp = utc_tz.localize(utc_timestamp)
return utc_timestamp.astimezone(jst_tz).replace(tzinfo=None)
def parse_goal_time(self, goal_time_str: str, event_code: str) -> Optional[datetime]:
"""goal_time文字列をパース時刻のみの場合は変換なし"""
if not goal_time_str:
return None
# 時刻のみHH:MM:SS形式の場合はJSTの時刻として扱い、UTCからの変換は行わない
if len(goal_time_str) <= 8 and goal_time_str.count(':') <= 2:
event_date = self.get_event_date(event_code)
event_datetime = datetime.strptime(event_date, '%Y-%m-%d')
time_part = datetime.strptime(goal_time_str, '%H:%M:%S').time()
return datetime.combine(event_datetime.date(), time_part)
else:
# 完全な日時形式の場合はUTCからJSTに変換
return self.convert_utc_to_jst(datetime.fromisoformat(goal_time_str.replace('Z', '+00:00')))
def migrate_gps_information(self) -> Dict[str, int]:
"""GPS情報の移行処理"""
print("\n=== GPS情報移行開始 ===")
# 既存の移行データ件数を確認
self.cur_rog.execute('SELECT COUNT(*) FROM rog_gpscheckin;')
existing_count = self.cur_rog.fetchone()[0]
print(f"既存チェックイン記録: {existing_count:,}")
# 移行対象データ取得
self.cur_gif.execute("""
SELECT
serial_number, zekken_number, event_code, create_at,
goal_time, cp_number, late_point
FROM gps_information
ORDER BY event_code, zekken_number, create_at;
""")
all_records = self.cur_gif.fetchall()
print(f"移行対象データ: {len(all_records):,}")
# 最大serial_numberを取得
self.cur_rog.execute("SELECT MAX(serial_number::integer) FROM rog_gpscheckin WHERE serial_number ~ '^[0-9]+$';")
max_serial_result = self.cur_rog.fetchone()
next_serial = (max_serial_result[0] if max_serial_result[0] else 0) + 1
migrated_count = 0
skipped_count = 0
errors = []
for i, record in enumerate(all_records):
serial_number, zekken_number, event_code, create_at, goal_time, cp_number, late_point = record
try:
# 重複チェックserial_numberベース
self.cur_rog.execute("""
SELECT COUNT(*) FROM rog_gpscheckin
WHERE serial_number = %s;
""", (str(serial_number),))
if self.cur_rog.fetchone()[0] > 0:
continue # 既に存在
# データ変換
converted_checkin_time = self.convert_utc_to_jst(create_at)
converted_record_time = self.convert_utc_to_jst(create_at)
# serial_number重複回避
if serial_number < 1000:
new_serial = 30000 + serial_number # 30000番台に移動
else:
new_serial = serial_number
# 挿入
self.cur_rog.execute("""
INSERT INTO rog_gpscheckin (
event_code, zekken, serial_number, cp_number,
checkin_time, record_time
) VALUES (%s, %s, %s, %s, %s, %s)
""", (
event_code,
zekken_number,
str(new_serial),
str(cp_number),
converted_checkin_time,
converted_record_time
))
migrated_count += 1
if migrated_count % 1000 == 0:
print(f" 進捗: {migrated_count:,}件移行完了")
except Exception as e:
errors.append(f'レコード {serial_number}: {str(e)[:100]}')
skipped_count += 1
self.conn_rog.commit()
print(f"GPS情報移行完了: 成功 {migrated_count:,}件, スキップ {skipped_count:,}")
if errors:
print(f"エラー件数: {len(errors)}")
return {
'migrated': migrated_count,
'skipped': skipped_count,
'errors': len(errors)
}
def generate_checkin_from_photos(self) -> Dict[str, int]:
"""写真記録からチェックイン記録を生成"""
print("\n=== 写真記録からチェックイン記録生成開始 ===")
# テストデータを除いた写真記録の未対応件数確認
self.cur_rog.execute("""
SELECT COUNT(*)
FROM rog_checkinimages ci
LEFT JOIN rog_gpscheckin gc ON ci.event_code = gc.event_code
AND ci.team_name = gc.zekken
AND ci.cp_number = gc.cp_number::integer
AND ABS(EXTRACT(EPOCH FROM (ci.checkintime - gc.checkin_time))) < 3600
WHERE ci.team_name NOT IN ('gero test 1', 'gero test 2')
AND gc.id IS NULL;
""")
unmatched_count = self.cur_rog.fetchone()[0]
print(f"未対応写真記録: {unmatched_count:,}")
if unmatched_count == 0:
print("✅ 全ての写真記録に対応するチェックイン記録が存在します")
return {'generated': 0, 'errors': 0}
# 最大serial_numberを取得
self.cur_rog.execute("SELECT MAX(serial_number::integer) FROM rog_gpscheckin WHERE serial_number ~ '^[0-9]+$';")
max_serial_result = self.cur_rog.fetchone()
next_serial = (max_serial_result[0] if max_serial_result[0] else 0) + 1
# 未対応写真記録を取得
self.cur_rog.execute("""
SELECT
ci.id, ci.event_code, ci.team_name, ci.cp_number, ci.checkintime
FROM rog_checkinimages ci
LEFT JOIN rog_gpscheckin gc ON ci.event_code = gc.event_code
AND ci.team_name = gc.zekken
AND ci.cp_number = gc.cp_number::integer
AND ABS(EXTRACT(EPOCH FROM (ci.checkintime - gc.checkin_time))) < 3600
WHERE ci.team_name NOT IN ('gero test 1', 'gero test 2')
AND gc.id IS NULL
ORDER BY ci.event_code, ci.checkintime;
""")
photo_records = self.cur_rog.fetchall()
print(f"生成対象写真記録: {len(photo_records):,}")
generated_count = 0
errors = []
batch_size = 500
for i in range(0, len(photo_records), batch_size):
batch = photo_records[i:i+batch_size]
for record in batch:
photo_id, event_code, team_name, cp_number, checkintime = record
try:
# チェックイン記録を挿入
self.cur_rog.execute("""
INSERT INTO rog_gpscheckin (
event_code, zekken, serial_number, cp_number,
checkin_time, record_time
) VALUES (%s, %s, %s, %s, %s, %s)
""", (
event_code,
team_name,
str(next_serial),
str(cp_number),
checkintime,
checkintime
))
generated_count += 1
next_serial += 1
except Exception as e:
errors.append(f'写真ID {photo_id}: {str(e)[:50]}')
# バッチごとにコミット
self.conn_rog.commit()
if i + batch_size >= 500:
progress_count = min(i + batch_size, len(photo_records))
print(f" 進捗: {progress_count:,}/{len(photo_records):,}件処理完了")
print(f"写真記録からチェックイン記録生成完了: 成功 {generated_count:,}")
if errors:
print(f"エラー件数: {len(errors)}")
return {
'generated': generated_count,
'errors': len(errors)
}
def generate_migration_statistics(self) -> Dict[str, Any]:
"""移行統計情報を生成"""
print("\n=== 移行統計情報生成 ===")
stats = {}
# 1. 基本統計
self.cur_gif.execute('SELECT COUNT(*) FROM gps_information;')
stats['original_gps_count'] = self.cur_gif.fetchone()[0]
self.cur_rog.execute('SELECT COUNT(*) FROM rog_gpscheckin;')
stats['final_checkin_count'] = self.cur_rog.fetchone()[0]
self.cur_rog.execute("SELECT COUNT(*) FROM rog_checkinimages WHERE team_name NOT IN ('gero test 1', 'gero test 2');")
stats['valid_photo_count'] = self.cur_rog.fetchone()[0]
# 2. イベント別統計
self.cur_rog.execute("""
SELECT
event_code,
COUNT(*) as checkin_count,
COUNT(DISTINCT zekken) as team_count
FROM rog_gpscheckin
GROUP BY event_code
ORDER BY checkin_count DESC;
""")
stats['event_stats'] = self.cur_rog.fetchall()
# 3. 写真記録とチェックイン記録の対応率
self.cur_rog.execute("""
SELECT
COUNT(ci.*) as total_photos,
COUNT(gc.*) as matched_checkins,
ROUND(COUNT(gc.*)::numeric / COUNT(ci.*)::numeric * 100, 1) as match_rate
FROM rog_checkinimages ci
LEFT JOIN rog_gpscheckin gc ON ci.event_code = gc.event_code
AND ci.team_name = gc.zekken
AND ci.cp_number = gc.cp_number::integer
AND ABS(EXTRACT(EPOCH FROM (ci.checkintime - gc.checkin_time))) < 3600
WHERE ci.team_name NOT IN ('gero test 1', 'gero test 2');
""")
photo_match_stats = self.cur_rog.fetchone()
stats['photo_match'] = {
'total_photos': photo_match_stats[0],
'matched_checkins': photo_match_stats[1],
'match_rate': photo_match_stats[2]
}
return stats
def print_statistics(self, stats: Dict[str, Any]):
"""統計情報を出力"""
print("\n" + "="*60)
print("📊 統合移行完了 - 最終統計レポート")
print("="*60)
print(f"\n📈 基本統計:")
print(f" 元データ(GPS情報): {stats['original_gps_count']:,}")
print(f" 最終チェックイン記録: {stats['final_checkin_count']:,}")
print(f" 有効写真記録: {stats['valid_photo_count']:,}")
success_rate = (stats['final_checkin_count'] / stats['original_gps_count']) * 100
print(f" GPS移行成功率: {success_rate:.1f}%")
print(f"\n📷 写真記録対応状況:")
pm = stats['photo_match']
print(f" 総写真記録: {pm['total_photos']:,}")
print(f" 対応チェックイン記録: {pm['matched_checkins']:,}")
print(f" 対応率: {pm['match_rate']:.1f}%")
print(f"\n🏆 イベント別統計 (上位10イベント):")
print(" イベント チェックイン数 チーム数")
print(" " + "-"*45)
for event, checkin_count, team_count in stats['event_stats'][:10]:
print(f" {event:<12} {checkin_count:>12} {team_count:>8}")
# 成功判定
if pm['match_rate'] >= 99.0:
print(f"\n🎉 移行完全成功!")
print(" 写真記録とチェックイン記録の整合性が確保されました。")
elif pm['match_rate'] >= 95.0:
print(f"\n✅ 移行成功!")
print(" 高い整合性で移行が完了しました。")
else:
print(f"\n⚠️ 移行完了 (要確認)")
print(" 一部の記録で整合性の確認が必要です。")
def run_complete_migration(self):
"""完全移行処理を実行"""
print("🚀 統合移行処理開始")
print("=" * 50)
if not self.connect_databases():
return False
try:
# 1. GPS情報移行
gps_results = self.migrate_gps_information()
# 2. 写真記録からチェックイン記録生成
photo_results = self.generate_checkin_from_photos()
# 3. 統計情報生成・出力
stats = self.generate_migration_statistics()
self.print_statistics(stats)
# 4. 動作確認用のテストクエリ実行
self.run_verification_tests()
return True
except Exception as e:
print(f"❌ 移行処理エラー: {e}")
return False
finally:
self.close_connections()
def run_verification_tests(self):
"""動作確認テストを実行"""
print(f"\n🔍 動作確認テスト実行")
print("-" * 30)
# テスト1: MF5-204のデータ確認
self.cur_rog.execute("""
SELECT checkin_time, cp_number
FROM rog_gpscheckin
WHERE zekken = 'MF5-204'
ORDER BY checkin_time
LIMIT 5;
""")
mf5_results = self.cur_rog.fetchall()
print("✅ MF5-204のチェックイン記録 (最初の5件):")
for time, cp in mf5_results:
print(f" {time} - CP{cp}")
# テスト2: 美濃加茂イベントの統計
self.cur_rog.execute("""
SELECT
COUNT(*) as total_records,
COUNT(DISTINCT zekken) as unique_teams,
MIN(checkin_time) as first_checkin,
MAX(checkin_time) as last_checkin
FROM rog_gpscheckin
WHERE event_code = '美濃加茂';
""")
minokamo_stats = self.cur_rog.fetchone()
print(f"\n✅ 美濃加茂イベント統計:")
print(f" 総チェックイン数: {minokamo_stats[0]:,}")
print(f" 参加チーム数: {minokamo_stats[1]}チーム")
print(f" 期間: {minokamo_stats[2]} {minokamo_stats[3]}")
# テスト3: 最新のチェックイン記録確認
self.cur_rog.execute("""
SELECT event_code, zekken, checkin_time, cp_number
FROM rog_gpscheckin
ORDER BY checkin_time DESC
LIMIT 3;
""")
latest_records = self.cur_rog.fetchall()
print(f"\n✅ 最新チェックイン記録 (最後の3件):")
for event, zekken, time, cp in latest_records:
print(f" {event} - {zekken} - {time} - CP{cp}")
print("\n🎯 動作確認完了: 全てのテストが正常に実行されました")
def close_connections(self):
"""データベース接続を閉じる"""
if self.cur_gif:
self.cur_gif.close()
if self.cur_rog:
self.cur_rog.close()
if self.conn_gif:
self.conn_gif.close()
if self.conn_rog:
self.conn_rog.close()
print("\n✅ データベース接続を閉じました")
def main():
"""メイン実行関数"""
migrator = MigrationWithPhotoIntegration()
try:
success = migrator.run_complete_migration()
if success:
print("\n" + "="*50)
print("🎉 統合移行処理が正常に完了しました!")
print("="*50)
sys.exit(0)
else:
print("\n" + "="*50)
print("❌ 移行処理中にエラーが発生しました")
print("="*50)
sys.exit(1)
except KeyboardInterrupt:
print("\n⚠️ ユーザーによって処理が中断されました")
sys.exit(1)
except Exception as e:
print(f"\n❌ 予期しないエラー: {e}")
sys.exit(1)
if __name__ == "__main__":
main()