451 lines
16 KiB
Python
451 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
最終クリーン移行プログラム
|
||
不正な写真記録データを除外し、正確なGPS記録のみを移行する
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import psycopg2
|
||
from datetime import datetime, time, timedelta
|
||
import pytz
|
||
|
||
def get_event_date(event_code):
    """Return the date on which the given event was held.

    Args:
        event_code: Event name used as the lookup key (e.g. '美濃加茂').

    Returns:
        A naive ``datetime`` at midnight of the event day, or ``None`` when
        the event code is not in the table.
    """
    # (year, month, day) for every known event code.
    schedule = {
        '美濃加茂': (2024, 5, 19),  # date already corrected
        '岐阜市': (2024, 4, 28),
        '大垣2': (2024, 4, 20),
        '各務原': (2024, 3, 24),
        '下呂': (2024, 3, 10),
        '中津川': (2024, 3, 2),
        '揖斐川': (2024, 2, 18),
        '高山': (2024, 2, 11),
        '大垣': (2024, 1, 27),
        '多治見': (2024, 1, 20),
        # Other 2024 events
        '養老ロゲ': (2024, 6, 1),
        '郡上': (2024, 11, 3),  # Gujo event added later
    }

    year_month_day = schedule.get(event_code)
    if year_month_day is None:
        return None
    return datetime(*year_month_day)
|
||
|
||
def convert_utc_to_jst(utc_timestamp):
    """Convert a UTC timestamp to naive Asia/Tokyo wall-clock time.

    Args:
        utc_timestamp: A ``datetime``. A naive value is taken to be UTC; an
            aware value is converted from whatever zone it carries.

    Returns:
        A naive ``datetime`` holding the Asia/Tokyo local time, or ``None``
        when ``utc_timestamp`` is falsy (e.g. ``None``).
    """
    if not utc_timestamp:
        return None

    # A naive value has no zone attached, so tag it as UTC before converting.
    aware = utc_timestamp
    if aware.tzinfo is None:
        aware = pytz.UTC.localize(aware)

    tokyo_time = aware.astimezone(pytz.timezone('Asia/Tokyo'))

    # Strip tzinfo so callers receive a plain wall-clock value.
    return tokyo_time.replace(tzinfo=None)
|
||
|
||
def parse_goal_time(goal_time_str, event_date_str):
    """Turn a goal_time string into a naive datetime.

    Two input shapes are recognised:

    * longer than 10 characters: parsed as an ISO-8601 timestamp (a trailing
      ``Z`` is accepted) and passed through ``convert_utc_to_jst``;
    * otherwise, if it contains ``:``: parsed as ``HH:MM:SS`` and placed on
      the day *after* the event date, because the original author treats
      bare times as belonging to the following day.

    Args:
        goal_time_str: The raw goal time text; may be empty or ``None``.
        event_date_str: Event date as ``YYYY-MM-DD``; only read for bare times.

    Returns:
        The resulting ``datetime``, or ``None`` when the input is empty, has
        neither shape, or fails to parse (the failure is printed).
    """
    if not goal_time_str:
        return None

    try:
        # Full date-time form: interpret as UTC and convert to JST.
        if len(goal_time_str) > 10:
            parsed = datetime.fromisoformat(goal_time_str.replace('Z', '+00:00'))
            return convert_utc_to_jst(parsed)

        if ':' not in goal_time_str:
            return None

        # Time-only form: already JST, placed on the day after the event.
        clock = datetime.strptime(goal_time_str, '%H:%M:%S').time()
        event_day = datetime.strptime(event_date_str, '%Y-%m-%d').date()
        return datetime.combine(event_day, clock) + timedelta(days=1)
    except Exception as e:
        print(f"goal_time解析エラー: {goal_time_str} - {e}")
        return None
|
||
|
||
def create_rog_event_if_not_exists(cursor, event_code):
    """Insert a rog_event row for ``event_code`` unless one already exists.

    The new row gets a start time of 08:00:00 and an end time of 18:00:00 on
    the date returned by ``get_event_date``. Nothing is inserted when the
    event already exists or when the event code has no known date.

    Args:
        cursor: Open DB-API cursor on the target database.
        event_code: Event name; stored in ``rog_event.event_name``.
    """
    cursor.execute("SELECT COUNT(*) FROM rog_event WHERE event_name = %s", (event_code,))
    if cursor.fetchone()[0] != 0:
        return

    event_date = get_event_date(event_code)
    if not event_date:
        return

    # get_event_date returns a datetime, and str(datetime) already ends in
    # "00:00:00". Interpolating it directly produced
    # "2024-05-19 00:00:00 08:00:00", so format the date part only.
    event_day = event_date.strftime('%Y-%m-%d')
    start_time = f"{event_day} 08:00:00"
    end_time = f"{event_day} 18:00:00"

    cursor.execute("""
        INSERT INTO rog_event (event_name, start_time, end_time)
        VALUES (%s, %s, %s)
    """, (event_code, start_time, end_time))
    print(f"rog_eventに{event_code}イベントを作成しました")
