Files
rogaining_srv/migration_clean_final.py

451 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
最終クリーン移行プログラム
不正な写真記録データを除外し、正確なGPS記録のみを移行する
"""
import os
import sys
import psycopg2
from datetime import datetime, time, timedelta
import pytz
def get_event_date(event_code):
"""イベントコードに基づいてイベント日付を返す"""
event_dates = {
'美濃加茂': datetime(2024, 5, 19), # 修正済み
'岐阜市': datetime(2024, 4, 28),
'大垣2': datetime(2024, 4, 20),
'各務原': datetime(2024, 3, 24),
'下呂': datetime(2024, 3, 10),
'中津川': datetime(2024, 3, 2),
'揖斐川': datetime(2024, 2, 18),
'高山': datetime(2024, 2, 11),
'大垣': datetime(2024, 1, 27),
'多治見': datetime(2024, 1, 20),
# 2024年のその他のイベント
'養老ロゲ': datetime(2024, 6, 1),
'郡上': datetime(2024, 11, 3), # 郡上イベント追加
}
return event_dates.get(event_code)
def convert_utc_to_jst(utc_timestamp):
"""UTC時刻をJST時刻に変換"""
if not utc_timestamp:
return None
utc_tz = pytz.UTC
jst_tz = pytz.timezone('Asia/Tokyo')
# UTCタイムゾーン情報を付加
if utc_timestamp.tzinfo is None:
utc_timestamp = utc_tz.localize(utc_timestamp)
# JSTに変換
return utc_timestamp.astimezone(jst_tz).replace(tzinfo=None)
def parse_goal_time(goal_time_str, event_date_str):
"""goal_time文字列を適切なdatetimeに変換"""
if not goal_time_str:
return None
try:
# フルの日時形式の場合UTCからJST変換
if len(goal_time_str) > 10:
dt = datetime.fromisoformat(goal_time_str.replace('Z', '+00:00'))
return convert_utc_to_jst(dt)
# 時刻のみの場合JSTとして扱う、1日前の問題を修正
elif ':' in goal_time_str:
time_obj = datetime.strptime(goal_time_str, '%H:%M:%S').time()
# イベント日の翌日の時刻として扱う(競技が翌日まで続くため)
event_date = datetime.strptime(event_date_str, '%Y-%m-%d').date()
next_day = datetime.combine(event_date, time_obj) + timedelta(days=1)
return next_day
return None
except Exception as e:
print(f"goal_time解析エラー: {goal_time_str} - {e}")
return None
def create_rog_event_if_not_exists(cursor, event_code):
"""rog_eventレコードが存在しない場合は作成"""
cursor.execute("SELECT COUNT(*) FROM rog_event WHERE event_name = %s", (event_code,))
if cursor.fetchone()[0] == 0:
event_date = get_event_date(event_code)
if event_date:
start_time = f"{event_date} 08:00:00"
end_time = f"{event_date} 18:00:00"
cursor.execute("""
INSERT INTO rog_event (event_name, start_time, end_time)
VALUES (%s, %s, %s)
""", (event_code, start_time, end_time))
print(f"rog_eventに{event_code}イベントを作成しました")
def create_rog_team_if_not_exists(cursor, zekken, event_code):
"""rog_teamレコードが存在しない場合は作成"""
cursor.execute("""
SELECT COUNT(*) FROM rog_team
WHERE team_number = %s AND event_name = %s
""", (zekken, event_code))
if cursor.fetchone()[0] == 0:
cursor.execute("""
INSERT INTO rog_team (team_number, event_name, team_name)
VALUES (%s, %s, %s)
""", (zekken, event_code, f"チーム{zekken}"))
print(f"rog_teamに{zekken}チームを作成しました")
def clean_target_database(target_cursor):
"""ターゲットデータベースの既存データをクリーンアップ"""
print("=== ターゲットデータベースのクリーンアップ ===")
# 外部キー制約を一時的に無効化
target_cursor.execute("SET session_replication_role = replica;")
try:
# 1. rog_gpscheckinデータを削除
target_cursor.execute("DELETE FROM rog_gpscheckin")
deleted_checkins = target_cursor.rowcount
print(f"チェックインデータを削除: {deleted_checkins}")
# 2. 関連テーブルの削除
target_cursor.execute("DELETE FROM rog_member")
deleted_members = target_cursor.rowcount
print(f"メンバーデータを削除: {deleted_members}")
target_cursor.execute("DELETE FROM rog_entry")
deleted_entries = target_cursor.rowcount
print(f"エントリーデータを削除: {deleted_entries}")
# 3. rog_teamデータを削除
target_cursor.execute("DELETE FROM rog_team")
deleted_teams = target_cursor.rowcount
print(f"チームデータを削除: {deleted_teams}")
# 4. rog_eventデータを削除
target_cursor.execute("DELETE FROM rog_event")
deleted_events = target_cursor.rowcount
print(f"イベントデータを削除: {deleted_events}")
finally:
# 外部キー制約を再有効化
target_cursor.execute("SET session_replication_role = DEFAULT;")
def migrate_gps_data(source_cursor, target_cursor):
"""GPS記録データのみを移行写真記録データは除外"""
print("\n=== GPS記録データの移行 ===")
# GPS記録のみを取得不正な写真記録データを除外
source_cursor.execute("""
SELECT
zekken_number,
event_code,
cp_number,
create_at,
goal_time,
serial_number
FROM gps_information
WHERE serial_number < 20000 -- GPS記録のみ写真記録を除外
ORDER BY serial_number
""")
gps_records = source_cursor.fetchall()
print(f"GPS記録取得: {len(gps_records)}")
migrated_count = 0
skipped_count = 0
error_count = 0
for record in gps_records:
zekken_number, event_code, cp_number, create_at, goal_time, serial_number = record
try:
# イベント日の取得
event_date = get_event_date(event_code)
if not event_date:
print(f"未知のイベントコード: {event_code}")
skipped_count += 1
continue
# イベントとチームの作成
create_rog_event_if_not_exists(target_cursor, event_code)
create_rog_team_if_not_exists(target_cursor, zekken_number, event_code)
# 時刻の変換
checkin_time = convert_utc_to_jst(create_at) if create_at else None
record_time = checkin_time
if checkin_time:
# rog_gpscheckinに挿入
target_cursor.execute("""
INSERT INTO rog_gpscheckin (
zekken, event_code, cp_number, checkin_time,
record_time, serial_number
) VALUES (%s, %s, %s, %s, %s, %s)
""", (zekken_number, event_code, cp_number, checkin_time, record_time, serial_number))
migrated_count += 1
else:
skipped_count += 1
except Exception as e:
print(f"移行エラー (Serial: {serial_number}): {e}")
error_count += 1
print(f"GPS移行完了: 成功 {migrated_count}件, スキップ {skipped_count}件, エラー {error_count}")
return migrated_count, skipped_count, error_count
def generate_migration_statistics(target_cursor):
"""移行統計情報を生成"""
print("\n" + "="*60)
print("📊 移行統計情報")
print("="*60)
# 1. イベント別統計
target_cursor.execute("""
SELECT
event_code,
COUNT(*) as total_records,
COUNT(DISTINCT zekken) as unique_teams,
MIN(checkin_time) as earliest_checkin,
MAX(checkin_time) as latest_checkin
FROM rog_gpscheckin
GROUP BY event_code
ORDER BY total_records DESC
""")
events_stats = target_cursor.fetchall()
print("\n📋 イベント別統計:")
print("イベント名 記録数 チーム数 開始時刻 終了時刻")
print("-" * 75)
total_records = 0
total_teams = 0
for event, records, teams, start, end in events_stats:
print(f"{event:<12} {records:>6}{teams:>6}{start} {end}")
total_records += records
total_teams += teams
print(f"\n✅ 合計: {total_records:,}件のチェックイン記録, {total_teams}チーム")
# 2. 時間帯分析(美濃加茂イベント)
print("\n⏰ 美濃加茂イベントの時間帯分析:")
target_cursor.execute("""
SELECT
EXTRACT(HOUR FROM checkin_time) as hour,
COUNT(*) as count
FROM rog_gpscheckin
WHERE event_code = '美濃加茂'
GROUP BY EXTRACT(HOUR FROM checkin_time)
ORDER BY hour
""")
hourly_stats = target_cursor.fetchall()
print("時間 件数")
print("-" * 15)
for hour, count in hourly_stats:
if hour is not None:
hour_int = int(hour)
bar = "" * min(int(count/50), 20)
print(f"{hour_int:>2}{count:>5}{bar}")
# 3. データ品質確認
print("\n🔍 データ品質確認:")
# 0時台データの確認
target_cursor.execute("""
SELECT COUNT(*)
FROM rog_gpscheckin
WHERE EXTRACT(HOUR FROM checkin_time) = 0
""")
zero_hour_count = target_cursor.fetchone()[0]
print(f"0時台データ: {zero_hour_count}")
# タイムゾーン確認
target_cursor.execute("""
SELECT
EXTRACT(TIMEZONE FROM checkin_time) as tz_offset,
COUNT(*) as count
FROM rog_gpscheckin
GROUP BY EXTRACT(TIMEZONE FROM checkin_time)
ORDER BY tz_offset
""")
tz_stats = target_cursor.fetchall()
print("タイムゾーン分布:")
for tz_offset, count in tz_stats:
if tz_offset is not None:
tz_hours = int(tz_offset) // 3600
tz_name = "JST" if tz_hours == 9 else f"UTC{tz_hours:+d}"
print(f" {tz_name}: {count}")
# 4. MF5-204 サンプル確認
print("\n🎯 MF5-204 サンプルデータ:")
target_cursor.execute("""
SELECT
cp_number,
checkin_time,
EXTRACT(HOUR FROM checkin_time) as hour
FROM rog_gpscheckin
WHERE zekken = 'MF5-204'
ORDER BY checkin_time
LIMIT 10
""")
mf5_samples = target_cursor.fetchall()
if mf5_samples:
print("CP 時刻 JST時")
print("-" * 40)
for cp, time, hour in mf5_samples:
hour_int = int(hour) if hour is not None else 0
print(f"CP{cp:<3} {time} {hour_int:>2}")
else:
print("MF5-204のデータが見つかりません")
def run_verification_tests(target_cursor):
"""移行結果の検証テスト"""
print("\n" + "="*60)
print("🧪 移行結果検証テスト")
print("="*60)
tests_passed = 0
tests_total = 0
# テスト1: 0時台データが存在しないこと
tests_total += 1
target_cursor.execute("""
SELECT COUNT(*)
FROM rog_gpscheckin
WHERE EXTRACT(HOUR FROM checkin_time) = 0
""")
zero_hour_count = target_cursor.fetchone()[0]
if zero_hour_count == 0:
print("✅ テスト1: 0時台データ除去 - 成功")
tests_passed += 1
else:
print(f"❌ テスト1: 0時台データ除去 - 失敗 ({zero_hour_count}件残存)")
# テスト2: MF5-204のデータが正常な時間帯に分散
tests_total += 1
target_cursor.execute("""
SELECT
MIN(EXTRACT(HOUR FROM checkin_time)) as min_hour,
MAX(EXTRACT(HOUR FROM checkin_time)) as max_hour,
COUNT(DISTINCT EXTRACT(HOUR FROM checkin_time)) as hour_variety
FROM rog_gpscheckin
WHERE zekken = 'MF5-204'
""")
mf5_stats = target_cursor.fetchone()
if mf5_stats and mf5_stats[0] >= 9 and mf5_stats[1] <= 23 and mf5_stats[2] >= 3:
print("✅ テスト2: MF5-204時間分散 - 成功")
tests_passed += 1
else:
print(f"❌ テスト2: MF5-204時間分散 - 失敗 (範囲: {mf5_stats})")
# テスト3: GPS記録のみが存在すること
tests_total += 1
target_cursor.execute("""
SELECT
MIN(serial_number::integer) as min_serial,
MAX(serial_number::integer) as max_serial
FROM rog_gpscheckin
""")
serial_range = target_cursor.fetchone()
if serial_range and serial_range[1] < 20000:
print("✅ テスト3: GPS記録のみ - 成功")
tests_passed += 1
else:
print(f"❌ テスト3: GPS記録のみ - 失敗 (Serial範囲: {serial_range})")
# テスト4: 全データがJST時刻であること
tests_total += 1
target_cursor.execute("""
SELECT COUNT(*)
FROM rog_gpscheckin
WHERE EXTRACT(TIMEZONE FROM checkin_time) != 32400 -- JST以外
""")
non_jst_count = target_cursor.fetchone()[0]
if non_jst_count == 0:
print("✅ テスト4: JST時刻統一 - 成功")
tests_passed += 1
else:
print(f"❌ テスト4: JST時刻統一 - 失敗 ({non_jst_count}件が非JST)")
print(f"\n🏆 検証結果: {tests_passed}/{tests_total} テスト成功")
if tests_passed == tests_total:
print("🎉 すべてのテストに合格しました!")
return True
else:
print("⚠️ 一部のテストに失敗しました")
return False
def main():
"""メイン実行関数"""
print("🚀 最終クリーン移行プログラム開始")
print("="*60)
try:
# データベース接続
print("データベースに接続中...")
source_conn = psycopg2.connect(
host='postgres-db',
database='gifuroge',
user=os.environ.get('POSTGRES_USER'),
password=os.environ.get('POSTGRES_PASS')
)
target_conn = psycopg2.connect(
host='postgres-db',
database='rogdb',
user=os.environ.get('POSTGRES_USER'),
password=os.environ.get('POSTGRES_PASS')
)
source_cursor = source_conn.cursor()
target_cursor = target_conn.cursor()
# 1. ターゲットデータベースのクリーンアップ
clean_target_database(target_cursor)
target_conn.commit()
# 2. GPS記録データの移行
migrated, skipped, errors = migrate_gps_data(source_cursor, target_cursor)
target_conn.commit()
# 3. 統計情報生成
generate_migration_statistics(target_cursor)
# 4. 検証テスト実行
verification_passed = run_verification_tests(target_cursor)
# 5. 最終レポート
print("\n" + "="*60)
print("📋 最終移行レポート")
print("="*60)
print(f"✅ 移行成功: {migrated}")
print(f"⏭️ スキップ: {skipped}")
print(f"❌ エラー: {errors}")
print(f"🧪 検証: {'合格' if verification_passed else '不合格'}")
print("")
if verification_passed and errors == 0:
print("🎉 移行プロジェクト完全成功!")
print("✨ 「あり得ない通過データ」問題が根本解決されました")
else:
print("⚠️ 移行に問題があります。ログを確認してください")
source_conn.close()
target_conn.close()
except Exception as e:
print(f"❌ 移行処理中にエラーが発生しました: {e}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())