"""Copy check-in records from the ``rogdb`` database into ``gifuroge``.

Rows are read from ``rog_checkinimages`` (joined with ``point``) and written
to the ``gps_information`` table, one INSERT per source row.
"""

import sys
from datetime import datetime
from typing import Dict, List, Tuple

import psycopg2


def get_db_connection(dbname: str) -> psycopg2.extensions.connection:
    """Open a connection to the PostgreSQL database *dbname*.

    The user, password and host below are placeholders and must be replaced
    with real values before the script is run.

    Args:
        dbname: Name of the database to connect to.

    Returns:
        An open psycopg2 connection.

    Note:
        If the connection cannot be established, the error is printed and
        the process exits with status 1.
    """
    try:
        return psycopg2.connect(
            dbname=dbname,
            user='your_username',      # replace with the real user name
            password='your_password',  # replace with the real password
            host='localhost',          # replace with the real host name
        )
    except psycopg2.Error as e:
        print(f"データベース {dbname} への接続エラー: {e}")
        sys.exit(1)


def get_source_data(event_code: str = 'FC岐阜') -> List[Dict]:
    """Fetch the check-in rows to migrate from ``rogdb``.

    For every (user_id, cp_number) pair only the row that sorts first by
    ``checkintime DESC`` is returned. ``late_point`` is taken from the
    matching ``point`` row and defaults to 0 when there is none.

    Args:
        event_code: Event whose check-ins are selected. Defaults to the
            event this script was originally written for.

    Returns:
        One dict per selected row, keyed by column name.
    """
    conn = get_db_connection('rogdb')
    try:
        with conn.cursor() as cur:
            # The event code is passed as a bound parameter rather than
            # being embedded in the SQL text.
            cur.execute("""
                SELECT DISTINCT ON (rci.user_id, rci.cp_number)
                    rci.team_name,
                    rci.event_code,
                    rci.cp_number,
                    rci.checkinimage,
                    rci.checkintime,
                    rci.user_id,
                    COALESCE(p.point, 0) as late_point
                FROM rog_checkinimages rci
                LEFT JOIN point p ON p.user_id = rci.user_id
                    AND p.event_code = rci.event_code
                    AND p.cp_number = rci.cp_number
                WHERE rci.event_code = %s
                ORDER BY rci.user_id, rci.cp_number, rci.checkintime DESC
            """, (event_code,))
            columns = [desc[0] for desc in cur.description]
            return [dict(zip(columns, row)) for row in cur.fetchall()]
    finally:
        conn.close()


def get_next_serial_number(cur) -> int:
    """Return the next value of ``gps_information_serial_number_seq``.

    Args:
        cur: An open cursor on the target database.
    """
    cur.execute("SELECT nextval('gps_information_serial_number_seq')")
    return cur.fetchone()[0]


def _build_insert_params(serial_number: int, record: Dict) -> Tuple:
    """Build the parameter tuple for one ``gps_information`` INSERT."""
    checkin_time = record['checkintime']
    # A NULL check-in time is passed through as NULL so that the database
    # decides whether to accept it, instead of crashing on ``.strftime``.
    goal_time = (
        checkin_time.strftime('%Y-%m-%d %H:%M:%S')
        if checkin_time is not None
        else None
    )
    return (
        serial_number,
        record['team_name'],
        record['event_code'],
        record['cp_number'],
        record['checkinimage'],
        goal_time,
        record['late_point'],
        checkin_time,   # create_at
        'system',       # create_user
        checkin_time,   # update_at
        'system',       # update_user
        False,          # buy_flag
        False,          # minus_photo_flag
        '',             # colabo_company_memo
    )


def insert_into_target(data: List[Dict]) -> Tuple[int, List[str]]:
    """Insert *data* into ``gps_information`` in the ``gifuroge`` database.

    All rows are written in a single transaction. Each row is wrapped in a
    savepoint: without it, one failed INSERT would leave the transaction in
    the aborted state, every following statement would fail, and the final
    commit would discard the rows that had succeeded.

    Args:
        data: Rows as returned by ``get_source_data``.

    Returns:
        A tuple ``(inserted_count, errors)``: the number of rows actually
        committed and a list of human-readable error messages. If the
        transaction as a whole fails, the count is 0 because everything is
        rolled back.
    """
    conn = get_db_connection('gifuroge')
    inserted_count = 0
    errors: List[str] = []

    try:
        with conn.cursor() as cur:
            for record in data:
                cur.execute("SAVEPOINT record_insert")
                try:
                    serial_number = get_next_serial_number(cur)
                    cur.execute("""
                        INSERT INTO gps_information (
                            serial_number, zekken_number, event_code, cp_number,
                            image_address, goal_time, late_point,
                            create_at, create_user, update_at, update_user,
                            buy_flag, minus_photo_flag, colabo_company_memo
                        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """, _build_insert_params(serial_number, record))
                except psycopg2.Error as e:
                    # Undo only this row and keep the transaction usable.
                    cur.execute("ROLLBACK TO SAVEPOINT record_insert")
                    errors.append(
                        f"Error inserting record for team {record['team_name']}: {str(e)}"
                    )
                    continue

                cur.execute("RELEASE SAVEPOINT record_insert")
                inserted_count += 1

                # Report progress only after a successful insert, so the
                # message is never printed for 0 rows or repeated on errors.
                if inserted_count % 100 == 0:
                    print(f"Processed {inserted_count} records...")

        conn.commit()
    except psycopg2.Error as e:
        try:
            conn.rollback()
        except psycopg2.Error:
            # The connection is already unusable; there is nothing to undo.
            pass
        # Nothing was committed, so do not report rolled-back rows.
        inserted_count = 0
        errors.append(f"Transaction error: {str(e)}")
    finally:
        conn.close()

    return inserted_count, errors


def main():
    """Run the migration and print a summary of the result."""
    print("データ移行を開始します...")

    # Fetch the source rows.
    print("ソースデータを取得中...")
    source_data = get_source_data()
    print(f"取得したレコード数: {len(source_data)}")

    # Insert them into the target database.
    print("データを移行中...")
    inserted_count, errors = insert_into_target(source_data)

    # Show the outcome.
    print("\n=== 移行結果 ===")
    print(f"処理したレコード数: {inserted_count}")
    print(f"エラー数: {len(errors)}")

    if errors:
        print("\nエラーログ:")
        for error in errors:
            print(f"- {error}")


if __name__ == "__main__":
    main()