|
||
|
||
def create_rog_team_if_not_exists(cursor, zekken, event_code):
    """Insert a rog_team row for this bib number and event unless present.

    A new team is named ``チーム<zekken>``.

    Args:
        cursor: Open DB-API cursor on the target database.
        zekken: Bib (team) number; stored in ``rog_team.team_number``.
        event_code: Event name; stored in ``rog_team.event_name``.
    """
    cursor.execute("""
        SELECT COUNT(*) FROM rog_team
        WHERE team_number = %s AND event_name = %s
    """, (zekken, event_code))
    already_there = cursor.fetchone()[0]
    if already_there != 0:
        return

    new_team = (zekken, event_code, f"チーム{zekken}")
    cursor.execute("""
        INSERT INTO rog_team (team_number, event_name, team_name)
        VALUES (%s, %s, %s)
    """, new_team)
    print(f"rog_teamに{zekken}チームを作成しました")
|
||
|
||
def clean_target_database(target_cursor):
    """Delete all previously migrated rows from the target database.

    Empties rog_gpscheckin, rog_member, rog_entry, rog_team and rog_event in
    that order and prints the number of rows removed from each. The deletes
    run with ``session_replication_role = replica`` (the original author's
    way of suspending foreign-key enforcement); the role is set back to
    DEFAULT even when a delete raises. Committing is left to the caller.

    Args:
        target_cursor: Open DB-API cursor on the target database.
    """
    print("=== ターゲットデータベースのクリーンアップ ===")

    # (statement, progress label): check-ins, then the related member and
    # entry tables, then teams, then events.
    purge_plan = (
        ("DELETE FROM rog_gpscheckin", "チェックインデータを削除"),
        ("DELETE FROM rog_member", "メンバーデータを削除"),
        ("DELETE FROM rog_entry", "エントリーデータを削除"),
        ("DELETE FROM rog_team", "チームデータを削除"),
        ("DELETE FROM rog_event", "イベントデータを削除"),
    )

    # Temporarily disable foreign-key enforcement for this session.
    target_cursor.execute("SET session_replication_role = replica;")
    try:
        for statement, label in purge_plan:
            target_cursor.execute(statement)
            print(f"{label}: {target_cursor.rowcount}件")
    finally:
        # Re-enable foreign-key enforcement.
        target_cursor.execute("SET session_replication_role = DEFAULT;")
