""" このコードは永栄コードをNoufferコードに統合するための一時変換コードです。 一旦、完全にマイグレーションでき、ランキングや走行履歴が完成したら、不要になります。 """ import psycopg2 from PIL import Image import PIL.ExifTags from datetime import datetime import os def get_gps_from_image(image_path): """ 画像ファイルからGPS情報を抽出する Returns: (latitude, longitude) または取得できない場合は (None, None) """ try: with Image.open(image_path) as img: exif = { PIL.ExifTags.TAGS[k]: v for k, v in img._getexif().items() if k in PIL.ExifTags.TAGS } if 'GPSInfo' in exif: gps_info = exif['GPSInfo'] # 緯度の計算 lat = gps_info[2] lat = lat[0] + lat[1]/60 + lat[2]/3600 if gps_info[1] == 'S': lat = -lat # 経度の計算 lon = gps_info[4] lon = lon[0] + lon[1]/60 + lon[2]/3600 if gps_info[3] == 'W': lon = -lon return lat, lon except Exception as e: print(f"GPS情報の抽出に失敗: {e}") return None, None def migrate_data(): # コンテナ環境用の接続情報 source_db = { 'dbname': 'gifuroge', 'user': 'admin', # 環境に合わせて変更 'password': 'admin123456', # 環境に合わせて変更 'host': 'localhost', # Dockerのサービス名 'port': '5432' } target_db = { 'dbname': 'rogdb', 'user': 'admin', # 環境に合わせて変更 'password': 'admin123456', # 環境に合わせて変更 'host': 'localhost', # Dockerのサービス名 'port': '5432' } source_conn = None target_conn = None source_cur = None target_cur = None try: print("ソースDBへの接続を試みています...") source_conn = psycopg2.connect(**source_db) source_cur = source_conn.cursor() print("ソースDBへの接続が成功しました") print("ターゲットDBへの接続を試みています...") target_conn = psycopg2.connect(**target_db) target_cur = target_conn.cursor() print("ターゲットDBへの接続が成功しました") print("データの取得を開始します...") source_cur.execute(""" SELECT serial_number, zekken_number, event_code, cp_number, image_address, goal_time, late_point, create_at, create_user, update_at, update_user, buy_flag, colabo_company_memo FROM gps_information """) rows = source_cur.fetchall() print(f"取得したレコード数: {len(rows)}") processed_count = 0 for row in rows: (serial_number, zekken_number, event_code, cp_number, image_address, goal_time, late_point, create_at, create_user, update_at, update_user, buy_flag, colabo_company_memo) = row latitude, longitude = None, None if image_address and os.path.exists(image_address): latitude, longitude = get_gps_from_image(image_address) target_cur.execute(""" INSERT INTO gps_checkins ( path_order, zekken_number, event_code, cp_number, lattitude, longitude, image_address, image_receipt, image_QR, validate_location, goal_time, late_point, create_at, create_user, update_at, update_user, buy_flag, colabo_company_memo, points ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """, ( serial_number, zekken_number, event_code, cp_number, latitude, longitude, image_address, True, True, True, goal_time, late_point, create_at, create_user, update_at, update_user, buy_flag if buy_flag is not None else False, colabo_company_memo if colabo_company_memo else '', 0 )) processed_count += 1 if processed_count % 100 == 0: print(f"処理済みレコード数: {processed_count}") target_conn.commit() print(f"移行完了: {processed_count}件のレコードを処理しました") except Exception as e: print(f"エラーが発生しました: {e}") if target_conn: target_conn.rollback() finally: if source_cur: source_cur.close() if target_cur: target_cur.close() if source_conn: source_conn.close() if target_conn: target_conn.close() print("すべての接続をクローズしました") if __name__ == "__main__": migrate_data()