Files
rogaining_srv/migration_final_simple.py

318 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
"""
最終クリーン移行プログラム(シンプル版)
- GPS記録のみ移行
- 写真記録由来のデータは除外
- トランザクション管理を簡素化
- エラーハンドリングを強化
"""
import psycopg2
from datetime import datetime, timedelta
import pytz
import os
from collections import defaultdict
def get_event_date(event_code):
"""イベントコードに基づいてイベント日付を返す"""
event_dates = {
'美濃加茂': datetime(2024, 5, 19),
'岐阜市': datetime(2024, 4, 28),
'大垣2': datetime(2024, 4, 20),
'各務原': datetime(2024, 3, 24),
'下呂': datetime(2024, 3, 10),
'中津川': datetime(2024, 3, 2),
'揖斐川': datetime(2024, 2, 18),
'高山': datetime(2024, 2, 11),
'大垣': datetime(2024, 1, 27),
'多治見': datetime(2024, 1, 20),
'養老ロゲ': datetime(2024, 6, 1),
'郡上': datetime(2024, 11, 3),
}
return event_dates.get(event_code)
def parse_goal_time(goal_time_str, event_date):
"""goal_time文字列をパースしてdatetimeに変換"""
if not goal_time_str:
return None
try:
# HH:MM:SS形式の場合
if len(goal_time_str) <= 8:
time_parts = goal_time_str.split(':')
if len(time_parts) >= 2:
hour = int(time_parts[0])
minute = int(time_parts[1])
second = int(time_parts[2]) if len(time_parts) > 2 else 0
# イベント日の時刻として設定
goal_datetime = event_date.replace(hour=hour, minute=minute, second=second)
return goal_datetime
else:
# フルdatetime形式の場合
goal_datetime = datetime.strptime(goal_time_str, '%Y-%m-%d %H:%M:%S')
return goal_datetime
except Exception as e:
print(f"goal_time解析エラー: {goal_time_str} - {e}")
return None
def convert_utc_to_jst(utc_datetime):
"""UTC時刻をJST時刻に変換"""
try:
if not utc_datetime:
return None
utc_tz = pytz.UTC
jst_tz = pytz.timezone('Asia/Tokyo')
if isinstance(utc_datetime, str):
utc_datetime = datetime.strptime(utc_datetime, '%Y-%m-%d %H:%M:%S')
if utc_datetime.tzinfo is None:
utc_datetime = utc_tz.localize(utc_datetime)
jst_datetime = utc_datetime.astimezone(jst_tz)
return jst_datetime.replace(tzinfo=None)
except Exception as e:
print(f"時刻変換エラー: {utc_datetime} - {e}")
return None
def clean_target_database(target_cursor):
"""ターゲットデータベースのクリーンアップ"""
print("ターゲットデータベースをクリーンアップ中...")
try:
# 外部キー制約を一時的に無効化
target_cursor.execute("SET session_replication_role = replica;")
# テーブルをクリア
target_cursor.execute("DELETE FROM rog_gpscheckin;")
target_cursor.execute("DELETE FROM rog_member;")
target_cursor.execute("DELETE FROM rog_entry;")
target_cursor.execute("DELETE FROM rog_team;")
target_cursor.execute("DELETE FROM rog_event;")
# 外部キー制約を再有効化
target_cursor.execute("SET session_replication_role = DEFAULT;")
print("ターゲットデータベースのクリーンアップ完了")
return True
except Exception as e:
print(f"クリーンアップエラー: {e}")
return False
def create_events_and_teams(target_cursor, event_stats):
"""イベントとチームを作成"""
print("イベントとチームを作成中...")
created_events = set()
created_teams = set()
for event_code, teams in event_stats.items():
event_date = get_event_date(event_code)
if not event_date:
continue
# イベント作成
if event_code not in created_events:
try:
target_cursor.execute("""
INSERT INTO rog_event (event_code, event_name, event_date, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s)
""", (event_code, event_code, event_date.date(), datetime.now(), datetime.now()))
created_events.add(event_code)
print(f"イベント作成: {event_code}")
except Exception as e:
print(f"イベント作成エラー: {event_code} - {e}")
# チーム作成
for team_zekken in teams:
team_key = (event_code, team_zekken)
if team_key not in created_teams:
try:
target_cursor.execute("""
INSERT INTO rog_team (zekken, event_code, created_at, updated_at)
VALUES (%s, %s, %s, %s)
""", (team_zekken, event_code, datetime.now(), datetime.now()))
created_teams.add(team_key)
except Exception as e:
print(f"チーム作成エラー: {team_key} - {e}")
print(f"作成完了: {len(created_events)}イベント, {len(created_teams)}チーム")
def migrate_gps_data(source_cursor, target_cursor):
"""GPS記録のみを移行"""
print("GPS記録の移行を開始...")
# GPS記録のみ取得serial_number < 20000
source_cursor.execute("""
SELECT serial_number, zekken_number, event_code, cp_number, create_at, goal_time
FROM gps_information
WHERE serial_number < 20000
ORDER BY serial_number
""")
gps_records = source_cursor.fetchall()
print(f"GPS記録数: {len(gps_records)}")
success_count = 0
skip_count = 0
error_count = 0
event_stats = defaultdict(set)
for record in gps_records:
serial_number, zekken, event_code, cp_number, create_at, goal_time = record
try:
# イベント日付取得
event_date = get_event_date(event_code)
if not event_date:
print(f"未知のイベントコード: {event_code}")
skip_count += 1
continue
# 時刻変換
jst_create_at = convert_utc_to_jst(create_at)
jst_goal_time = parse_goal_time(goal_time, event_date) if goal_time else None
if not jst_create_at:
print(f"時刻変換失敗: {serial_number}")
error_count += 1
continue
# チェックイン記録挿入
target_cursor.execute("""
INSERT INTO rog_gpscheckin (
zekken, event_code, cp_number, checkin_time, record_time, serial_number
) VALUES (%s, %s, %s, %s, %s, %s)
""", (zekken, event_code, cp_number, jst_create_at, jst_create_at, str(serial_number)))
event_stats[event_code].add(zekken)
success_count += 1
if success_count % 100 == 0:
print(f"移行進捗: {success_count}件完了")
except Exception as e:
print(f"移行エラー (Serial: {serial_number}): {e}")
error_count += 1
print(f"GPS移行完了: 成功 {success_count}件, スキップ {skip_count}件, エラー {error_count}")
return event_stats, success_count, skip_count, error_count
def generate_statistics(target_cursor, success_count):
"""統計情報を生成"""
print("\n" + "="*60)
print("📊 移行統計情報")
print("="*60)
if success_count == 0:
print("移行されたデータがありません")
return
# イベント別統計
target_cursor.execute("""
SELECT event_code, COUNT(*) as record_count,
COUNT(DISTINCT zekken) as team_count,
MIN(checkin_time) as start_time,
MAX(checkin_time) as end_time
FROM rog_gpscheckin
GROUP BY event_code
ORDER BY record_count DESC
""")
stats = target_cursor.fetchall()
print("\n📋 イベント別統計:")
print("イベント名 記録数 チーム数 開始時刻 終了時刻")
print("-" * 75)
total_records = 0
total_teams = 0
for stat in stats:
event_code, record_count, team_count, start_time, end_time = stat
total_records += record_count
total_teams += team_count
start_str = start_time.strftime("%Y-%m-%d %H:%M") if start_time else "N/A"
end_str = end_time.strftime("%Y-%m-%d %H:%M") if end_time else "N/A"
print(f"{event_code:<12} {record_count:>6}{team_count:>4}チーム {start_str} {end_str}")
print(f"\n✅ 合計: {total_records}件のチェックイン記録, {total_teams}チーム")
# 0時台データチェック
target_cursor.execute("""
SELECT COUNT(*) FROM rog_gpscheckin
WHERE EXTRACT(hour FROM checkin_time) = 0
""")
zero_hour_count = target_cursor.fetchone()[0]
print(f"\n🔍 データ品質確認:")
print(f"0時台データ: {zero_hour_count}")
if zero_hour_count == 0:
print("✅ 0時台データは正常に除外されました")
else:
print("⚠️ 0時台データが残っています")
def main():
"""メイン処理"""
print("最終クリーン移行プログラム(シンプル版)を開始...")
try:
# データベース接続
print("データベースに接続中...")
source_conn = psycopg2.connect(
host='postgres-db',
database='gifuroge',
user=os.environ.get('POSTGRES_USER'),
password=os.environ.get('POSTGRES_PASS')
)
source_conn.autocommit = True
target_conn = psycopg2.connect(
host='postgres-db',
database='rogdb',
user=os.environ.get('POSTGRES_USER'),
password=os.environ.get('POSTGRES_PASS')
)
target_conn.autocommit = True
source_cursor = source_conn.cursor()
target_cursor = target_conn.cursor()
# 1. ターゲットデータベースのクリーンアップ
if not clean_target_database(target_cursor):
print("クリーンアップに失敗しました")
return
# 2. GPS記録の移行
event_stats, success_count, skip_count, error_count = migrate_gps_data(source_cursor, target_cursor)
# 3. イベントとチームの作成
create_events_and_teams(target_cursor, event_stats)
# 4. 統計情報の生成
generate_statistics(target_cursor, success_count)
print("\n✅ 移行処理が完了しました")
except Exception as e:
print(f"❌ 移行処理中にエラーが発生しました: {e}")
import traceback
traceback.print_exc()
finally:
try:
source_cursor.close()
target_cursor.close()
source_conn.close()
target_conn.close()
except:
pass
if __name__ == "__main__":
main()