#!/usr/bin/env python3
"""
統合移行スクリプト:GPS情報移行 + 写真記録からチェックイン記録生成
- gifurogeからrogdbへのGPS情報移行
- 写真記録を正とした不足チェックイン記録の補完
- 統計情報の出力と動作確認
"""

import os
import sys
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Tuple

import psycopg2
import pytz


class MigrationWithPhotoIntegration:
    """Migrate GPS check-ins from ``gifuroge`` into ``rogdb`` and backfill from photos.

    The workflow driven by ``run_complete_migration`` is:

    1. copy ``gps_information`` rows (gifuroge) into ``rog_gpscheckin`` (rogdb);
    2. create check-in rows for ``rog_checkinimages`` photos that have no
       matching check-in;
    3. print summary statistics and run a few sanity-check queries.

    Attributes:
        conn_gif, cur_gif: connection and cursor for the ``gifuroge`` database.
        conn_rog, cur_rog: connection and cursor for the ``rogdb`` database.
    """

    # Fallback event dates (YYYY-MM-DD) used when a goal time carries no date.
    _EVENT_DATES = {
        '養老2': '2024-10-06',
        '美濃加茂': '2024-05-19',  # corrected value
        '下呂': '2023-05-20',
        'FC岐阜': '2024-10-18',
        '大垣': '2023-11-25',
        '岐阜市': '2023-10-21',
        'default': '2024-01-01',
    }

    # Shared INSERT used by both the GPS migration and the photo backfill.
    _INSERT_CHECKIN_SQL = """
        INSERT INTO rog_gpscheckin (
            event_code, zekken, serial_number, cp_number,
            checkin_time, record_time
        ) VALUES (%s, %s, %s, %s, %s, %s)
    """

    def __init__(self):
        """Start with no open connections; ``connect_databases`` opens them."""
        self.conn_gif = None
        self.conn_rog = None
        self.cur_gif = None
        self.cur_rog = None

    @staticmethod
    def _connect(database: str):
        """Open a psycopg2 connection to ``database`` on the ``postgres-db`` host.

        Credentials come from the ``POSTGRES_USER`` / ``POSTGRES_PASS``
        environment variables.
        """
        return psycopg2.connect(
            host='postgres-db',
            database=database,
            user=os.environ.get('POSTGRES_USER'),
            password=os.environ.get('POSTGRES_PASS'),
        )

    def connect_databases(self):
        """Connect to both databases and create one cursor on each.

        Returns:
            True when both connections and cursors were created, False
            otherwise. On failure the error is printed and anything that was
            already opened is closed again, so no connection is leaked.
        """
        try:
            self.conn_gif = self._connect('gifuroge')
            self.conn_rog = self._connect('rogdb')

            self.cur_gif = self.conn_gif.cursor()
            self.cur_rog = self.conn_rog.cursor()

            print("✅ データベース接続成功")
            return True

        except Exception as e:
            print(f"❌ データベース接続エラー: {e}")
            # The first connection may be open even though the second failed.
            self._release_handles()
            return False

    def get_event_date(self, event_code: str) -> str:
        """Return the ``YYYY-MM-DD`` date registered for ``event_code``.

        Unknown event codes fall back to the ``'default'`` entry.
        """
        return self._EVENT_DATES.get(event_code, self._EVENT_DATES['default'])

    def convert_utc_to_jst(self, utc_timestamp: datetime) -> Optional[datetime]:
        """Convert a UTC timestamp to a naive Asia/Tokyo timestamp.

        Args:
            utc_timestamp: a datetime; a naive value is interpreted as UTC.

        Returns:
            The equivalent JST wall-clock time with ``tzinfo`` removed, or
            ``None`` when ``utc_timestamp`` is falsy.
        """
        if not utc_timestamp:
            return None

        if utc_timestamp.tzinfo is None:
            utc_timestamp = pytz.UTC.localize(utc_timestamp)

        jst_tz = pytz.timezone('Asia/Tokyo')
        return utc_timestamp.astimezone(jst_tz).replace(tzinfo=None)

    def parse_goal_time(self, goal_time_str: str, event_code: str) -> Optional[datetime]:
        """Parse a goal-time string into a naive JST datetime.

        A time-only value (``HH:MM:SS`` or ``HH:MM``) is treated as JST
        wall-clock time on the event's date and is not timezone-converted.
        Anything longer is parsed as an ISO-8601 datetime (a trailing ``Z`` is
        accepted) and converted from UTC to JST.

        Args:
            goal_time_str: the raw goal time; surrounding whitespace is ignored.
            event_code: event used to look up the date for time-only values.

        Returns:
            The parsed datetime, or ``None`` for an empty/blank input.

        Raises:
            ValueError: if the text matches neither supported form.
        """
        if not goal_time_str:
            return None

        text = goal_time_str.strip()
        if not text:
            return None

        colon_count = text.count(':')
        if len(text) <= 8 and colon_count <= 2:
            # The guard admits "HH:MM" as well, so pick the matching format
            # instead of always demanding seconds.
            time_format = '%H:%M:%S' if colon_count == 2 else '%H:%M'
            time_part = datetime.strptime(text, time_format).time()
            event_day = datetime.strptime(
                self.get_event_date(event_code), '%Y-%m-%d'
            ).date()
            return datetime.combine(event_day, time_part)

        # Full datetime: interpret as UTC and convert to JST.
        return self.convert_utc_to_jst(
            datetime.fromisoformat(text.replace('Z', '+00:00'))
        )

    def _insert_checkin(self, event_code, zekken, serial, cp_number, checkin_time):
        """Insert one ``rog_gpscheckin`` row inside a savepoint.

        PostgreSQL aborts the whole transaction when a statement fails; the
        savepoint confines a failed INSERT to this row so the caller can record
        the error and keep going. The exception is re-raised after rollback.
        ``checkin_time`` is stored in both ``checkin_time`` and ``record_time``.
        """
        cursor = self.cur_rog
        cursor.execute('SAVEPOINT checkin_row;')
        try:
            cursor.execute(self._INSERT_CHECKIN_SQL, (
                event_code,
                zekken,
                str(serial),
                # Keep NULL as NULL: the text 'None' would break the
                # cp_number::integer casts used by the photo-matching queries.
                None if cp_number is None else str(cp_number),
                checkin_time,
                checkin_time,
            ))
        except Exception:
            cursor.execute('ROLLBACK TO SAVEPOINT checkin_row;')
            raise
        cursor.execute('RELEASE SAVEPOINT checkin_row;')

    @staticmethod
    def _report_errors(errors: List[str], limit: int = 5) -> None:
        """Print the error count plus the first ``limit`` messages, if any."""
        if not errors:
            return
        print(f"エラー件数: {len(errors)}件")
        for message in errors[:limit]:
            print(f"  - {message}")

    def migrate_gps_information(self) -> Dict[str, int]:
        """Copy ``gps_information`` rows into ``rog_gpscheckin``.

        Source serial numbers below 1000 are stored as ``30000 + serial``.
        A row is treated as already migrated, and silently passed over, when
        either its source serial or its stored serial was present before this
        run, which makes re-runs idempotent. ``create_at`` is converted from
        UTC to JST and used for both time columns. All inserts are committed
        once at the end.

        Returns:
            ``{'migrated': n, 'skipped': n, 'errors': n}`` where ``skipped`` and
            ``errors`` both count rows that failed to insert.
        """
        print("\n=== GPS情報移行開始 ===")

        # Count rows that are already in the destination table.
        self.cur_rog.execute('SELECT COUNT(*) FROM rog_gpscheckin;')
        existing_count = self.cur_rog.fetchone()[0]
        print(f"既存チェックイン記録: {existing_count:,}件")

        # Fetch the rows to migrate.
        self.cur_gif.execute("""
            SELECT
                serial_number, zekken_number, event_code, create_at,
                goal_time, cp_number, late_point
            FROM gps_information
            ORDER BY event_code, zekken_number, create_at;
        """)
        all_records = self.cur_gif.fetchall()
        print(f"移行対象データ: {len(all_records):,}件")

        # Snapshot of destination serials taken before any insert. It is never
        # extended during the run, so two source rows that map to the same
        # stored serial are not dropped depending on processing order.
        self.cur_rog.execute('SELECT serial_number FROM rog_gpscheckin;')
        existing_serials = {str(row[0]) for row in self.cur_rog.fetchall()}
        # Source serials already handled: seeded with the snapshot (the
        # original duplicate rule) and extended as rows are migrated.
        seen_source_serials = set(existing_serials)

        migrated_count = 0
        skipped_count = 0
        errors = []

        for record in all_records:
            (serial_number, zekken_number, event_code, create_at,
             _goal_time, cp_number, _late_point) = record

            try:
                # Move low serial numbers into the 30000 range.
                if serial_number < 1000:
                    new_serial = 30000 + serial_number
                else:
                    new_serial = serial_number

                source_key = str(serial_number)
                if source_key in seen_source_serials or str(new_serial) in existing_serials:
                    continue  # already present

                jst_time = self.convert_utc_to_jst(create_at)
                self._insert_checkin(
                    event_code, zekken_number, new_serial, cp_number, jst_time
                )
                seen_source_serials.add(source_key)
                migrated_count += 1

                if migrated_count % 1000 == 0:
                    print(f" 進捗: {migrated_count:,}件移行完了")

            except Exception as e:
                errors.append(f'レコード {serial_number}: {str(e)[:100]}')
                skipped_count += 1

        self.conn_rog.commit()

        print(f"GPS情報移行完了: 成功 {migrated_count:,}件, スキップ {skipped_count:,}件")
        self._report_errors(errors)

        return {
            'migrated': migrated_count,
            'skipped': skipped_count,
            'errors': len(errors),
        }

    def generate_checkin_from_photos(self) -> Dict[str, int]:
        """Create check-in rows for photos that have no matching check-in.

        A photo matches a check-in when event code, team (``zekken``) and
        checkpoint number agree and the two timestamps differ by less than
        3600 seconds. The teams 'gero test 1' and 'gero test 2' are excluded.
        New rows get consecutive serial numbers above the current numeric
        maximum and are committed in batches of 500.

        Returns:
            ``{'generated': n, 'errors': n}``.
        """
        print("\n=== 写真記録からチェックイン記録生成開始 ===")

        # Photos (test data excluded) without a corresponding check-in.
        self.cur_rog.execute("""
            SELECT
                ci.id, ci.event_code, ci.team_name, ci.cp_number, ci.checkintime
            FROM rog_checkinimages ci
            LEFT JOIN rog_gpscheckin gc ON ci.event_code = gc.event_code
                AND ci.team_name = gc.zekken
                AND ci.cp_number = gc.cp_number::integer
                AND ABS(EXTRACT(EPOCH FROM (ci.checkintime - gc.checkin_time))) < 3600
            WHERE ci.team_name NOT IN ('gero test 1', 'gero test 2')
                AND gc.id IS NULL
            ORDER BY ci.event_code, ci.checkintime;
        """)
        photo_records = self.cur_rog.fetchall()

        unmatched_count = len(photo_records)
        print(f"未対応写真記録: {unmatched_count:,}件")

        if unmatched_count == 0:
            print("✅ 全ての写真記録に対応するチェックイン記録が存在します")
            return {'generated': 0, 'errors': 0}

        # Continue numbering after the largest purely numeric serial.
        self.cur_rog.execute("SELECT MAX(serial_number::integer) FROM rog_gpscheckin WHERE serial_number ~ '^[0-9]+$';")
        next_serial = (self.cur_rog.fetchone()[0] or 0) + 1

        print(f"生成対象写真記録: {len(photo_records):,}件")

        generated_count = 0
        errors = []
        batch_size = 500

        for start in range(0, len(photo_records), batch_size):
            for photo_id, event_code, team_name, cp_number, checkintime in \
                    photo_records[start:start + batch_size]:
                try:
                    self._insert_checkin(
                        event_code, team_name, next_serial, cp_number, checkintime
                    )
                    generated_count += 1
                    next_serial += 1
                except Exception as e:
                    errors.append(f'写真ID {photo_id}: {str(e)[:50]}')

            # Commit once per batch.
            self.conn_rog.commit()

            progress_count = min(start + batch_size, len(photo_records))
            print(f" 進捗: {progress_count:,}/{len(photo_records):,}件処理完了")

        print(f"写真記録からチェックイン記録生成完了: 成功 {generated_count:,}件")
        self._report_errors(errors)

        return {
            'generated': generated_count,
            'errors': len(errors),
        }

    def generate_migration_statistics(self) -> Dict[str, Any]:
        """Collect post-migration statistics from both databases.

        Returns:
            A dict with ``original_gps_count``, ``final_checkin_count``,
            ``valid_photo_count``, ``event_stats`` (rows of event code,
            check-in count, distinct team count, largest first) and
            ``photo_match`` (``total_photos``, ``matched_checkins``,
            ``match_rate``). ``match_rate`` is ``None`` when there are no
            photos.
        """
        print("\n=== 移行統計情報生成 ===")

        stats = {}

        # 1. Basic counts.
        self.cur_gif.execute('SELECT COUNT(*) FROM gps_information;')
        stats['original_gps_count'] = self.cur_gif.fetchone()[0]

        self.cur_rog.execute('SELECT COUNT(*) FROM rog_gpscheckin;')
        stats['final_checkin_count'] = self.cur_rog.fetchone()[0]

        self.cur_rog.execute("SELECT COUNT(*) FROM rog_checkinimages WHERE team_name NOT IN ('gero test 1', 'gero test 2');")
        stats['valid_photo_count'] = self.cur_rog.fetchone()[0]

        # 2. Per-event statistics.
        self.cur_rog.execute("""
            SELECT
                event_code,
                COUNT(*) as checkin_count,
                COUNT(DISTINCT zekken) as team_count
            FROM rog_gpscheckin
            GROUP BY event_code
            ORDER BY checkin_count DESC;
        """)
        stats['event_stats'] = self.cur_rog.fetchall()

        # 3. Share of photos that have a matching check-in. Photos are counted
        #    by distinct id because one photo can join to several check-ins,
        #    and NULLIF avoids a division by zero when there are no photos.
        self.cur_rog.execute("""
            SELECT
                COUNT(DISTINCT ci.id) as total_photos,
                COUNT(DISTINCT CASE WHEN gc.id IS NOT NULL THEN ci.id END) as matched_checkins,
                ROUND(
                    COUNT(DISTINCT CASE WHEN gc.id IS NOT NULL THEN ci.id END)::numeric
                    / NULLIF(COUNT(DISTINCT ci.id), 0)::numeric * 100, 1
                ) as match_rate
            FROM rog_checkinimages ci
            LEFT JOIN rog_gpscheckin gc ON ci.event_code = gc.event_code
                AND ci.team_name = gc.zekken
                AND ci.cp_number = gc.cp_number::integer
                AND ABS(EXTRACT(EPOCH FROM (ci.checkintime - gc.checkin_time))) < 3600
            WHERE ci.team_name NOT IN ('gero test 1', 'gero test 2');
        """)

        total_photos, matched_checkins, match_rate = self.cur_rog.fetchone()
        stats['photo_match'] = {
            'total_photos': total_photos,
            'matched_checkins': matched_checkins,
            'match_rate': match_rate,
        }

        return stats

    def print_statistics(self, stats: Dict[str, Any]):
        """Print the final report for a ``generate_migration_statistics`` result.

        Rates that cannot be computed (no source rows, or no photos) are shown
        as ``N/A`` instead of raising. The closing verdict is based on the
        photo match rate: >= 99 full success, >= 95 success, otherwise (or
        when unavailable) "needs review".
        """
        print("\n" + "="*60)
        print("📊 統合移行完了 - 最終統計レポート")
        print("="*60)

        print(f"\n📈 基本統計:")
        print(f" 元データ(GPS情報): {stats['original_gps_count']:,}件")
        print(f" 最終チェックイン記録: {stats['final_checkin_count']:,}件")
        print(f" 有効写真記録: {stats['valid_photo_count']:,}件")

        if stats['original_gps_count']:
            success_rate = (stats['final_checkin_count'] / stats['original_gps_count']) * 100
            print(f" GPS移行成功率: {success_rate:.1f}%")
        else:
            # An empty source table would otherwise raise ZeroDivisionError.
            print(" GPS移行成功率: N/A")

        print(f"\n📷 写真記録対応状況:")
        pm = stats['photo_match']
        match_rate = pm['match_rate']
        print(f" 総写真記録: {pm['total_photos']:,}件")
        print(f" 対応チェックイン記録: {pm['matched_checkins']:,}件")
        if match_rate is None:
            print(" 対応率: N/A")
        else:
            print(f" 対応率: {match_rate:.1f}%")

        print(f"\n🏆 イベント別統計 (上位10イベント):")
        print(" イベント チェックイン数 チーム数")
        print(" " + "-"*45)
        for event, checkin_count, team_count in stats['event_stats'][:10]:
            # str() keeps the alignment spec valid for a NULL event code.
            print(f" {str(event):<12} {checkin_count:>12} {team_count:>8}")

        # Overall verdict.
        if match_rate is not None and match_rate >= 99.0:
            print(f"\n🎉 移行完全成功!")
            print(" 写真記録とチェックイン記録の整合性が確保されました。")
        elif match_rate is not None and match_rate >= 95.0:
            print(f"\n✅ 移行成功!")
            print(" 高い整合性で移行が完了しました。")
        else:
            print(f"\n⚠️ 移行完了 (要確認)")
            print(" 一部の記録で整合性の確認が必要です。")

    def run_complete_migration(self):
        """Run the whole migration: connect, migrate, backfill, report, verify.

        Returns:
            True when every step finished, False when the connection failed or
            any step raised. Connections are always closed before returning.
        """
        print("🚀 統合移行処理開始")
        print("=" * 50)

        if not self.connect_databases():
            return False

        try:
            # 1. GPS data migration.
            self.migrate_gps_information()

            # 2. Backfill check-ins from photo records.
            self.generate_checkin_from_photos()

            # 3. Build and print statistics.
            stats = self.generate_migration_statistics()
            self.print_statistics(stats)

            # 4. Sanity-check queries.
            self.run_verification_tests()

            return True

        except Exception as e:
            print(f"❌ 移行処理エラー: {e}")
            return False

        finally:
            self.close_connections()

    def run_verification_tests(self):
        """Print the results of three read-only sanity queries on ``rog_gpscheckin``."""
        print(f"\n🔍 動作確認テスト実行")
        print("-" * 30)

        # Test 1: first check-ins of team MF5-204.
        self.cur_rog.execute("""
            SELECT checkin_time, cp_number
            FROM rog_gpscheckin
            WHERE zekken = 'MF5-204'
            ORDER BY checkin_time
            LIMIT 5;
        """)

        mf5_results = self.cur_rog.fetchall()
        print("✅ MF5-204のチェックイン記録 (最初の5件):")
        for checkin_time, cp in mf5_results:
            print(f" {checkin_time} - CP{cp}")

        # Test 2: aggregate figures for the 美濃加茂 event.
        self.cur_rog.execute("""
            SELECT
                COUNT(*) as total_records,
                COUNT(DISTINCT zekken) as unique_teams,
                MIN(checkin_time) as first_checkin,
                MAX(checkin_time) as last_checkin
            FROM rog_gpscheckin
            WHERE event_code = '美濃加茂';
        """)

        minokamo_stats = self.cur_rog.fetchone()
        print(f"\n✅ 美濃加茂イベント統計:")
        print(f" 総チェックイン数: {minokamo_stats[0]:,}件")
        print(f" 参加チーム数: {minokamo_stats[1]}チーム")
        print(f" 期間: {minokamo_stats[2]} ~ {minokamo_stats[3]}")

        # Test 3: most recent check-ins.
        self.cur_rog.execute("""
            SELECT event_code, zekken, checkin_time, cp_number
            FROM rog_gpscheckin
            ORDER BY checkin_time DESC
            LIMIT 3;
        """)

        latest_records = self.cur_rog.fetchall()
        print(f"\n✅ 最新チェックイン記録 (最後の3件):")
        for event, zekken, checkin_time, cp in latest_records:
            print(f" {event} - {zekken} - {checkin_time} - CP{cp}")

        print("\n🎯 動作確認完了: 全てのテストが正常に実行されました")

    def _release_handles(self):
        """Close every open cursor and connection and reset it to ``None``.

        Each handle is closed independently so one failing ``close()`` cannot
        leave the remaining handles open; failures are reported, not raised.
        """
        for attr in ('cur_gif', 'cur_rog', 'conn_gif', 'conn_rog'):
            handle = getattr(self, attr, None)
            if handle is None:
                continue
            try:
                handle.close()
            except Exception as e:
                print(f"⚠️ {attr} のクローズに失敗: {e}")
            finally:
                setattr(self, attr, None)

    def close_connections(self):
        """Close all cursors and connections, then print a confirmation."""
        self._release_handles()

        print("\n✅ データベース接続を閉じました")


def main():
    """Run the complete migration and exit with a status code.

    Exits with 0 when ``run_complete_migration`` reports success and with 1
    when it reports failure, is interrupted with Ctrl-C, or raises.
    """
    migrator = MigrationWithPhotoIntegration()

    try:
        success = migrator.run_complete_migration()
    except KeyboardInterrupt:
        print("\n⚠️ ユーザーによって処理が中断されました")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ 予期しないエラー: {e}")
        sys.exit(1)

    if success:
        print("\n" + "="*50)
        print("🎉 統合移行処理が正常に完了しました!")
        print("="*50)
        sys.exit(0)

    print("\n" + "="*50)
    print("❌ 移行処理中にエラーが発生しました")
    print("="*50)
    sys.exit(1)


# Run the migration only when this file is executed as a script.
if __name__ == "__main__":
    main()