Files
rogaining_srv/migration_location2025_support.py

438 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Location2025対応版移行プログラム
既存のentry、team、memberデータを削除せずに移行データを追加し、
Location2025テーブルとの整合性を確保する
"""
import os
import sys
import psycopg2
from datetime import datetime, time, timedelta
import pytz
def get_event_date(event_code):
"""イベントコードに基づいてイベント日付を返す"""
event_dates = {
'美濃加茂': datetime(2024, 5, 19),
'岐阜市': datetime(2024, 4, 28),
'大垣2': datetime(2024, 4, 20),
'各務原': datetime(2024, 3, 24),
'下呂': datetime(2024, 3, 10),
'中津川': datetime(2024, 3, 2),
'揖斐川': datetime(2024, 2, 18),
'高山': datetime(2024, 2, 11),
'大垣': datetime(2024, 1, 27),
'多治見': datetime(2024, 1, 20),
'養老ロゲ': datetime(2024, 6, 1),
'郡上': datetime(2024, 11, 3),
# 2025年新規イベント
'岐阜ロゲイニング2025': datetime(2025, 9, 15),
}
return event_dates.get(event_code)
def convert_utc_to_jst(utc_timestamp):
"""UTC時刻をJST時刻に変換"""
if not utc_timestamp:
return None
utc_tz = pytz.UTC
jst_tz = pytz.timezone('Asia/Tokyo')
# UTCタイムゾーン情報を付加
if utc_timestamp.tzinfo is None:
utc_timestamp = utc_tz.localize(utc_timestamp)
# JSTに変換
return utc_timestamp.astimezone(jst_tz).replace(tzinfo=None)
def parse_goal_time(goal_time_str, event_date_str):
"""goal_time文字列を適切なdatetimeに変換"""
if not goal_time_str:
return None
try:
# goal_timeが時刻のみの場合例: "13:45:00"
goal_time = datetime.strptime(goal_time_str, "%H:%M:%S").time()
# event_date_strからイベント日付を解析
event_date = datetime.strptime(event_date_str, "%Y-%m-%d").date()
# 日付と時刻を結合
goal_datetime = datetime.combine(event_date, goal_time)
# JSTとして解釈
jst_tz = pytz.timezone('Asia/Tokyo')
goal_datetime_jst = jst_tz.localize(goal_datetime)
# UTCに変換して返す
return goal_datetime_jst.astimezone(pytz.UTC)
except (ValueError, TypeError) as e:
print(f"goal_time変換エラー: {goal_time_str} - {e}")
return None
def verify_location2025_compatibility(target_cursor):
"""Location2025テーブルとの互換性を確認"""
print("\n=== Location2025互換性確認 ===")
try:
# Location2025テーブルの存在確認
target_cursor.execute("""
SELECT COUNT(*) FROM information_schema.tables
WHERE table_name = 'rog_location2025'
""")
table_exists = target_cursor.fetchone()[0] > 0
if table_exists:
# Location2025のデータ数確認
target_cursor.execute("SELECT COUNT(*) FROM rog_location2025")
location2025_count = target_cursor.fetchone()[0]
print(f"✅ rog_location2025テーブル存在: {location2025_count}件のチェックポイント")
# イベント別チェックポイント数確認
target_cursor.execute("""
SELECT e.event_code, COUNT(l.id) as checkpoint_count
FROM rog_location2025 l
JOIN rog_newevent2 e ON l.event_id = e.id
GROUP BY e.event_code
ORDER BY checkpoint_count DESC
""")
event_checkpoints = target_cursor.fetchall()
print("イベント別チェックポイント数:")
for event_code, count in event_checkpoints:
print(f" {event_code}: {count}")
return True
else:
print("⚠️ rog_location2025テーブルが見つかりません")
print("注意: 移行は可能ですが、チェックポイント管理機能は制限されます")
return False
except Exception as e:
print(f"❌ Location2025互換性確認エラー: {e}")
return False
def validate_checkpoint_references(target_cursor, source_cursor):
"""チェックポイント参照の整合性検証"""
print("\n=== チェックポイント参照整合性検証 ===")
try:
# ソースデータのチェックポイント番号を取得
source_cursor.execute("""
SELECT DISTINCT cp_number, colabo_company_memo as event_name
FROM gps_information
WHERE serial_number < 20000
AND cp_number IS NOT NULL
ORDER BY cp_number
""")
source_checkpoints = source_cursor.fetchall()
print(f"ソースデータのチェックポイント: {len(source_checkpoints)}種類")
# Location2025のチェックポイント番号を取得
target_cursor.execute("""
SELECT DISTINCT l.cp_number, e.event_code
FROM rog_location2025 l
JOIN rog_newevent2 e ON l.event_id = e.id
ORDER BY l.cp_number
""")
target_checkpoints = target_cursor.fetchall()
print(f"Location2025のチェックポイント: {len(target_checkpoints)}種類")
# 不一致のチェックポイントを特定
source_cp_set = set((cp, event) for cp, event in source_checkpoints if cp and event)
target_cp_set = set((cp, event) for cp, event in target_checkpoints if cp and event)
missing_in_target = source_cp_set - target_cp_set
if missing_in_target:
print("⚠️ Location2025で不足しているチェックポイント:")
for cp, event in sorted(missing_in_target):
print(f" CP{cp} ({event})")
else:
print("✅ すべてのチェックポイント参照が整合しています")
return len(missing_in_target) == 0
except Exception as e:
print(f"❌ チェックポイント参照検証エラー: {e}")
return False
def clean_target_database_selective(target_cursor):
"""ターゲットデータベースの選択的クリーンアップ(既存データを保護)"""
print("=== ターゲットデータベースの選択的クリーンアップ ===")
# 外部キー制約を一時的に無効化
target_cursor.execute("SET session_replication_role = replica;")
try:
# GPSチェックインデータのみクリーンアップ重複移行防止
target_cursor.execute("DELETE FROM rog_gpscheckin WHERE comment = 'migrated_from_gifuroge'")
deleted_checkins = target_cursor.rowcount
print(f"過去の移行GPSチェックインデータを削除: {deleted_checkins}")
# 注意: rog_entry, rog_team, rog_member, rog_location2025 は削除しない!
print("注意: 既存のentry、team、member、location2025データは保護されます")
finally:
# 外部キー制約を再有効化
target_cursor.execute("SET session_replication_role = DEFAULT;")
def backup_existing_data(target_cursor):
"""既存データのバックアップ状況を確認"""
print("\n=== 既存データ保護確認 ===")
# 既存データ数を確認
target_cursor.execute("SELECT COUNT(*) FROM rog_entry")
entry_count = target_cursor.fetchone()[0]
target_cursor.execute("SELECT COUNT(*) FROM rog_team")
team_count = target_cursor.fetchone()[0]
target_cursor.execute("SELECT COUNT(*) FROM rog_member")
member_count = target_cursor.fetchone()[0]
# Location2025データ数も確認
try:
target_cursor.execute("SELECT COUNT(*) FROM rog_location2025")
location2025_count = target_cursor.fetchone()[0]
print(f"✅ Location2025チェックポイント: {location2025_count}")
except Exception as e:
print(f"⚠️ Location2025確認エラー: {e}")
location2025_count = 0
if entry_count > 0 or team_count > 0 or member_count > 0:
print("✅ 既存のcore application dataが検出されました。これらは保護されます。")
print(f" - エントリー: {entry_count}")
print(f" - チーム: {team_count}")
print(f" - メンバー: {member_count}")
return True
else:
print("⚠️ 既存のcore application dataが見つかりません。")
print("注意: restore_core_data.pyの実行を検討してください。")
return False
def migrate_gps_data_with_location2025_validation(source_cursor, target_cursor):
"""Location2025対応版GPSデータ移行"""
print("\n=== Location2025対応版GPSデータ移行 ===")
# GPS専用データ取得serial_number < 20000
source_cursor.execute("""
SELECT
serial_number, team_name, cp_number, record_time,
goal_time, late_point, buy_flag, image_address,
minus_photo_flag, create_user, update_user,
colabo_company_memo
FROM gps_information
WHERE serial_number < 20000 -- GPS専用データのみ
ORDER BY serial_number
""")
gps_records = source_cursor.fetchall()
print(f"移行対象GPSデータ: {len(gps_records)}")
migrated_count = 0
error_count = 0
checkpoint_warnings = set()
for record in gps_records:
try:
(serial_number, team_name, cp_number, record_time, goal_time,
late_point, buy_flag, image_address, minus_photo_flag,
create_user, update_user, colabo_company_memo) = record
# Location2025でのチェックポイント存在確認警告のみ
if cp_number and colabo_company_memo:
target_cursor.execute("""
SELECT COUNT(*) FROM rog_location2025 l
JOIN rog_newevent2 e ON l.event_id = e.id
WHERE l.cp_number = %s AND e.event_code = %s
""", (cp_number, colabo_company_memo))
checkpoint_exists = target_cursor.fetchone()[0] > 0
if not checkpoint_exists:
warning_key = (cp_number, colabo_company_memo)
if warning_key not in checkpoint_warnings:
checkpoint_warnings.add(warning_key)
print(f"⚠️ チェックポイント未定義: CP{cp_number} in {colabo_company_memo}")
# UTC時刻をJST時刻に変換
record_time_jst = convert_utc_to_jst(record_time)
goal_time_utc = None
if goal_time:
# goal_timeをUTCに変換
if isinstance(goal_time, str):
# イベント名からイベント日付を取得
event_name = colabo_company_memo or "不明"
event_date = get_event_date(event_name)
if event_date:
goal_time_utc = parse_goal_time(goal_time, event_date.strftime("%Y-%m-%d"))
elif isinstance(goal_time, datetime):
goal_time_utc = convert_utc_to_jst(goal_time)
# rog_gpscheckinに挿入Location2025対応マーカー付き
target_cursor.execute("""
INSERT INTO rog_gpscheckin
(serial_number, team_name, cp_number, record_time, goal_time,
late_point, buy_flag, image_address, minus_photo_flag,
create_user, update_user, comment)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
serial_number, team_name, cp_number, record_time_jst, goal_time_utc,
late_point, buy_flag, image_address, minus_photo_flag,
create_user, update_user, 'migrated_from_gifuroge_location2025_compatible'
))
migrated_count += 1
if migrated_count % 1000 == 0:
print(f"移行進捗: {migrated_count}/{len(gps_records)}")
except Exception as e:
error_count += 1
print(f"移行エラー (record {serial_number}): {e}")
continue
print(f"\n移行完了: {migrated_count}件成功, {error_count}件エラー")
if checkpoint_warnings:
print(f"チェックポイント警告: {len(checkpoint_warnings)}種類のチェックポイントがLocation2025で未定義")
return migrated_count
def generate_location2025_migration_report(target_cursor):
"""Location2025移行レポート生成"""
print("\n=== Location2025移行レポート ===")
try:
# 移行されたGPSデータの統計
target_cursor.execute("""
SELECT COUNT(*) FROM rog_gpscheckin
WHERE comment LIKE 'migrated_from_gifuroge%'
""")
migrated_gps_count = target_cursor.fetchone()[0]
# イベント別移行データ統計
target_cursor.execute("""
SELECT
COALESCE(update_user, 'unknown') as event_name,
COUNT(*) as gps_count,
COUNT(DISTINCT cp_number) as unique_checkpoints,
MIN(record_time) as first_checkin,
MAX(record_time) as last_checkin
FROM rog_gpscheckin
WHERE comment LIKE 'migrated_from_gifuroge%'
GROUP BY update_user
ORDER BY gps_count DESC
""")
event_stats = target_cursor.fetchall()
print(f"📊 総移行GPS記録: {migrated_gps_count}")
print("📋 イベント別統計:")
for event_name, gps_count, unique_cps, first_time, last_time in event_stats:
print(f" {event_name}: {gps_count}件 (CP: {unique_cps}種類)")
# Location2025との整合性確認
target_cursor.execute("""
SELECT COUNT(DISTINCT l.cp_number)
FROM rog_location2025 l
JOIN rog_newevent2 e ON l.event_id = e.id
""")
defined_checkpoints = target_cursor.fetchone()[0]
print(f"🎯 Location2025定義済みチェックポイント: {defined_checkpoints}種類")
except Exception as e:
print(f"❌ レポート生成エラー: {e}")
def main():
"""メイン移行処理Location2025対応版"""
print("=== Location2025対応版移行プログラム開始 ===")
print("注意: 既存のentry、team、member、location2025データは削除されません")
# データベース接続設定
source_config = {
'host': 'localhost',
'port': '5433',
'database': 'gifuroge',
'user': 'postgres',
'password': 'postgres'
}
target_config = {
'host': 'localhost',
'port': '5432',
'database': 'rogdb',
'user': 'postgres',
'password': 'postgres'
}
source_conn = None
target_conn = None
try:
# データベース接続
print("データベースに接続中...")
source_conn = psycopg2.connect(**source_config)
target_conn = psycopg2.connect(**target_config)
source_cursor = source_conn.cursor()
target_cursor = target_conn.cursor()
# Location2025互換性確認
location2025_available = verify_location2025_compatibility(target_cursor)
# 既存データ保護確認
has_existing_data = backup_existing_data(target_cursor)
# チェックポイント参照整合性検証Location2025が利用可能な場合
if location2025_available:
validate_checkpoint_references(target_cursor, source_cursor)
# 確認プロンプト
print(f"\nLocation2025対応: {'✅ 利用可能' if location2025_available else '⚠️ 制限あり'}")
print(f"既存データ保護: {'✅ 検出済み' if has_existing_data else '⚠️ 未検出'}")
response = input("\n移行を開始しますか? (y/N): ")
if response.lower() != 'y':
print("移行を中止しました。")
return
# ターゲットデータベースの選択的クリーンアップ
clean_target_database_selective(target_cursor)
target_conn.commit()
# Location2025対応版GPSデータ移行
migrated_count = migrate_gps_data_with_location2025_validation(source_cursor, target_cursor)
if migrated_count > 0:
target_conn.commit()
print("✅ 移行データをコミットしました")
# 移行レポート生成
generate_location2025_migration_report(target_cursor)
else:
print("❌ 移行データがありません")
except Exception as e:
print(f"❌ 移行エラー: {e}")
if target_conn:
target_conn.rollback()
finally:
# 接続を閉じる
if source_conn:
source_conn.close()
if target_conn:
target_conn.close()
print("=== Location2025対応版移行プログラム終了 ===")
if __name__ == "__main__":
main()