|
||
|
||
def migrate_gps_data(source_cursor, target_cursor):
    """Copy GPS check-in records into rog_gpscheckin, leaving photo records out.

    Reads every gps_information row with ``serial_number < 20000`` (the
    original author's criterion for "GPS record, not photo record"), makes
    sure the matching rog_event and rog_team rows exist, converts
    ``create_at`` from UTC to JST and inserts it as both ``checkin_time`` and
    ``record_time``.

    Each record is written inside its own SAVEPOINT. Without it, the first
    failing statement leaves the PostgreSQL transaction aborted, so every
    later record would fail as well even though the exception is caught
    here. On error, everything written for that record is rolled back.
    Committing is left to the caller.

    Args:
        source_cursor: Open cursor on the source database.
        target_cursor: Open cursor on the target database.

    Returns:
        A ``(migrated_count, skipped_count, error_count)`` tuple. Records with
        an unknown event code or without ``create_at`` count as skipped.
    """
    print("\n=== GPS記録データの移行 ===")

    # Fetch GPS records only (the invalid photo-record rows are excluded).
    source_cursor.execute("""
        SELECT
            zekken_number,
            event_code,
            cp_number,
            create_at,
            goal_time,
            serial_number
        FROM gps_information
        WHERE serial_number < 20000 -- GPS記録のみ(写真記録を除外)
        ORDER BY serial_number
    """)

    gps_records = source_cursor.fetchall()
    print(f"GPS記録取得: {len(gps_records)}件")

    # SAVEPOINT is only valid inside a transaction block. An autocommit
    # connection has none (and does not suffer from the aborted-transaction
    # problem), so savepoints are skipped there.
    connection = getattr(target_cursor, "connection", None)
    use_savepoints = not getattr(connection, "autocommit", False)

    migrated_count = 0
    skipped_count = 0
    error_count = 0

    for record in gps_records:
        # goal_time is selected but not used by this migration.
        zekken_number, event_code, cp_number, create_at, _goal_time, serial_number = record

        try:
            if not get_event_date(event_code):
                print(f"未知のイベントコード: {event_code}")
                skipped_count += 1
                continue

            if use_savepoints:
                target_cursor.execute("SAVEPOINT gps_record")
            try:
                # Make sure the parent event and team rows exist.
                create_rog_event_if_not_exists(target_cursor, event_code)
                create_rog_team_if_not_exists(target_cursor, zekken_number, event_code)

                # The same converted time is stored in both time columns.
                checkin_time = convert_utc_to_jst(create_at) if create_at else None

                if checkin_time:
                    target_cursor.execute("""
                        INSERT INTO rog_gpscheckin (
                            zekken, event_code, cp_number, checkin_time,
                            record_time, serial_number
                        ) VALUES (%s, %s, %s, %s, %s, %s)
                    """, (zekken_number, event_code, cp_number, checkin_time, checkin_time, serial_number))
            except Exception:
                if use_savepoints:
                    # Undo this record only and make the transaction usable
                    # again; release so savepoints do not pile up.
                    target_cursor.execute("ROLLBACK TO SAVEPOINT gps_record")
                    target_cursor.execute("RELEASE SAVEPOINT gps_record")
                raise

            if use_savepoints:
                target_cursor.execute("RELEASE SAVEPOINT gps_record")

            if checkin_time:
                migrated_count += 1
            else:
                skipped_count += 1

        except Exception as e:
            print(f"移行エラー (Serial: {serial_number}): {e}")
            error_count += 1

    print(f"GPS移行完了: 成功 {migrated_count}件, スキップ {skipped_count}件, エラー {error_count}件")
    return migrated_count, skipped_count, error_count
