470 lines
18 KiB
Python
470 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
GPS記録データマイグレーション (既存データ保護版)
|
||
gifurogeからrogdbへ12,665件のGPSチェックイン記録を移行
|
||
既存のアプリケーションデータ(188エントリ、226チーム、388メンバー)は保護
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import psycopg2
|
||
from datetime import datetime, timezone
|
||
from typing import Optional, Dict, List, Tuple
|
||
import pytz
|
||
|
||
# 設定
|
||
GIFUROGE_DB = {
|
||
'host': 'postgres-db',
|
||
'database': 'gifuroge',
|
||
'user': 'admin',
|
||
'password': 'admin123456',
|
||
'port': 5432
|
||
}
|
||
|
||
ROGDB_DB = {
|
||
'host': 'postgres-db',
|
||
'database': 'rogdb',
|
||
'user': 'admin',
|
||
'password': 'admin123456',
|
||
'port': 5432
|
||
}
|
||
|
||
def convert_utc_to_jst(utc_time):
|
||
"""UTC時刻をJST時刻に変換"""
|
||
if utc_time is None:
|
||
return None
|
||
|
||
if isinstance(utc_time, str):
|
||
utc_time = datetime.fromisoformat(utc_time.replace('Z', '+00:00'))
|
||
|
||
if utc_time.tzinfo is None:
|
||
utc_time = utc_time.replace(tzinfo=timezone.utc)
|
||
|
||
jst = pytz.timezone('Asia/Tokyo')
|
||
return utc_time.astimezone(jst)
|
||
|
||
def get_event_date(event_name: str) -> Optional[datetime]:
|
||
"""イベント名から開催日を推定"""
|
||
event_dates = {
|
||
'岐阜県ロゲイニング大会': datetime(2024, 11, 23),
|
||
'高山市ロゲイニング大会': datetime(2024, 11, 23),
|
||
'ロゲイニング大会2024': datetime(2024, 11, 23),
|
||
'default': datetime(2024, 11, 23)
|
||
}
|
||
|
||
for key, date in event_dates.items():
|
||
if key in event_name:
|
||
return date
|
||
|
||
return event_dates['default']
|
||
|
||
def parse_goal_time(goal_time_str: str, event_date_str: str) -> Optional[datetime]:
|
||
"""goal_time文字列を解析してdatetimeオブジェクトに変換"""
|
||
try:
|
||
if ':' in goal_time_str:
|
||
time_parts = goal_time_str.split(':')
|
||
hour = int(time_parts[0])
|
||
minute = int(time_parts[1])
|
||
|
||
event_date = datetime.strptime(event_date_str, "%Y-%m-%d")
|
||
goal_datetime = event_date.replace(hour=hour, minute=minute)
|
||
|
||
jst = pytz.timezone('Asia/Tokyo')
|
||
goal_datetime_jst = jst.localize(goal_datetime)
|
||
|
||
return goal_datetime_jst
|
||
except Exception as e:
|
||
print(f"goal_time解析エラー: {goal_time_str} - {e}")
|
||
|
||
return None
|
||
|
||
def check_database_connectivity():
|
||
"""データベース接続確認"""
|
||
print("=== データベース接続確認 ===")
|
||
|
||
try:
|
||
# gifuroge DB接続確認
|
||
source_conn = psycopg2.connect(**GIFUROGE_DB)
|
||
source_cursor = source_conn.cursor()
|
||
source_cursor.execute("SELECT COUNT(*) FROM gps_information")
|
||
source_count = source_cursor.fetchone()[0]
|
||
print(f"✅ gifuroge DB接続成功: gps_information {source_count}件")
|
||
|
||
# テーブル構造確認
|
||
source_cursor.execute("""
|
||
SELECT column_name, data_type
|
||
FROM information_schema.columns
|
||
WHERE table_name = 'gps_information'
|
||
ORDER BY ordinal_position
|
||
""")
|
||
columns = source_cursor.fetchall()
|
||
print("📋 gps_informationテーブル構造:")
|
||
for col_name, col_type in columns:
|
||
print(f" {col_name}: {col_type}")
|
||
|
||
source_conn.close()
|
||
|
||
# rogdb DB接続確認
|
||
target_conn = psycopg2.connect(**ROGDB_DB)
|
||
target_cursor = target_conn.cursor()
|
||
target_cursor.execute("SELECT COUNT(*) FROM gps_checkins")
|
||
target_count = target_cursor.fetchone()[0]
|
||
print(f"✅ rogdb DB接続成功: gps_checkins {target_count}件")
|
||
|
||
# 移行先テーブル構造確認
|
||
target_cursor.execute("""
|
||
SELECT column_name, data_type
|
||
FROM information_schema.columns
|
||
WHERE table_name = 'gps_checkins'
|
||
ORDER BY ordinal_position
|
||
""")
|
||
target_columns = target_cursor.fetchall()
|
||
print("📋 gps_checkinsテーブル構造:")
|
||
for col_name, col_type in target_columns:
|
||
print(f" {col_name}: {col_type}")
|
||
|
||
target_conn.close()
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ データベース接続エラー: {e}")
|
||
return False
|
||
|
||
def verify_location2025_compatibility(target_cursor):
|
||
"""rog_location2025テーブルとの互換性確認"""
|
||
print("\n=== Location2025互換性確認 ===")
|
||
|
||
try:
|
||
# テーブル存在確認
|
||
target_cursor.execute("""
|
||
SELECT EXISTS (
|
||
SELECT FROM information_schema.tables
|
||
WHERE table_schema = 'public'
|
||
AND table_name = 'rog_location2025'
|
||
)
|
||
""")
|
||
table_exists = target_cursor.fetchone()[0]
|
||
|
||
if not table_exists:
|
||
print("⚠️ rog_location2025テーブルが存在しません")
|
||
return True # テーブルが存在しない場合は互換性チェックをスキップ
|
||
|
||
# カラム構造を動的に確認
|
||
target_cursor.execute("""
|
||
SELECT column_name FROM information_schema.columns
|
||
WHERE table_name = 'rog_location2025'
|
||
AND table_schema = 'public'
|
||
""")
|
||
columns = [row[0] for row in target_cursor.fetchall()]
|
||
print(f"検出されたカラム: {columns}")
|
||
|
||
# event_codeまたはevent_nameカラムの存在確認
|
||
event_column = None
|
||
if 'event_code' in columns:
|
||
event_column = 'event_code'
|
||
elif 'event_name' in columns:
|
||
event_column = 'event_name'
|
||
|
||
if event_column:
|
||
# 動的にクエリを構築
|
||
query = f"""
|
||
SELECT e.{event_column}, COUNT(l.id) as location_count
|
||
FROM rog_entry e
|
||
LEFT JOIN rog_location2025 l ON e.id = l.entry_id
|
||
GROUP BY e.{event_column}
|
||
HAVING COUNT(l.id) > 0
|
||
"""
|
||
target_cursor.execute(query)
|
||
location_data = target_cursor.fetchall()
|
||
|
||
print(f"既存のLocation2025データ:")
|
||
for event_id, count in location_data:
|
||
print(f" {event_column} {event_id}: {count}件")
|
||
|
||
print("✅ Location2025互換性確認完了")
|
||
return True
|
||
else:
|
||
print("⚠️ event_codeもevent_nameも見つかりません")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ Location2025互換性確認エラー: {e}")
|
||
# トランザクションエラーの場合はロールバック
|
||
try:
|
||
target_cursor.connection.rollback()
|
||
except:
|
||
pass
|
||
return False
|
||
|
||
def backup_existing_data(target_cursor):
|
||
"""既存データのバックアップ状況を確認"""
|
||
print("\n=== 既存データ保護確認 ===")
|
||
|
||
try:
|
||
# 既存データ数を確認
|
||
target_cursor.execute("SELECT COUNT(*) FROM rog_entry")
|
||
entry_count = target_cursor.fetchone()[0]
|
||
|
||
target_cursor.execute("SELECT COUNT(*) FROM rog_team")
|
||
team_count = target_cursor.fetchone()[0]
|
||
|
||
target_cursor.execute("SELECT COUNT(*) FROM rog_member")
|
||
member_count = target_cursor.fetchone()[0]
|
||
|
||
target_cursor.execute("SELECT COUNT(*) FROM gps_checkins")
|
||
checkin_count = target_cursor.fetchone()[0]
|
||
|
||
# Location2025データ数も確認
|
||
try:
|
||
target_cursor.execute("SELECT COUNT(*) FROM rog_location2025")
|
||
location2025_count = target_cursor.fetchone()[0]
|
||
print(f" rog_location2025: {location2025_count} 件 (保護対象)")
|
||
except Exception as e:
|
||
print(f" rog_location2025: 確認エラー ({e})")
|
||
location2025_count = 0
|
||
|
||
print(f"既存データ保護状況:")
|
||
print(f" rog_entry: {entry_count} 件 (保護対象)")
|
||
print(f" rog_team: {team_count} 件 (保護対象)")
|
||
print(f" rog_member: {member_count} 件 (保護対象)")
|
||
print(f" gps_checkins: {checkin_count} 件 (移行対象)")
|
||
|
||
if entry_count > 0 or team_count > 0 or member_count > 0:
|
||
print("✅ 既存のcore application dataが検出されました。これらは保護されます。")
|
||
return True
|
||
else:
|
||
print("⚠️ 既存のcore application dataが見つかりません。")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"❌ 既存データ確認エラー: {e}")
|
||
# トランザクションエラーの場合はロールバック
|
||
try:
|
||
target_cursor.connection.rollback()
|
||
except:
|
||
pass
|
||
return False
|
||
|
||
def migrate_gps_data(source_cursor, target_cursor):
|
||
"""GPS記録データのみを移行(写真記録データは除外)"""
|
||
print("\n=== GPS記録データの移行 ===")
|
||
|
||
try:
|
||
# 移行元テーブルの構造を確認
|
||
source_cursor.execute("""
|
||
SELECT column_name
|
||
FROM information_schema.columns
|
||
WHERE table_name = 'gps_information'
|
||
ORDER BY ordinal_position
|
||
""")
|
||
source_columns = [row[0] for row in source_cursor.fetchall()]
|
||
print(f"📋 移行元カラム: {source_columns}")
|
||
|
||
# 移行先テーブルの構造を確認
|
||
target_cursor.execute("""
|
||
SELECT column_name
|
||
FROM information_schema.columns
|
||
WHERE table_name = 'gps_checkins'
|
||
ORDER BY ordinal_position
|
||
""")
|
||
target_columns = [row[0] for row in target_cursor.fetchall()]
|
||
print(f"📋 移行先カラム: {target_columns}")
|
||
|
||
# 必要なカラムのマッピングを確認
|
||
column_mapping = {
|
||
'serial_number': 'serial_number',
|
||
'team_name': 'team_name' if 'team_name' in source_columns else None,
|
||
'zekken_number': 'zekken_number' if 'zekken_number' in source_columns else None,
|
||
'event_code': 'event_code' if 'event_code' in source_columns else None,
|
||
'cp_number': 'cp_number',
|
||
'record_time': 'create_at' if 'create_at' in source_columns else 'record_time',
|
||
'goal_time': 'goal_time',
|
||
'late_point': 'late_point',
|
||
'buy_flag': 'buy_flag',
|
||
'image_address': 'image_address',
|
||
'minus_photo_flag': 'minus_photo_flag',
|
||
'create_user': 'create_user',
|
||
'update_user': 'update_user',
|
||
'colabo_company_memo': 'colabo_company_memo'
|
||
}
|
||
|
||
# 実際に存在するカラムでクエリを構築
|
||
select_columns = []
|
||
for key, column in column_mapping.items():
|
||
if column and column in source_columns:
|
||
select_columns.append(f"{column} as {key}")
|
||
else:
|
||
select_columns.append(f"NULL as {key}")
|
||
|
||
query = f"""
|
||
SELECT {', '.join(select_columns)}
|
||
FROM gps_information
|
||
WHERE serial_number < 20000 -- GPS専用データのみ
|
||
ORDER BY serial_number
|
||
"""
|
||
|
||
print(f"📋 実行クエリ: {query}")
|
||
source_cursor.execute(query)
|
||
|
||
gps_records = source_cursor.fetchall()
|
||
print(f"移行対象GPS記録数: {len(gps_records)}件")
|
||
|
||
migrated_count = 0
|
||
error_count = 0
|
||
|
||
for record in gps_records:
|
||
try:
|
||
# レコードを解析(NULLの場合はデフォルト値を設定)
|
||
record_data = {}
|
||
for i, key in enumerate(column_mapping.keys()):
|
||
record_data[key] = record[i] if i < len(record) else None
|
||
|
||
serial_number = record_data['serial_number']
|
||
team_name = record_data['team_name'] or f"Team_{record_data['zekken_number'] or serial_number}"
|
||
zekken_number = record_data['zekken_number'] or serial_number
|
||
event_code = record_data['event_code'] or 'unknown'
|
||
cp_number = record_data['cp_number']
|
||
record_time = record_data['record_time']
|
||
goal_time = record_data['goal_time']
|
||
late_point = record_data['late_point']
|
||
buy_flag = record_data['buy_flag']
|
||
image_address = record_data['image_address']
|
||
minus_photo_flag = record_data['minus_photo_flag']
|
||
create_user = record_data['create_user']
|
||
update_user = record_data['update_user']
|
||
colabo_company_memo = record_data['colabo_company_memo']
|
||
|
||
# UTC時刻をJST時刻に変換
|
||
record_time_jst = convert_utc_to_jst(record_time)
|
||
goal_time_utc = None
|
||
|
||
if goal_time:
|
||
# goal_timeをUTCに変換
|
||
if isinstance(goal_time, str):
|
||
# イベント名からイベント日付を取得
|
||
event_name = colabo_company_memo or "不明"
|
||
event_date = get_event_date(event_name)
|
||
if event_date:
|
||
goal_time_utc = parse_goal_time(goal_time, event_date.strftime("%Y-%m-%d"))
|
||
elif isinstance(goal_time, datetime):
|
||
goal_time_utc = convert_utc_to_jst(goal_time)
|
||
|
||
# 移行先テーブルに合わせてINSERT文を動的構築
|
||
insert_columns = ['serial_number', 'cp_number', 'record_time', 'goal_time',
|
||
'late_point', 'buy_flag', 'image_address', 'minus_photo_flag',
|
||
'create_user', 'update_user', 'comment']
|
||
insert_values = [serial_number, cp_number, record_time_jst, goal_time_utc,
|
||
late_point, buy_flag, image_address, minus_photo_flag,
|
||
create_user, update_user, f'migrated_from_gifuroge_team_{team_name}_zekken_{zekken_number}_event_{event_code}']
|
||
|
||
# 移行先テーブルに存在するカラムのみを使用
|
||
final_columns = []
|
||
final_values = []
|
||
for i, col in enumerate(insert_columns):
|
||
if col in target_columns:
|
||
final_columns.append(col)
|
||
final_values.append(insert_values[i])
|
||
|
||
placeholders = ', '.join(['%s'] * len(final_columns))
|
||
columns_str = ', '.join(final_columns)
|
||
|
||
target_cursor.execute(f"""
|
||
INSERT INTO gps_checkins ({columns_str})
|
||
VALUES ({placeholders})
|
||
""", final_values)
|
||
|
||
migrated_count += 1
|
||
|
||
if migrated_count % 1000 == 0:
|
||
print(f" 移行進捗: {migrated_count}/{len(gps_records)} 件")
|
||
target_cursor.connection.commit()
|
||
|
||
except Exception as e:
|
||
error_count += 1
|
||
print(f" レコード移行エラー(serial_number={serial_number}): {e}")
|
||
# トランザクションエラーの場合はロールバックして続行
|
||
try:
|
||
target_cursor.connection.rollback()
|
||
except:
|
||
pass
|
||
if error_count > 100: # エラー上限
|
||
print("❌ エラー数が上限を超えました。移行を中止します。")
|
||
raise
|
||
|
||
target_cursor.connection.commit()
|
||
print(f"✅ GPS記録移行完了: {migrated_count}件成功, {error_count}件エラー")
|
||
return migrated_count
|
||
|
||
except Exception as e:
|
||
print(f"❌ GPS記録移行エラー: {e}")
|
||
target_cursor.connection.rollback()
|
||
raise
|
||
|
||
def main():
|
||
"""メイン移行処理(既存データ保護版)"""
|
||
print("=" * 60)
|
||
print("GPS記録データ移行スクリプト (既存データ保護版)")
|
||
print("=" * 60)
|
||
print("移行対象: gifuroge.gps_information → rogdb.gps_checkins")
|
||
print("既存データ保護: rog_entry, rog_team, rog_member, rog_location2025")
|
||
print("=" * 60)
|
||
|
||
try:
|
||
# 1. データベース接続確認
|
||
if not check_database_connectivity():
|
||
return False
|
||
|
||
# 2. 実際の移行処理
|
||
source_conn = psycopg2.connect(**GIFUROGE_DB)
|
||
target_conn = psycopg2.connect(**ROGDB_DB)
|
||
|
||
source_cursor = source_conn.cursor()
|
||
target_cursor = target_conn.cursor()
|
||
|
||
# 3. 既存データ確認
|
||
has_existing_data = backup_existing_data(target_cursor)
|
||
|
||
# 4. Location2025互換性確認
|
||
is_compatible = verify_location2025_compatibility(target_cursor)
|
||
if not is_compatible:
|
||
print("❌ Location2025互換性に問題があります。")
|
||
return False
|
||
|
||
# 5. 安全確認
|
||
if has_existing_data:
|
||
print("\n⚠️ 既存のアプリケーションデータが検出されました。")
|
||
print("この移行操作は既存データを保護しながらGPS記録のみを移行します。")
|
||
confirm = input("続行しますか? (yes/no): ")
|
||
if confirm.lower() != 'yes':
|
||
print("移行を中止しました。")
|
||
return False
|
||
|
||
# 6. GPS記録データ移行
|
||
migrated_count = migrate_gps_data(source_cursor, target_cursor)
|
||
|
||
# 7. 完了確認
|
||
print("\n" + "=" * 60)
|
||
print("移行完了サマリー")
|
||
print("=" * 60)
|
||
print(f"移行されたGPS記録: {migrated_count}件")
|
||
print("保護された既存データ: rog_entry, rog_team, rog_member, rog_location2025")
|
||
print("✅ データ移行が完了しました!")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ 移行処理エラー: {e}")
|
||
return False
|
||
|
||
finally:
|
||
try:
|
||
source_conn.close()
|
||
target_conn.close()
|
||
except:
|
||
pass
|
||
|
||
if __name__ == "__main__":
|
||
success = main()
|
||
sys.exit(0 if success else 1)
|