Files
rogaining_srv/migration_data_protection.py
2025-08-25 09:06:26 +09:00

369 lines
14 KiB
Python

#!/usr/bin/env python3
"""
GPS記録データマイグレーション (既存データ保護版)
gifurogeからrogdbへ12,665件のGPSチェックイン記録を移行
既存のアプリケーションデータ(188エントリ、226チーム、388メンバー)は保護
"""
import os
import sys
import psycopg2
from datetime import datetime, timezone
from typing import Optional, Dict, List, Tuple
import pytz
# 設定
GIFUROGE_DB = {
'host': 'postgres-db',
'database': 'gifuroge',
'user': 'admin',
'password': 'admin123456',
'port': 5432
}
ROGDB_DB = {
'host': 'postgres-db',
'database': 'rogdb',
'user': 'admin',
'password': 'admin123456',
'port': 5432
}
def convert_utc_to_jst(utc_time):
"""UTC時刻をJST時刻に変換"""
if utc_time is None:
return None
if isinstance(utc_time, str):
utc_time = datetime.fromisoformat(utc_time.replace('Z', '+00:00'))
if utc_time.tzinfo is None:
utc_time = utc_time.replace(tzinfo=timezone.utc)
jst = pytz.timezone('Asia/Tokyo')
return utc_time.astimezone(jst)
def get_event_date(event_name: str) -> Optional[datetime]:
"""イベント名から開催日を推定"""
event_dates = {
'岐阜県ロゲイニング大会': datetime(2024, 11, 23),
'高山市ロゲイニング大会': datetime(2024, 11, 23),
'ロゲイニング大会2024': datetime(2024, 11, 23),
'default': datetime(2024, 11, 23)
}
for key, date in event_dates.items():
if key in event_name:
return date
return event_dates['default']
def parse_goal_time(goal_time_str: str, event_date_str: str) -> Optional[datetime]:
"""goal_time文字列を解析してdatetimeオブジェクトに変換"""
try:
if ':' in goal_time_str:
time_parts = goal_time_str.split(':')
hour = int(time_parts[0])
minute = int(time_parts[1])
event_date = datetime.strptime(event_date_str, "%Y-%m-%d")
goal_datetime = event_date.replace(hour=hour, minute=minute)
jst = pytz.timezone('Asia/Tokyo')
goal_datetime_jst = jst.localize(goal_datetime)
return goal_datetime_jst
except Exception as e:
print(f"goal_time解析エラー: {goal_time_str} - {e}")
return None
def check_database_connectivity():
"""データベース接続確認"""
print("=== データベース接続確認 ===")
try:
# gifuroge DB接続確認
source_conn = psycopg2.connect(**GIFUROGE_DB)
source_cursor = source_conn.cursor()
source_cursor.execute("SELECT COUNT(*) FROM gps_information")
source_count = source_cursor.fetchone()[0]
print(f"✅ gifuroge DB接続成功: gps_information {source_count}")
source_conn.close()
# rogdb DB接続確認
target_conn = psycopg2.connect(**ROGDB_DB)
target_cursor = target_conn.cursor()
target_cursor.execute("SELECT COUNT(*) FROM rog_gpscheckin")
target_count = target_cursor.fetchone()[0]
print(f"✅ rogdb DB接続成功: rog_gpscheckin {target_count}")
target_conn.close()
return True
except Exception as e:
print(f"❌ データベース接続エラー: {e}")
return False
def verify_location2025_compatibility(target_cursor):
"""rog_location2025テーブルとの互換性確認"""
print("\n=== Location2025互換性確認 ===")
try:
# テーブル存在確認
target_cursor.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'rog_location2025'
)
""")
table_exists = target_cursor.fetchone()[0]
if not table_exists:
print("⚠️ rog_location2025テーブルが存在しません")
return True # テーブルが存在しない場合は互換性チェックをスキップ
# カラム構造を動的に確認
target_cursor.execute("""
SELECT column_name FROM information_schema.columns
WHERE table_name = 'rog_location2025'
AND table_schema = 'public'
""")
columns = [row[0] for row in target_cursor.fetchall()]
print(f"検出されたカラム: {columns}")
# event_codeまたはevent_nameカラムの存在確認
event_column = None
if 'event_code' in columns:
event_column = 'event_code'
elif 'event_name' in columns:
event_column = 'event_name'
if event_column:
# 動的にクエリを構築
query = f"""
SELECT e.{event_column}, COUNT(l.id) as location_count
FROM rog_entry e
LEFT JOIN rog_location2025 l ON e.id = l.entry_id
GROUP BY e.{event_column}
HAVING COUNT(l.id) > 0
"""
target_cursor.execute(query)
location_data = target_cursor.fetchall()
print(f"既存のLocation2025データ:")
for event_id, count in location_data:
print(f" {event_column} {event_id}: {count}")
print("✅ Location2025互換性確認完了")
return True
else:
print("⚠️ event_codeもevent_nameも見つかりません")
return True
except Exception as e:
print(f"❌ Location2025互換性確認エラー: {e}")
# トランザクションエラーの場合はロールバック
try:
target_cursor.connection.rollback()
except:
pass
return False
def backup_existing_data(target_cursor):
"""既存データのバックアップ状況を確認"""
print("\n=== 既存データ保護確認 ===")
try:
# 既存データ数を確認
target_cursor.execute("SELECT COUNT(*) FROM rog_entry")
entry_count = target_cursor.fetchone()[0]
target_cursor.execute("SELECT COUNT(*) FROM rog_team")
team_count = target_cursor.fetchone()[0]
target_cursor.execute("SELECT COUNT(*) FROM rog_member")
member_count = target_cursor.fetchone()[0]
target_cursor.execute("SELECT COUNT(*) FROM rog_gpscheckin")
checkin_count = target_cursor.fetchone()[0]
# Location2025データ数も確認
try:
target_cursor.execute("SELECT COUNT(*) FROM rog_location2025")
location2025_count = target_cursor.fetchone()[0]
print(f" rog_location2025: {location2025_count} 件 (保護対象)")
except Exception as e:
print(f" rog_location2025: 確認エラー ({e})")
location2025_count = 0
print(f"既存データ保護状況:")
print(f" rog_entry: {entry_count} 件 (保護対象)")
print(f" rog_team: {team_count} 件 (保護対象)")
print(f" rog_member: {member_count} 件 (保護対象)")
print(f" rog_gpscheckin: {checkin_count} 件 (移行対象)")
if entry_count > 0 or team_count > 0 or member_count > 0:
print("✅ 既存のcore application dataが検出されました。これらは保護されます。")
return True
else:
print("⚠️ 既存のcore application dataが見つかりません。")
return False
except Exception as e:
print(f"❌ 既存データ確認エラー: {e}")
# トランザクションエラーの場合はロールバック
try:
target_cursor.connection.rollback()
except:
pass
return False
def migrate_gps_data(source_cursor, target_cursor):
"""GPS記録データのみを移行(写真記録データは除外)"""
print("\n=== GPS記録データの移行 ===")
try:
# GPS記録のみを取得(不正な写真記録データを除外)
source_cursor.execute("""
SELECT
serial_number, team_name, cp_number, record_time,
goal_time, late_point, buy_flag, image_address,
minus_photo_flag, create_user, update_user,
colabo_company_memo
FROM gps_information
WHERE serial_number < 20000 -- GPS専用データのみ
ORDER BY serial_number
""")
gps_records = source_cursor.fetchall()
print(f"移行対象GPS記録数: {len(gps_records)}")
migrated_count = 0
error_count = 0
for record in gps_records:
try:
(serial_number, team_name, cp_number, record_time,
goal_time, late_point, buy_flag, image_address,
minus_photo_flag, create_user, update_user,
colabo_company_memo) = record
# UTC時刻をJST時刻に変換
record_time_jst = convert_utc_to_jst(record_time)
goal_time_utc = None
if goal_time:
# goal_timeをUTCに変換
if isinstance(goal_time, str):
# イベント名からイベント日付を取得
event_name = colabo_company_memo or "不明"
event_date = get_event_date(event_name)
if event_date:
goal_time_utc = parse_goal_time(goal_time, event_date.strftime("%Y-%m-%d"))
elif isinstance(goal_time, datetime):
goal_time_utc = convert_utc_to_jst(goal_time)
# rog_gpscheckinに挿入(マイグレーション用マーカー付き)
target_cursor.execute("""
INSERT INTO rog_gpscheckin
(serial_number, team_name, cp_number, record_time, goal_time,
late_point, buy_flag, image_address, minus_photo_flag,
create_user, update_user, comment)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
serial_number, team_name, cp_number, record_time_jst, goal_time_utc,
late_point, buy_flag, image_address, minus_photo_flag,
create_user, update_user, 'migrated_from_gifuroge'
))
migrated_count += 1
if migrated_count % 1000 == 0:
print(f" 移行進捗: {migrated_count}/{len(gps_records)}")
target_cursor.connection.commit()
except Exception as e:
error_count += 1
print(f" レコード移行エラー(serial_number={serial_number}): {e}")
if error_count > 100: # エラー上限
print("❌ エラー数が上限を超えました。移行を中止します。")
raise
target_cursor.connection.commit()
print(f"✅ GPS記録移行完了: {migrated_count}件成功, {error_count}件エラー")
return migrated_count
except Exception as e:
print(f"❌ GPS記録移行エラー: {e}")
target_cursor.connection.rollback()
raise
def main():
"""メイン移行処理(既存データ保護版)"""
print("=" * 60)
print("GPS記録データ移行スクリプト (既存データ保護版)")
print("=" * 60)
print("移行対象: gifuroge.gps_information → rogdb.rog_gpscheckin")
print("既存データ保護: rog_entry, rog_team, rog_member, rog_location2025")
print("=" * 60)
try:
# 1. データベース接続確認
if not check_database_connectivity():
return False
# 2. 実際の移行処理
source_conn = psycopg2.connect(**GIFUROGE_DB)
target_conn = psycopg2.connect(**ROGDB_DB)
source_cursor = source_conn.cursor()
target_cursor = target_conn.cursor()
# 3. 既存データ確認
has_existing_data = backup_existing_data(target_cursor)
# 4. Location2025互換性確認
is_compatible = verify_location2025_compatibility(target_cursor)
if not is_compatible:
print("❌ Location2025互換性に問題があります。")
return False
# 5. 安全確認
if has_existing_data:
print("\n⚠️ 既存のアプリケーションデータが検出されました。")
print("この移行操作は既存データを保護しながらGPS記録のみを移行します。")
confirm = input("続行しますか? (yes/no): ")
if confirm.lower() != 'yes':
print("移行を中止しました。")
return False
# 6. GPS記録データ移行
migrated_count = migrate_gps_data(source_cursor, target_cursor)
# 7. 完了確認
print("\n" + "=" * 60)
print("移行完了サマリー")
print("=" * 60)
print(f"移行されたGPS記録: {migrated_count}")
print("保護された既存データ: rog_entry, rog_team, rog_member, rog_location2025")
print("✅ データ移行が完了しました!")
return True
except Exception as e:
print(f"\n❌ 移行処理エラー: {e}")
return False
finally:
try:
source_conn.close()
target_conn.close()
except:
pass
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)