Files
rogaining_srv/migrate_rog_entry_enhanced.py

355 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
rog_entry テーブル専用移行スクリプト (予約語・NULL値対応)
old_rogdb の rog_entry から rogdb の rog_entry へデータ移行
"""
import os
import sys
import psycopg2
from datetime import datetime, timezone
import logging
# ログ設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# データベース設定
OLD_ROGDB_CONFIG = {
'host': os.getenv('OLD_ROGDB_HOST', 'postgres-db'),
'database': os.getenv('OLD_ROGDB_NAME', 'old_rogdb'),
'user': os.getenv('OLD_ROGDB_USER', 'admin'),
'password': os.getenv('OLD_ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('OLD_ROGDB_PORT', 5432))
}
ROGDB_CONFIG = {
'host': os.getenv('ROGDB_HOST', 'postgres-db'),
'database': os.getenv('ROGDB_NAME', 'rogdb'),
'user': os.getenv('ROGDB_USER', 'admin'),
'password': os.getenv('ROGDB_PASSWORD', 'admin123456'),
'port': int(os.getenv('ROGDB_PORT', 5432))
}
class RogEntryMigrator:
"""rog_entry テーブル専用移行クラス"""
def __init__(self):
self.old_conn = None
self.new_conn = None
self.old_cursor = None
self.new_cursor = None
def connect_databases(self):
"""データベース接続"""
try:
logger.info("データベースに接続中...")
self.old_conn = psycopg2.connect(**OLD_ROGDB_CONFIG)
self.new_conn = psycopg2.connect(**ROGDB_CONFIG)
self.old_conn.autocommit = True
self.new_conn.autocommit = False
self.old_cursor = self.old_conn.cursor()
self.new_cursor = self.new_conn.cursor()
logger.info("✅ データベース接続成功")
return True
except Exception as e:
logger.error(f"❌ データベース接続エラー: {e}")
return False
def close_connections(self):
"""データベース接続クローズ"""
try:
if self.old_cursor:
self.old_cursor.close()
if self.new_cursor:
self.new_cursor.close()
if self.old_conn:
self.old_conn.close()
if self.new_conn:
self.new_conn.close()
logger.info("データベース接続をクローズしました")
except Exception as e:
logger.warning(f"接続クローズ時の警告: {e}")
def quote_column_if_needed(self, column_name):
"""予約語やキャメルケースの場合はダブルクォートで囲む"""
# camelCaseの場合はクォート
if any(c.isupper() for c in column_name):
return f'"{column_name}"'
return column_name
def handle_null_values(self, column_name, value):
"""NULL値の処理とデフォルト値設定"""
if value is not None:
return value
# rog_entryテーブル固有のデフォルト値
defaults = {
'is_active': True,
'is_trial': False,
'hasGoaled': False,
'hasParticipated': False,
'zekken_label': '',
'zekken_number': 0
}
if column_name in defaults:
default_value = defaults[column_name]
logger.debug(f"NULL値をデフォルト値に変換: {column_name} = {default_value}")
return default_value
# デフォルト値が見つからない場合はNULLを返す
logger.warning(f"デフォルト値が設定されていません: {column_name}")
return None
def validate_foreign_keys(self):
"""外部キー参照の整合性をチェック"""
logger.info("外部キー参照の整合性をチェック中...")
# team_id の存在確認
self.old_cursor.execute("SELECT DISTINCT team_id FROM rog_entry WHERE team_id IS NOT NULL")
old_team_ids = [row[0] for row in self.old_cursor.fetchall()]
self.new_cursor.execute("SELECT id FROM rog_team")
new_team_ids = [row[0] for row in self.new_cursor.fetchall()]
missing_teams = set(old_team_ids) - set(new_team_ids)
if missing_teams:
logger.warning(f"⚠️ 新DBに存在しないteam_id: {missing_teams}")
logger.warning("先にrog_teamの移行を完了してください")
return False
# event_id の存在確認
self.old_cursor.execute("SELECT DISTINCT event_id FROM rog_entry WHERE event_id IS NOT NULL")
old_event_ids = [row[0] for row in self.old_cursor.fetchall()]
self.new_cursor.execute("SELECT id FROM rog_newevent2")
new_event_ids = [row[0] for row in self.new_cursor.fetchall()]
missing_events = set(old_event_ids) - set(new_event_ids)
if missing_events:
logger.warning(f"⚠️ 新DBに存在しないevent_id: {missing_events}")
logger.warning("先にrog_newevent2の移行を完了してください")
return False
# category_id の存在確認
self.old_cursor.execute("SELECT DISTINCT category_id FROM rog_entry WHERE category_id IS NOT NULL")
old_category_ids = [row[0] for row in self.old_cursor.fetchall()]
self.new_cursor.execute("SELECT id FROM rog_newcategory")
new_category_ids = [row[0] for row in self.new_cursor.fetchall()]
missing_categories = set(old_category_ids) - set(new_category_ids)
if missing_categories:
logger.warning(f"⚠️ 新DBに存在しないcategory_id: {missing_categories}")
logger.warning("先にrog_newcategoryの移行を完了してください")
return False
logger.info("✅ 外部キー参照の整合性チェック完了")
return True
def migrate_rog_entry(self):
"""rog_entry テーブルのデータ移行"""
logger.info("=" * 60)
logger.info("rog_entry テーブルデータ移行開始")
logger.info("=" * 60)
try:
# 外部キー整合性チェック
if not self.validate_foreign_keys():
logger.error("❌ 外部キー整合性チェックに失敗しました")
return False
# 旧データ取得camelCaseカラム名をクォート
logger.info("旧rog_entryデータを取得中...")
self.old_cursor.execute("""
SELECT id, date, category_id, event_id, owner_id, team_id,
is_active, zekken_number, "hasGoaled", "hasParticipated",
zekken_label, is_trial
FROM rog_entry
ORDER BY id
""")
old_records = self.old_cursor.fetchall()
if not old_records:
logger.info("✅ 移行対象データがありません")
return True
logger.info(f"移行対象レコード数: {len(old_records)}")
# 統計情報
inserted_count = 0
updated_count = 0
error_count = 0
# レコード別処理
for i, old_record in enumerate(old_records):
try:
# レコードデータの展開とNULL値処理
entry_id, date, category_id, event_id, owner_id, team_id, \
is_active, zekken_number, hasGoaled, hasParticipated, \
zekken_label, is_trial = old_record
# NULL値処理
processed_record = {
'id': entry_id,
'date': date,
'category_id': category_id,
'event_id': event_id,
'owner_id': owner_id,
'team_id': team_id,
'is_active': self.handle_null_values('is_active', is_active),
'zekken_number': self.handle_null_values('zekken_number', zekken_number),
'hasGoaled': self.handle_null_values('hasGoaled', hasGoaled),
'hasParticipated': self.handle_null_values('hasParticipated', hasParticipated),
'zekken_label': self.handle_null_values('zekken_label', zekken_label),
'is_trial': self.handle_null_values('is_trial', is_trial)
}
# 既存レコード確認
self.new_cursor.execute(
"SELECT COUNT(*) FROM rog_entry WHERE id = %s",
(entry_id,)
)
exists = self.new_cursor.fetchone()[0] > 0
if exists:
# UPDATE処理camelCaseカラムをクォート
update_query = """
UPDATE rog_entry SET
date = %s,
category_id = %s,
event_id = %s,
owner_id = %s,
team_id = %s,
is_active = %s,
zekken_number = %s,
"hasGoaled" = %s,
"hasParticipated" = %s,
zekken_label = %s,
is_trial = %s
WHERE id = %s
"""
self.new_cursor.execute(update_query, (
processed_record['date'],
processed_record['category_id'],
processed_record['event_id'],
processed_record['owner_id'],
processed_record['team_id'],
processed_record['is_active'],
processed_record['zekken_number'],
processed_record['hasGoaled'],
processed_record['hasParticipated'],
processed_record['zekken_label'],
processed_record['is_trial'],
entry_id
))
updated_count += 1
else:
# INSERT処理camelCaseカラムをクォート
insert_query = """
INSERT INTO rog_entry (
id, date, category_id, event_id, owner_id, team_id,
is_active, zekken_number, "hasGoaled", "hasParticipated",
zekken_label, is_trial
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
"""
self.new_cursor.execute(insert_query, (
processed_record['id'],
processed_record['date'],
processed_record['category_id'],
processed_record['event_id'],
processed_record['owner_id'],
processed_record['team_id'],
processed_record['is_active'],
processed_record['zekken_number'],
processed_record['hasGoaled'],
processed_record['hasParticipated'],
processed_record['zekken_label'],
processed_record['is_trial']
))
inserted_count += 1
# 進捗表示とコミット
if (i + 1) % 100 == 0:
self.new_conn.commit()
logger.info(f" 進捗: {i + 1}/{len(old_records)} 件処理完了")
except Exception as e:
error_count += 1
logger.error(f" レコード処理エラー (ID: {entry_id}): {e}")
# トランザクションロールバック
try:
self.new_conn.rollback()
except:
pass
if error_count > 10:
logger.error("❌ エラー数が上限を超えました")
break
# 最終コミット
self.new_conn.commit()
# 結果サマリー
logger.info("=" * 60)
logger.info("rog_entry 移行完了")
logger.info("=" * 60)
logger.info(f"挿入: {inserted_count}")
logger.info(f"更新: {updated_count}")
logger.info(f"エラー: {error_count}")
logger.info(f"総処理: {len(old_records)}")
if error_count == 0:
logger.info("✅ rog_entry移行が正常に完了しました")
return True
else:
logger.warning(f"⚠️ {error_count}件のエラーがありました")
return False
except Exception as e:
logger.error(f"❌ rog_entry移行エラー: {e}")
try:
self.new_conn.rollback()
except:
pass
return False
def run(self):
"""移行実行"""
try:
if not self.connect_databases():
return False
return self.migrate_rog_entry()
finally:
self.close_connections()
def main():
"""メイン処理"""
logger.info("rog_entry テーブル移行スクリプト")
migrator = RogEntryMigrator()
success = migrator.run()
if success:
logger.info("🎉 移行が正常に完了しました!")
else:
logger.error("💥 移行中にエラーが発生しました")
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()