Files
rogaining_srv/migration_data_protection.py
2025-08-25 14:28:30 +09:00

470 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
GPS記録データマイグレーション (既存データ保護版)
gifurogeからrogdbへ12,665件のGPSチェックイン記録を移行
既存のアプリケーションデータ(188エントリ、226チーム、388メンバー)は保護
"""
import os
import sys
import psycopg2
from datetime import datetime, timezone
from typing import Optional, Dict, List, Tuple
import pytz
# 設定
GIFUROGE_DB = {
'host': 'postgres-db',
'database': 'gifuroge',
'user': 'admin',
'password': 'admin123456',
'port': 5432
}
ROGDB_DB = {
'host': 'postgres-db',
'database': 'rogdb',
'user': 'admin',
'password': 'admin123456',
'port': 5432
}
def convert_utc_to_jst(utc_time):
"""UTC時刻をJST時刻に変換"""
if utc_time is None:
return None
if isinstance(utc_time, str):
utc_time = datetime.fromisoformat(utc_time.replace('Z', '+00:00'))
if utc_time.tzinfo is None:
utc_time = utc_time.replace(tzinfo=timezone.utc)
jst = pytz.timezone('Asia/Tokyo')
return utc_time.astimezone(jst)
def get_event_date(event_name: str) -> Optional[datetime]:
"""イベント名から開催日を推定"""
event_dates = {
'岐阜県ロゲイニング大会': datetime(2024, 11, 23),
'高山市ロゲイニング大会': datetime(2024, 11, 23),
'ロゲイニング大会2024': datetime(2024, 11, 23),
'default': datetime(2024, 11, 23)
}
for key, date in event_dates.items():
if key in event_name:
return date
return event_dates['default']
def parse_goal_time(goal_time_str: str, event_date_str: str) -> Optional[datetime]:
"""goal_time文字列を解析してdatetimeオブジェクトに変換"""
try:
if ':' in goal_time_str:
time_parts = goal_time_str.split(':')
hour = int(time_parts[0])
minute = int(time_parts[1])
event_date = datetime.strptime(event_date_str, "%Y-%m-%d")
goal_datetime = event_date.replace(hour=hour, minute=minute)
jst = pytz.timezone('Asia/Tokyo')
goal_datetime_jst = jst.localize(goal_datetime)
return goal_datetime_jst
except Exception as e:
print(f"goal_time解析エラー: {goal_time_str} - {e}")
return None
def check_database_connectivity():
"""データベース接続確認"""
print("=== データベース接続確認 ===")
try:
# gifuroge DB接続確認
source_conn = psycopg2.connect(**GIFUROGE_DB)
source_cursor = source_conn.cursor()
source_cursor.execute("SELECT COUNT(*) FROM gps_information")
source_count = source_cursor.fetchone()[0]
print(f"✅ gifuroge DB接続成功: gps_information {source_count}")
# テーブル構造確認
source_cursor.execute("""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'gps_information'
ORDER BY ordinal_position
""")
columns = source_cursor.fetchall()
print("📋 gps_informationテーブル構造:")
for col_name, col_type in columns:
print(f" {col_name}: {col_type}")
source_conn.close()
# rogdb DB接続確認
target_conn = psycopg2.connect(**ROGDB_DB)
target_cursor = target_conn.cursor()
target_cursor.execute("SELECT COUNT(*) FROM rog_gpscheckin")
target_count = target_cursor.fetchone()[0]
print(f"✅ rogdb DB接続成功: rog_gpscheckin {target_count}")
# 移行先テーブル構造確認
target_cursor.execute("""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'rog_gpscheckin'
ORDER BY ordinal_position
""")
target_columns = target_cursor.fetchall()
print("📋 rog_gpscheckinテーブル構造:")
for col_name, col_type in target_columns:
print(f" {col_name}: {col_type}")
target_conn.close()
return True
except Exception as e:
print(f"❌ データベース接続エラー: {e}")
return False
def verify_location2025_compatibility(target_cursor):
"""rog_location2025テーブルとの互換性確認"""
print("\n=== Location2025互換性確認 ===")
try:
# テーブル存在確認
target_cursor.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'rog_location2025'
)
""")
table_exists = target_cursor.fetchone()[0]
if not table_exists:
print("⚠️ rog_location2025テーブルが存在しません")
return True # テーブルが存在しない場合は互換性チェックをスキップ
# カラム構造を動的に確認
target_cursor.execute("""
SELECT column_name FROM information_schema.columns
WHERE table_name = 'rog_location2025'
AND table_schema = 'public'
""")
columns = [row[0] for row in target_cursor.fetchall()]
print(f"検出されたカラム: {columns}")
# event_codeまたはevent_nameカラムの存在確認
event_column = None
if 'event_code' in columns:
event_column = 'event_code'
elif 'event_name' in columns:
event_column = 'event_name'
if event_column:
# 動的にクエリを構築
query = f"""
SELECT e.{event_column}, COUNT(l.id) as location_count
FROM rog_entry e
LEFT JOIN rog_location2025 l ON e.id = l.entry_id
GROUP BY e.{event_column}
HAVING COUNT(l.id) > 0
"""
target_cursor.execute(query)
location_data = target_cursor.fetchall()
print(f"既存のLocation2025データ:")
for event_id, count in location_data:
print(f" {event_column} {event_id}: {count}")
print("✅ Location2025互換性確認完了")
return True
else:
print("⚠️ event_codeもevent_nameも見つかりません")
return True
except Exception as e:
print(f"❌ Location2025互換性確認エラー: {e}")
# トランザクションエラーの場合はロールバック
try:
target_cursor.connection.rollback()
except:
pass
return False
def backup_existing_data(target_cursor):
"""既存データのバックアップ状況を確認"""
print("\n=== 既存データ保護確認 ===")
try:
# 既存データ数を確認
target_cursor.execute("SELECT COUNT(*) FROM rog_entry")
entry_count = target_cursor.fetchone()[0]
target_cursor.execute("SELECT COUNT(*) FROM rog_team")
team_count = target_cursor.fetchone()[0]
target_cursor.execute("SELECT COUNT(*) FROM rog_member")
member_count = target_cursor.fetchone()[0]
target_cursor.execute("SELECT COUNT(*) FROM rog_gpscheckin")
checkin_count = target_cursor.fetchone()[0]
# Location2025データ数も確認
try:
target_cursor.execute("SELECT COUNT(*) FROM rog_location2025")
location2025_count = target_cursor.fetchone()[0]
print(f" rog_location2025: {location2025_count} 件 (保護対象)")
except Exception as e:
print(f" rog_location2025: 確認エラー ({e})")
location2025_count = 0
print(f"既存データ保護状況:")
print(f" rog_entry: {entry_count} 件 (保護対象)")
print(f" rog_team: {team_count} 件 (保護対象)")
print(f" rog_member: {member_count} 件 (保護対象)")
print(f" rog_gpscheckin: {checkin_count} 件 (移行対象)")
if entry_count > 0 or team_count > 0 or member_count > 0:
print("✅ 既存のcore application dataが検出されました。これらは保護されます。")
return True
else:
print("⚠️ 既存のcore application dataが見つかりません。")
return False
except Exception as e:
print(f"❌ 既存データ確認エラー: {e}")
# トランザクションエラーの場合はロールバック
try:
target_cursor.connection.rollback()
except:
pass
return False
def migrate_gps_data(source_cursor, target_cursor):
"""GPS記録データのみを移行(写真記録データは除外)"""
print("\n=== GPS記録データの移行 ===")
try:
# 移行元テーブルの構造を確認
source_cursor.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'gps_information'
ORDER BY ordinal_position
""")
source_columns = [row[0] for row in source_cursor.fetchall()]
print(f"📋 移行元カラム: {source_columns}")
# 移行先テーブルの構造を確認
target_cursor.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'rog_gpscheckin'
ORDER BY ordinal_position
""")
target_columns = [row[0] for row in target_cursor.fetchall()]
print(f"📋 移行先カラム: {target_columns}")
# 必要なカラムのマッピングを確認
column_mapping = {
'serial_number': 'serial_number',
'team_name': 'team_name' if 'team_name' in source_columns else None,
'zekken_number': 'zekken_number' if 'zekken_number' in source_columns else None,
'event_code': 'event_code' if 'event_code' in source_columns else None,
'cp_number': 'cp_number',
'record_time': 'create_at' if 'create_at' in source_columns else 'record_time',
'goal_time': 'goal_time',
'late_point': 'late_point',
'buy_flag': 'buy_flag',
'image_address': 'image_address',
'minus_photo_flag': 'minus_photo_flag',
'create_user': 'create_user',
'update_user': 'update_user',
'colabo_company_memo': 'colabo_company_memo'
}
# 実際に存在するカラムでクエリを構築
select_columns = []
for key, column in column_mapping.items():
if column and column in source_columns:
select_columns.append(f"{column} as {key}")
else:
select_columns.append(f"NULL as {key}")
query = f"""
SELECT {', '.join(select_columns)}
FROM gps_information
WHERE serial_number < 20000 -- GPS専用データのみ
ORDER BY serial_number
"""
print(f"📋 実行クエリ: {query}")
source_cursor.execute(query)
gps_records = source_cursor.fetchall()
print(f"移行対象GPS記録数: {len(gps_records)}")
migrated_count = 0
error_count = 0
for record in gps_records:
try:
# レコードを解析NULLの場合はデフォルト値を設定
record_data = {}
for i, key in enumerate(column_mapping.keys()):
record_data[key] = record[i] if i < len(record) else None
serial_number = record_data['serial_number']
team_name = record_data['team_name'] or f"Team_{record_data['zekken_number'] or serial_number}"
zekken_number = record_data['zekken_number'] or serial_number
event_code = record_data['event_code'] or 'unknown'
cp_number = record_data['cp_number']
record_time = record_data['record_time']
goal_time = record_data['goal_time']
late_point = record_data['late_point']
buy_flag = record_data['buy_flag']
image_address = record_data['image_address']
minus_photo_flag = record_data['minus_photo_flag']
create_user = record_data['create_user']
update_user = record_data['update_user']
colabo_company_memo = record_data['colabo_company_memo']
# UTC時刻をJST時刻に変換
record_time_jst = convert_utc_to_jst(record_time)
goal_time_utc = None
if goal_time:
# goal_timeをUTCに変換
if isinstance(goal_time, str):
# イベント名からイベント日付を取得
event_name = colabo_company_memo or "不明"
event_date = get_event_date(event_name)
if event_date:
goal_time_utc = parse_goal_time(goal_time, event_date.strftime("%Y-%m-%d"))
elif isinstance(goal_time, datetime):
goal_time_utc = convert_utc_to_jst(goal_time)
# 移行先テーブルに合わせてINSERT文を動的構築
insert_columns = ['serial_number', 'cp_number', 'record_time', 'goal_time',
'late_point', 'buy_flag', 'image_address', 'minus_photo_flag',
'create_user', 'update_user', 'comment']
insert_values = [serial_number, cp_number, record_time_jst, goal_time_utc,
late_point, buy_flag, image_address, minus_photo_flag,
create_user, update_user, f'migrated_from_gifuroge_team_{team_name}_zekken_{zekken_number}_event_{event_code}']
# 移行先テーブルに存在するカラムのみを使用
final_columns = []
final_values = []
for i, col in enumerate(insert_columns):
if col in target_columns:
final_columns.append(col)
final_values.append(insert_values[i])
placeholders = ', '.join(['%s'] * len(final_columns))
columns_str = ', '.join(final_columns)
target_cursor.execute(f"""
INSERT INTO rog_gpscheckin ({columns_str})
VALUES ({placeholders})
""", final_values)
migrated_count += 1
if migrated_count % 1000 == 0:
print(f" 移行進捗: {migrated_count}/{len(gps_records)}")
target_cursor.connection.commit()
except Exception as e:
error_count += 1
print(f" レコード移行エラー(serial_number={serial_number}): {e}")
# トランザクションエラーの場合はロールバックして続行
try:
target_cursor.connection.rollback()
except:
pass
if error_count > 100: # エラー上限
print("❌ エラー数が上限を超えました。移行を中止します。")
raise
target_cursor.connection.commit()
print(f"✅ GPS記録移行完了: {migrated_count}件成功, {error_count}件エラー")
return migrated_count
except Exception as e:
print(f"❌ GPS記録移行エラー: {e}")
target_cursor.connection.rollback()
raise
def main():
"""メイン移行処理(既存データ保護版)"""
print("=" * 60)
print("GPS記録データ移行スクリプト (既存データ保護版)")
print("=" * 60)
print("移行対象: gifuroge.gps_information → rogdb.rog_gpscheckin")
print("既存データ保護: rog_entry, rog_team, rog_member, rog_location2025")
print("=" * 60)
try:
# 1. データベース接続確認
if not check_database_connectivity():
return False
# 2. 実際の移行処理
source_conn = psycopg2.connect(**GIFUROGE_DB)
target_conn = psycopg2.connect(**ROGDB_DB)
source_cursor = source_conn.cursor()
target_cursor = target_conn.cursor()
# 3. 既存データ確認
has_existing_data = backup_existing_data(target_cursor)
# 4. Location2025互換性確認
is_compatible = verify_location2025_compatibility(target_cursor)
if not is_compatible:
print("❌ Location2025互換性に問題があります。")
return False
# 5. 安全確認
if has_existing_data:
print("\n⚠️ 既存のアプリケーションデータが検出されました。")
print("この移行操作は既存データを保護しながらGPS記録のみを移行します。")
confirm = input("続行しますか? (yes/no): ")
if confirm.lower() != 'yes':
print("移行を中止しました。")
return False
# 6. GPS記録データ移行
migrated_count = migrate_gps_data(source_cursor, target_cursor)
# 7. 完了確認
print("\n" + "=" * 60)
print("移行完了サマリー")
print("=" * 60)
print(f"移行されたGPS記録: {migrated_count}")
print("保護された既存データ: rog_entry, rog_team, rog_member, rog_location2025")
print("✅ データ移行が完了しました!")
return True
except Exception as e:
print(f"\n❌ 移行処理エラー: {e}")
return False
finally:
try:
source_conn.close()
target_conn.close()
except:
pass
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)