Files
rogaining_srv/update_image_paths_to_s3.py

384 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
"""
データベースのパス情報をS3形式に更新するスクリプト
物理ファイルが存在しない場合、データベースのパス情報のみをS3形式に変換
既存のローカルパス形式から、仮定のS3 URLパターンに変換します。
使用方法:
python update_image_paths_to_s3.py
機能:
- GoalImagesのパスをS3形式に変換
- CheckinImagesのパスをS3形式に変換
- 既存のパス構造を保持しながらS3 URL形式に変換
- バックアップとロールバック機能付き
"""
import os
import sys
import django
from pathlib import Path
import json
from datetime import datetime
# Django settings setup
BASE_DIR = Path(__file__).resolve().parent
sys.path.append(str(BASE_DIR))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
django.setup()
from django.conf import settings
from rog.models import GoalImages, CheckinImages
from django.db import transaction
import logging
import re
# ロギング設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'path_update_log_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class PathUpdateService:
"""パス更新サービス"""
def __init__(self):
self.s3_bucket = settings.AWS_STORAGE_BUCKET_NAME
self.s3_region = settings.AWS_S3_REGION_NAME
self.s3_base_url = f"https://{self.s3_bucket}.s3.{self.s3_region}.amazonaws.com"
self.update_stats = {
'total_goal_images': 0,
'updated_goal_images': 0,
'total_checkin_images': 0,
'updated_checkin_images': 0,
'skipped_already_s3': 0,
'failed_updates': [],
'backup_file': f'path_update_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
}
def create_backup(self):
"""現在のパス情報をバックアップ"""
logger.info("データベースパス情報をバックアップ中...")
backup_data = {
'backup_timestamp': datetime.now().isoformat(),
'goal_images': [],
'checkin_images': []
}
# GoalImages のバックアップ
for goal_img in GoalImages.objects.filter(goalimage__isnull=False).exclude(goalimage=''):
backup_data['goal_images'].append({
'id': goal_img.id,
'original_path': str(goal_img.goalimage),
'event_code': goal_img.event_code,
'team_name': goal_img.team_name,
'cp_number': goal_img.cp_number
})
# CheckinImages のバックアップ
for checkin_img in CheckinImages.objects.filter(checkinimage__isnull=False).exclude(checkinimage=''):
backup_data['checkin_images'].append({
'id': checkin_img.id,
'original_path': str(checkin_img.checkinimage),
'event_code': checkin_img.event_code,
'team_name': checkin_img.team_name,
'cp_number': checkin_img.cp_number
})
with open(self.update_stats['backup_file'], 'w', encoding='utf-8') as f:
json.dump(backup_data, f, ensure_ascii=False, indent=2)
logger.info(f"バックアップ完了: {self.update_stats['backup_file']}")
logger.info(f" GoalImages: {len(backup_data['goal_images'])}")
logger.info(f" CheckinImages: {len(backup_data['checkin_images'])}")
return backup_data
def convert_local_path_to_s3_url(self, local_path, event_code, team_name, image_type='checkin'):
"""
ローカルパスをS3 URLに変換100文字制限対応
例:
goals/230205/2269a407-3745-44fc-977d-f0f22bda112f.jpg
-> s3://{bucket}/各務原/goals/{team_name}/{filename}
checkin/230205/09d76ced-aa87-41ee-9467-5fd30eb836d0.jpg
-> s3://{bucket}/各務原/{team_name}/{filename}
"""
try:
# ファイル名を抽出
filename = os.path.basename(local_path)
if image_type == 'goal' or local_path.startswith('goals/'):
# ゴール画像: s3://{bucket}/{event_code}/goals/{team_name}/{filename}
s3_path = f"s3://{self.s3_bucket}/{event_code}/goals/{team_name}/{filename}"
else:
# チェックイン画像: s3://{bucket}/{event_code}/{team_name}/{filename}
s3_path = f"s3://{self.s3_bucket}/{event_code}/{team_name}/{filename}"
# 100文字制限チェック
if len(s3_path) > 100:
# 短縮版: ファイル名のみを使用
if image_type == 'goal' or local_path.startswith('goals/'):
s3_path = f"s3://{self.s3_bucket}/goals/{filename}"
else:
s3_path = f"s3://{self.s3_bucket}/checkin/{filename}"
# それでも長い場合はファイル名だけ
if len(s3_path) > 100:
s3_path = f"s3://{self.s3_bucket}/{filename}"
logger.warning(f"長いパスを短縮: {local_path} -> {s3_path}")
return s3_path
except Exception as e:
logger.error(f"パス変換エラー: {local_path} -> {str(e)}")
return None
def is_already_s3_url(self, path):
"""既にS3 URLかどうかを判定"""
return (
path and (
path.startswith('https://') or
path.startswith('http://') or
path.startswith('s3://') or
's3' in path.lower() or
'amazonaws' in path.lower()
)
)
def update_goal_images(self):
"""GoalImagesのパスを更新"""
logger.info("=== GoalImagesのパス更新開始 ===")
goal_images = GoalImages.objects.filter(goalimage__isnull=False).exclude(goalimage='')
self.update_stats['total_goal_images'] = goal_images.count()
logger.info(f"更新対象GoalImages: {self.update_stats['total_goal_images']}")
updated_count = 0
skipped_count = 0
# トランザクションを個別に処理
for goal_img in goal_images:
try:
with transaction.atomic():
original_path = str(goal_img.goalimage)
# 既にS3 URLの場合はスキップ
if self.is_already_s3_url(original_path):
skipped_count += 1
continue
# S3 URLに変換
s3_url = self.convert_local_path_to_s3_url(
original_path,
goal_img.event_code,
goal_img.team_name,
'goal'
)
if s3_url and len(s3_url) <= 100: # 100文字制限チェック
goal_img.goalimage = s3_url
goal_img.save()
updated_count += 1
if updated_count <= 5: # 最初の5件のみログ出力
logger.info(f"✅ GoalImage ID={goal_img.id}: {original_path} -> {s3_url}")
else:
self.update_stats['failed_updates'].append({
'type': 'goal',
'id': goal_img.id,
'original_path': original_path,
'reason': f'Path too long or conversion failed: {s3_url}'
})
except Exception as e:
logger.error(f"❌ GoalImage ID={goal_img.id} 更新エラー: {str(e)}")
self.update_stats['failed_updates'].append({
'type': 'goal',
'id': goal_img.id,
'original_path': str(goal_img.goalimage),
'reason': str(e)
})
self.update_stats['updated_goal_images'] = updated_count
self.update_stats['skipped_already_s3'] += skipped_count
logger.info(f"GoalImages更新完了: {updated_count}件更新、{skipped_count}件スキップ")
def update_checkin_images(self):
"""CheckinImagesのパスを更新"""
logger.info("=== CheckinImagesのパス更新開始 ===")
checkin_images = CheckinImages.objects.filter(checkinimage__isnull=False).exclude(checkinimage='')
self.update_stats['total_checkin_images'] = checkin_images.count()
logger.info(f"更新対象CheckinImages: {self.update_stats['total_checkin_images']}")
updated_count = 0
skipped_count = 0
# トランザクションを個別に処理
for checkin_img in checkin_images:
try:
with transaction.atomic():
original_path = str(checkin_img.checkinimage)
# 既にS3 URLの場合はスキップ
if self.is_already_s3_url(original_path):
skipped_count += 1
continue
# S3 URLに変換
s3_url = self.convert_local_path_to_s3_url(
original_path,
checkin_img.event_code,
checkin_img.team_name,
'checkin'
)
if s3_url and len(s3_url) <= 100: # 100文字制限チェック
checkin_img.checkinimage = s3_url
checkin_img.save()
updated_count += 1
if updated_count <= 5: # 最初の5件のみログ出力
logger.info(f"✅ CheckinImage ID={checkin_img.id}: {original_path} -> {s3_url}")
else:
self.update_stats['failed_updates'].append({
'type': 'checkin',
'id': checkin_img.id,
'original_path': original_path,
'reason': f'Path too long or conversion failed: {s3_url}'
})
except Exception as e:
logger.error(f"❌ CheckinImage ID={checkin_img.id} 更新エラー: {str(e)}")
self.update_stats['failed_updates'].append({
'type': 'checkin',
'id': checkin_img.id,
'original_path': str(checkin_img.checkinimage),
'reason': str(e)
})
self.update_stats['updated_checkin_images'] = updated_count
self.update_stats['skipped_already_s3'] += skipped_count
logger.info(f"CheckinImages更新完了: {updated_count}件更新、{skipped_count}件スキップ")
def generate_update_report(self):
"""更新レポートを生成"""
logger.info("=== 更新レポート生成 ===")
total_updated = self.update_stats['updated_goal_images'] + self.update_stats['updated_checkin_images']
total_processed = self.update_stats['total_goal_images'] + self.update_stats['total_checkin_images']
report = {
'update_timestamp': datetime.now().isoformat(),
'summary': {
'total_processed': total_processed,
'total_updated': total_updated,
'goal_images_updated': self.update_stats['updated_goal_images'],
'checkin_images_updated': self.update_stats['updated_checkin_images'],
'skipped_already_s3': self.update_stats['skipped_already_s3'],
'failed_updates': len(self.update_stats['failed_updates']),
'success_rate': (total_updated / max(total_processed, 1) * 100)
},
'failed_updates': self.update_stats['failed_updates'],
'backup_file': self.update_stats['backup_file']
}
# レポートファイルの保存
report_file = f'path_update_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
# コンソール出力
print("\n" + "="*60)
print("🎯 データベースパス更新レポート")
print("="*60)
print(f"📊 処理総数: {total_processed:,}")
print(f"✅ 更新成功: {total_updated:,}件 ({report['summary']['success_rate']:.1f}%)")
print(f" - ゴール画像: {self.update_stats['updated_goal_images']:,}")
print(f" - チェックイン画像: {self.update_stats['updated_checkin_images']:,}")
print(f"⏩ スキップ既にS3: {self.update_stats['skipped_already_s3']:,}")
print(f"❌ 失敗: {len(self.update_stats['failed_updates'])}")
print(f"📄 詳細レポート: {report_file}")
print(f"💾 バックアップファイル: {self.update_stats['backup_file']}")
if len(self.update_stats['failed_updates']) > 0:
print("\n⚠️ 失敗した更新:")
for failure in self.update_stats['failed_updates'][:5]:
print(f" - {failure['type']} ID={failure['id']}: {failure['reason']}")
if len(self.update_stats['failed_updates']) > 5:
print(f" ... 他 {len(self.update_stats['failed_updates']) - 5}")
return report
def run_update(self):
"""メイン更新処理"""
logger.info("🚀 データベースパス更新開始")
print("🚀 データベースのパス情報をS3形式に更新します...")
try:
# 1. バックアップ
self.create_backup()
# 2. GoalImages更新
self.update_goal_images()
# 3. CheckinImages更新
self.update_checkin_images()
# 4. レポート生成
report = self.generate_update_report()
logger.info("✅ パス更新完了")
print("\n✅ パス更新が完了しました!")
return report
except Exception as e:
logger.error(f"💥 パス更新中に重大なエラーが発生: {str(e)}")
print(f"\n💥 パス更新エラー: {str(e)}")
print(f"バックアップファイル: {self.update_stats['backup_file']}")
raise
def main():
"""メイン関数"""
print("="*60)
print("🔄 データベースパスS3更新ツール")
print("="*60)
print("このツールは以下を実行します:")
print("1. データベースの現在のパス情報をバックアップ")
print("2. GoalImagesのパスをS3 URL形式に変換")
print("3. CheckinImagesのパスをS3 URL形式に変換")
print("4. 更新レポートの生成")
print()
print("⚠️ 注意: 物理ファイルの移行は行いません")
print(" データベースのパス情報のみを更新します")
print()
# 確認プロンプト
confirm = input("パス更新を開始しますか? [y/N]: ").strip().lower()
if confirm not in ['y', 'yes']:
print("パス更新をキャンセルしました。")
return
# 更新実行
path_update_service = PathUpdateService()
path_update_service.run_update()
if __name__ == "__main__":
main()