Files
rogaining_srv/rog/migration_scripts_fixed.py

332 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
修正版データ移行スクリプト
gifurogeデータベースからrogdbデータベースへの正確な移行を行う
UTCからJSTに変換して移行
"""
import psycopg2
from PIL import Image
import PIL.ExifTags
from datetime import datetime, timedelta
import pytz
import os
import re
def get_gps_from_image(image_path):
"""
画像ファイルからGPS情報を抽出する
Returns: (latitude, longitude) または取得できない場合は (None, None)
"""
try:
with Image.open(image_path) as img:
exif = {
PIL.ExifTags.TAGS[k]: v
for k, v in img._getexif().items()
if k in PIL.ExifTags.TAGS
}
if 'GPSInfo' in exif:
gps_info = exif['GPSInfo']
# 緯度の計算
lat = gps_info[2]
lat = lat[0] + lat[1]/60 + lat[2]/3600
if gps_info[1] == 'S':
lat = -lat
# 経度の計算
lon = gps_info[4]
lon = lon[0] + lon[1]/60 + lon[2]/3600
if gps_info[3] == 'W':
lon = -lon
return lat, lon
except Exception as e:
print(f"GPS情報の抽出に失敗: {e}")
return None, None
def convert_utc_to_jst(utc_datetime):
"""
UTCタイムスタンプをJSTに変換する
Args:
utc_datetime: UTC時刻のdatetimeオブジェクト
Returns:
JST時刻のdatetimeオブジェクト
"""
if utc_datetime is None:
return None
# UTCタイムゾーンを設定
if utc_datetime.tzinfo is None:
utc_datetime = pytz.UTC.localize(utc_datetime)
# JSTに変換
jst = pytz.timezone('Asia/Tokyo')
jst_datetime = utc_datetime.astimezone(jst)
# タイムゾーン情報を削除してnaive datetimeとして返す
return jst_datetime.replace(tzinfo=None)
def parse_goal_time(goal_time_str, event_date, create_at=None):
"""
goal_time文字列を正しいdatetimeに変換する
Args:
goal_time_str: "14:58" 形式の時刻文字列
event_date: イベント日付
create_at: goal_timeが空の場合に使用するタイムスタンプ
Returns:
datetime object または None
"""
# goal_timeが空の場合はcreate_atを使用UTCからJSTに変換
if not goal_time_str or goal_time_str.strip() == '':
if create_at:
return convert_utc_to_jst(create_at)
return None
try:
# "HH:MM" または "HH:MM:SS" 形式の時刻をパースJST時刻として扱う
if re.match(r'^\d{1,2}:\d{2}(:\d{2})?$', goal_time_str.strip()):
time_parts = goal_time_str.strip().split(':')
hour = int(time_parts[0])
minute = int(time_parts[1])
second = int(time_parts[2]) if len(time_parts) > 2 else 0
# イベント日付と結合JST時刻として扱うため変換なし
result_datetime = event_date.replace(hour=hour, minute=minute, second=second, microsecond=0)
# 深夜の場合は翌日に調整
if hour < 6: # 午前6時以前は翌日とみなす
result_datetime += timedelta(days=1)
return result_datetime
# すでにdatetime形式の場合
elif 'T' in goal_time_str or ' ' in goal_time_str:
return datetime.fromisoformat(goal_time_str.replace('T', ' ').replace('Z', ''))
except Exception as e:
print(f"時刻パースエラー: {goal_time_str} -> {e}")
return None
def get_event_date(event_code, target_cur):
"""
イベントコードからイベント開催日を取得する
"""
# イベントコード別の実際の開催日を定義
event_dates = {
'FC岐阜': datetime(2024, 10, 25).date(),
'美濃加茂': datetime(2024, 5, 19).date(),
'岐阜市': datetime(2023, 11, 19).date(),
'大垣2': datetime(2023, 5, 14).date(),
'各務原': datetime(2023, 10, 15).date(),
'郡上': datetime(2023, 10, 22).date(),
'中津川': datetime(2024, 4, 14).date(),
'下呂': datetime(2024, 1, 21).date(),
'多治見': datetime(2023, 11, 26).date(),
'大垣': datetime(2023, 4, 16).date(),
'揖斐川': datetime(2023, 12, 3).date(),
'養老ロゲ': datetime(2023, 4, 23).date(),
'高山': datetime(2024, 3, 10).date(),
'大垣3': datetime(2024, 8, 4).date(),
'各務原2': datetime(2024, 11, 10).date(),
'多治見2': datetime(2024, 12, 15).date(),
'下呂2': datetime(2024, 12, 1).date(),
'美濃加茂2': datetime(2024, 11, 3).date(),
'郡上2': datetime(2024, 12, 8).date(),
'関ケ原2': datetime(2024, 9, 29).date(),
'養老2': datetime(2024, 11, 24).date(),
'高山2': datetime(2024, 12, 22).date(),
}
if event_code in event_dates:
return event_dates[event_code]
# デフォルト日付
return datetime(2024, 1, 1).date()
def get_foreign_keys(zekken_number, event_code, cp_number, target_cur):
"""
team_id, event_id, checkpoint_idを取得する
"""
team_id = None
event_id = None
checkpoint_id = None
# team_id を取得
try:
target_cur.execute("""
SELECT t.id, t.event_id
FROM rog_team t
JOIN rog_newevent2 e ON t.event_id = e.id
WHERE t.zekken_number = %s AND e.event_code = %s
""", (zekken_number, event_code))
result = target_cur.fetchone()
if result:
team_id, event_id = result
except Exception as e:
print(f"Team ID取得エラー: {e}")
# checkpoint_id を取得
try:
target_cur.execute("""
SELECT c.id
FROM rog_checkpoint c
JOIN rog_newevent2 e ON c.event_id = e.id
WHERE c.cp_number = %s AND e.event_code = %s
""", (str(cp_number), event_code))
result = target_cur.fetchone()
if result:
checkpoint_id = result[0]
except Exception as e:
print(f"Checkpoint ID取得エラー: {e}")
return team_id, event_id, checkpoint_id
def migrate_gps_data():
"""
GPSチェックインデータの移行
"""
# コンテナ環境用の接続情報
source_db = {
'dbname': 'gifuroge',
'user': 'admin',
'password': 'admin123456',
'host': 'postgres-db', # Dockerサービス名
'port': '5432'
}
target_db = {
'dbname': 'rogdb',
'user': 'admin',
'password': 'admin123456',
'host': 'postgres-db', # Dockerサービス名
'port': '5432'
}
source_conn = None
target_conn = None
source_cur = None
target_cur = None
try:
print("ソースDBへの接続を試みています...")
source_conn = psycopg2.connect(**source_db)
source_cur = source_conn.cursor()
print("ソースDBへの接続が成功しました")
print("ターゲットDBへの接続を試みています...")
target_conn = psycopg2.connect(**target_db)
target_cur = target_conn.cursor()
print("ターゲットDBへの接続が成功しました")
# 既存のrog_gpscheckinデータをクリア
print("既存のGPSチェックインデータをクリアしています...")
target_cur.execute("DELETE FROM rog_gpscheckin")
target_conn.commit()
print("既存データのクリアが完了しました")
print("データの取得を開始します...")
source_cur.execute("""
SELECT serial_number, zekken_number, event_code, cp_number, image_address,
goal_time, late_point, create_at, create_user,
update_at, update_user, buy_flag, colabo_company_memo
FROM gps_information
ORDER BY event_code, zekken_number, serial_number
""")
rows = source_cur.fetchall()
print(f"取得したレコード数: {len(rows)}")
processed_count = 0
error_count = 0
for row in rows:
(serial_number, zekken_number, event_code, cp_number, image_address,
goal_time, late_point, create_at, create_user,
update_at, update_user, buy_flag, colabo_company_memo) = row
try:
# 関連IDを取得
team_id, event_id, checkpoint_id = get_foreign_keys(zekken_number, event_code, cp_number, target_cur)
if not team_id or not event_id:
print(f"スキップ: team_id={team_id}, event_id={event_id} for {zekken_number}/{event_code}")
error_count += 1
continue
# イベント日付を取得
event_date = get_event_date(event_code, target_cur)
# 時刻を正しく変換create_atも渡す
checkin_time = None
record_time = None
if goal_time:
parsed_time = parse_goal_time(goal_time, datetime.combine(event_date, datetime.min.time()), create_at)
if parsed_time:
checkin_time = parsed_time
record_time = parsed_time
# goal_timeがない場合はcreate_atを使用UTCからJSTに変換
if not checkin_time and create_at:
checkin_time = convert_utc_to_jst(create_at)
record_time = convert_utc_to_jst(create_at)
elif not checkin_time:
# 最後の手段としてデフォルト時刻
checkin_time = datetime.combine(event_date, datetime.min.time()) + timedelta(hours=12)
record_time = checkin_time
# GPS座標を取得
latitude, longitude = None, None
if image_address and os.path.exists(image_address):
latitude, longitude = get_gps_from_image(image_address)
# rog_gpscheckinテーブルに挿入
target_cur.execute("""
INSERT INTO rog_gpscheckin (
event_code, zekken, serial_number, cp_number,
lat, lng, checkin_time, record_time,
mobserver_id, event_id, team_id, checkpoint_id
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
""", (
event_code, zekken_number, serial_number, str(cp_number),
latitude, longitude, checkin_time, record_time,
serial_number, event_id, team_id, checkpoint_id
))
processed_count += 1
if processed_count % 100 == 0:
print(f"処理済みレコード数: {processed_count}")
target_conn.commit()
except Exception as e:
print(f"レコード処理エラー: {e} - {row}")
error_count += 1
continue
target_conn.commit()
print(f"移行完了: {processed_count}件のレコードを処理しました")
print(f"エラー件数: {error_count}")
except Exception as e:
print(f"エラーが発生しました: {e}")
if target_conn:
target_conn.rollback()
finally:
if source_cur:
source_cur.close()
if target_cur:
target_cur.close()
if source_conn:
source_conn.close()
if target_conn:
target_conn.close()
print("すべての接続をクローズしました")
if __name__ == "__main__":
migrate_gps_data()