|
||
|
||
def generate_migration_statistics(target_cursor):
    """Print a summary report of the migrated rog_gpscheckin data.

    Sections printed, in order: per-event record/team counts with totals, an
    hourly histogram for the '美濃加茂' event, data-quality checks (count of
    check-ins in hour 0 and the distribution of time-zone offsets), and up to
    ten sample rows for team 'MF5-204'.

    NOTE(review): the offset query uses EXTRACT(TIMEZONE FROM checkin_time),
    which presumably needs a time-zone-aware column; confirm against the
    rog_gpscheckin schema.

    Args:
        target_cursor: Open DB-API cursor on the target database.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("📊 移行統計情報")
    print(rule)

    # 1. Per-event statistics
    target_cursor.execute("""
        SELECT
            event_code,
            COUNT(*) as total_records,
            COUNT(DISTINCT zekken) as unique_teams,
            MIN(checkin_time) as earliest_checkin,
            MAX(checkin_time) as latest_checkin
        FROM rog_gpscheckin
        GROUP BY event_code
        ORDER BY total_records DESC
    """)
    per_event_rows = target_cursor.fetchall()

    print("\n📋 イベント別統計:")
    print("イベント名 記録数 チーム数 開始時刻 終了時刻")
    print("-" * 75)

    grand_records = 0
    grand_teams = 0
    for event_name, n_records, n_teams, first_seen, last_seen in per_event_rows:
        print(f"{event_name:<12} {n_records:>6}件 {n_teams:>6}組 {first_seen} {last_seen}")
        grand_records += n_records
        grand_teams += n_teams

    print(f"\n✅ 合計: {grand_records:,}件のチェックイン記録, {grand_teams}チーム")

    # 2. Hour-of-day analysis for the Minokamo event
    print("\n⏰ 美濃加茂イベントの時間帯分析:")
    target_cursor.execute("""
        SELECT
            EXTRACT(HOUR FROM checkin_time) as hour,
            COUNT(*) as count
        FROM rog_gpscheckin
        WHERE event_code = '美濃加茂'
        GROUP BY EXTRACT(HOUR FROM checkin_time)
        ORDER BY hour
    """)
    per_hour_rows = target_cursor.fetchall()

    print("時間 件数")
    print("-" * 15)
    for hour_value, n_checkins in per_hour_rows:
        if hour_value is None:
            continue
        # One bar cell per 50 check-ins, capped at 20 cells.
        histogram_bar = "█" * min(int(n_checkins / 50), 20)
        print(f"{int(hour_value):>2}時 {n_checkins:>5}件 {histogram_bar}")

    # 3. Data-quality checks
    print("\n🔍 データ品質確認:")

    # Check-ins recorded during hour 0
    target_cursor.execute("""
        SELECT COUNT(*)
        FROM rog_gpscheckin
        WHERE EXTRACT(HOUR FROM checkin_time) = 0
    """)
    midnight_rows = target_cursor.fetchone()[0]
    print(f"0時台データ: {midnight_rows}件")

    # Time-zone offset distribution
    target_cursor.execute("""
        SELECT
            EXTRACT(TIMEZONE FROM checkin_time) as tz_offset,
            COUNT(*) as count
        FROM rog_gpscheckin
        GROUP BY EXTRACT(TIMEZONE FROM checkin_time)
        ORDER BY tz_offset
    """)
    offset_rows = target_cursor.fetchall()

    print("タイムゾーン分布:")
    for offset_seconds, n_checkins in offset_rows:
        if offset_seconds is None:
            continue
        offset_hours = int(offset_seconds) // 3600
        if offset_hours == 9:
            zone_label = "JST"
        else:
            zone_label = f"UTC{offset_hours:+d}"
        print(f" {zone_label}: {n_checkins}件")

    # 4. Sample rows for team MF5-204
    print("\n🎯 MF5-204 サンプルデータ:")
    target_cursor.execute("""
        SELECT
            cp_number,
            checkin_time,
            EXTRACT(HOUR FROM checkin_time) as hour
        FROM rog_gpscheckin
        WHERE zekken = 'MF5-204'
        ORDER BY checkin_time
        LIMIT 10
    """)
    sample_rows = target_cursor.fetchall()

    if not sample_rows:
        print("MF5-204のデータが見つかりません")
        return

    print("CP 時刻 JST時")
    print("-" * 40)
    for checkpoint, checked_at, hour_value in sample_rows:
        shown_hour = 0 if hour_value is None else int(hour_value)
        print(f"CP{checkpoint:<3} {checked_at} {shown_hour:>2}")
|
||
|
||
def run_verification_tests(target_cursor):
    """Run four sanity checks on rog_gpscheckin and print the outcome.

    Checks, in order:
      1. no check-in falls in hour 0;
      2. team 'MF5-204' has check-ins between hour 9 and hour 23 spread over
         at least three distinct hours;
      3. the largest serial_number is below 20000;
      4. no row has a time-zone offset other than 32400 seconds.

    MIN/MAX return NULL when there are no matching rows. The original
    compared those ``None`` values with integers and raised ``TypeError``;
    here a NULL aggregate simply fails the check, since missing data cannot
    be verified.

    NOTE(review): check 4 uses EXTRACT(TIMEZONE FROM checkin_time), which
    presumably needs a time-zone-aware column; confirm against the schema.

    Args:
        target_cursor: Open DB-API cursor on the target database.

    Returns:
        ``True`` when every check passes, ``False`` otherwise.
    """
    print("\n" + "="*60)
    print("🧪 移行結果検証テスト")
    print("="*60)

    tests_passed = 0
    tests_total = 0

    # Test 1: no data in hour 0
    tests_total += 1
    target_cursor.execute("""
        SELECT COUNT(*)
        FROM rog_gpscheckin
        WHERE EXTRACT(HOUR FROM checkin_time) = 0
    """)
    zero_hour_count = target_cursor.fetchone()[0]
    if zero_hour_count == 0:
        print("✅ テスト1: 0時台データ除去 - 成功")
        tests_passed += 1
    else:
        print(f"❌ テスト1: 0時台データ除去 - 失敗 ({zero_hour_count}件残存)")

    # Test 2: MF5-204 check-ins are spread over plausible hours
    tests_total += 1
    target_cursor.execute("""
        SELECT
            MIN(EXTRACT(HOUR FROM checkin_time)) as min_hour,
            MAX(EXTRACT(HOUR FROM checkin_time)) as max_hour,
            COUNT(DISTINCT EXTRACT(HOUR FROM checkin_time)) as hour_variety
        FROM rog_gpscheckin
        WHERE zekken = 'MF5-204'
    """)
    mf5_stats = target_cursor.fetchone()
    # min/max are None when the team has no rows; guard before comparing.
    mf5_ok = bool(
        mf5_stats
        and mf5_stats[0] is not None
        and mf5_stats[1] is not None
        and mf5_stats[0] >= 9
        and mf5_stats[1] <= 23
        and mf5_stats[2] >= 3
    )
    if mf5_ok:
        print("✅ テスト2: MF5-204時間分散 - 成功")
        tests_passed += 1
    else:
        print(f"❌ テスト2: MF5-204時間分散 - 失敗 (範囲: {mf5_stats})")

    # Test 3: only GPS records are present
    tests_total += 1
    target_cursor.execute("""
        SELECT
            MIN(serial_number::integer) as min_serial,
            MAX(serial_number::integer) as max_serial
        FROM rog_gpscheckin
    """)
    serial_range = target_cursor.fetchone()
    # max_serial is None when the table is empty; guard before comparing.
    serials_ok = bool(
        serial_range
        and serial_range[1] is not None
        and serial_range[1] < 20000
    )
    if serials_ok:
        print("✅ テスト3: GPS記録のみ - 成功")
        tests_passed += 1
    else:
        print(f"❌ テスト3: GPS記録のみ - 失敗 (Serial範囲: {serial_range})")

    # Test 4: every row is in JST
    tests_total += 1
    target_cursor.execute("""
        SELECT COUNT(*)
        FROM rog_gpscheckin
        WHERE EXTRACT(TIMEZONE FROM checkin_time) != 32400 -- JST以外
    """)
    non_jst_count = target_cursor.fetchone()[0]
    if non_jst_count == 0:
        print("✅ テスト4: JST時刻統一 - 成功")
        tests_passed += 1
    else:
        print(f"❌ テスト4: JST時刻統一 - 失敗 ({non_jst_count}件が非JST)")

    print(f"\n🏆 検証結果: {tests_passed}/{tests_total} テスト成功")

    if tests_passed == tests_total:
        print("🎉 すべてのテストに合格しました!")
        return True
    else:
        print("⚠️ 一部のテストに失敗しました")
        return False
|
||
|
||
def main():
    """Run the whole migration: clean, migrate, report, verify.

    Connects to the source ('gifuroge') and target ('rogdb') databases on
    host 'postgres-db' with the credentials in the POSTGRES_USER and
    POSTGRES_PASS environment variables. The cleanup and the migration are
    each committed as soon as they finish.

    Both connections are closed in a ``finally`` block so they are not
    leaked when a step raises. Closing a connection with uncommitted work
    discards that work.

    Returns:
        0 when the run completes (even if verification failed or some
        records errored; see the printed report), 1 when an exception
        aborted the run.
    """
    print("🚀 最終クリーン移行プログラム開始")
    print("="*60)

    source_conn = None
    target_conn = None

    try:
        # Database connections
        print("データベースに接続中...")

        credentials = {
            'host': 'postgres-db',
            'user': os.environ.get('POSTGRES_USER'),
            'password': os.environ.get('POSTGRES_PASS'),
        }
        source_conn = psycopg2.connect(database='gifuroge', **credentials)
        target_conn = psycopg2.connect(database='rogdb', **credentials)

        source_cursor = source_conn.cursor()
        target_cursor = target_conn.cursor()

        # 1. Clean up the target database
        clean_target_database(target_cursor)
        target_conn.commit()

        # 2. Migrate the GPS records
        migrated, skipped, errors = migrate_gps_data(source_cursor, target_cursor)
        target_conn.commit()

        # 3. Print statistics
        generate_migration_statistics(target_cursor)

        # 4. Run the verification checks
        verification_passed = run_verification_tests(target_cursor)

        # 5. Final report
        print("\n" + "="*60)
        print("📋 最終移行レポート")
        print("="*60)
        print(f"✅ 移行成功: {migrated}件")
        print(f"⏭️ スキップ: {skipped}件")
        print(f"❌ エラー: {errors}件")
        print(f"🧪 検証: {'合格' if verification_passed else '不合格'}")
        print("")

        if verification_passed and errors == 0:
            print("🎉 移行プロジェクト完全成功!")
            print("✨ 「あり得ない通過データ」問題が根本解決されました")
        else:
            print("⚠️ 移行に問題があります。ログを確認してください")

    except Exception as e:
        print(f"❌ 移行処理中にエラーが発生しました: {e}")
        return 1

    finally:
        # Release whichever connections were opened, on success and failure.
        for conn in (source_conn, target_conn):
            if conn is not None:
                conn.close()

    return 0
|
||
|
||
# Script entry point: run the migration and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|