223 lines
7.9 KiB
Python
223 lines
7.9 KiB
Python
"""
|
||
S3 Service for managing uploads and standard images
|
||
"""
|
||
import boto3
|
||
import uuid
|
||
import base64
|
||
from datetime import datetime
|
||
from django.conf import settings
|
||
from django.core.files.uploadedfile import InMemoryUploadedFile
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class S3Service:
|
||
"""AWS S3 service for handling image uploads and management"""
|
||
|
||
def __init__(self):
|
||
self.s3_client = boto3.client(
|
||
's3',
|
||
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
|
||
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
|
||
region_name=settings.AWS_S3_REGION_NAME
|
||
)
|
||
self.bucket_name = settings.AWS_STORAGE_BUCKET_NAME
|
||
self.custom_domain = settings.AWS_S3_CUSTOM_DOMAIN
|
||
|
||
def upload_checkin_image(self, image_file, event_code, team_code, cp_number, is_goal=False):
|
||
"""
|
||
チェックイン画像またはゴール画像をS3にアップロード
|
||
|
||
Args:
|
||
image_file: アップロードする画像ファイル
|
||
event_code: イベントコード
|
||
team_code: チームコード
|
||
cp_number: チェックポイント番号
|
||
is_goal: ゴール画像かどうか(デフォルト: False)
|
||
"""
|
||
try:
|
||
# ファイル名を生成(UUID + タイムスタンプ)
|
||
file_extension = image_file.name.split('.')[-1] if '.' in image_file.name else 'jpg'
|
||
filename = f"{uuid.uuid4()}-{datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}.{file_extension}"
|
||
|
||
# S3キーを生成(イベント/チーム/ファイル名)
|
||
# ゴール画像の場合は専用フォルダに保存
|
||
if is_goal:
|
||
s3_key = f"{event_code}/goals/{team_code}/{filename}"
|
||
else:
|
||
s3_key = f"{event_code}/{team_code}/{filename}"
|
||
|
||
# メタデータをBase64エンコードして設定(S3メタデータはASCIIのみ対応)
|
||
metadata = {
|
||
'event_b64': base64.b64encode(event_code.encode('utf-8')).decode('ascii'),
|
||
'team_b64': base64.b64encode(team_code.encode('utf-8')).decode('ascii'),
|
||
'cp_number': str(cp_number),
|
||
'uploaded_at': datetime.now().strftime('%Y-%m-%dT%H-%M-%S'),
|
||
'image_type': 'goal' if is_goal else 'checkin'
|
||
}
|
||
|
||
# S3にアップロード
|
||
self.s3_client.upload_fileobj(
|
||
image_file,
|
||
self.bucket_name,
|
||
s3_key,
|
||
ExtraArgs={
|
||
'ContentType': f'image/{file_extension}',
|
||
'Metadata': metadata
|
||
}
|
||
)
|
||
|
||
# S3 URLを生成
|
||
s3_url = f"https://{self.bucket_name}.s3.{settings.AWS_S3_REGION_NAME}.amazonaws.com/{s3_key}"
|
||
|
||
logger.info(f"{'Goal' if is_goal else 'Checkin'} image uploaded to S3: {s3_url}")
|
||
return s3_url
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to upload image to S3: {e}")
|
||
raise
|
||
|
||
def upload_standard_image(self, image_file, event_code, image_type):
|
||
"""
|
||
規定画像をS3にアップロード
|
||
"""
|
||
try:
|
||
# ファイル拡張子を取得
|
||
file_extension = image_file.name.split('.')[-1] if '.' in image_file.name else 'jpg'
|
||
|
||
# S3キーを生成(イベント/standards/タイプ.拡張子)
|
||
s3_key = f"{event_code}/standards/{image_type}.{file_extension}"
|
||
|
||
# メタデータをBase64エンコードして設定
|
||
metadata = {
|
||
'event_b64': base64.b64encode(event_code.encode('utf-8')).decode('ascii'),
|
||
'image_type': image_type,
|
||
'uploaded_at': datetime.now().strftime('%Y-%m-%dT%H-%M-%S')
|
||
}
|
||
|
||
# S3にアップロード
|
||
self.s3_client.upload_fileobj(
|
||
image_file,
|
||
self.bucket_name,
|
||
s3_key,
|
||
ExtraArgs={
|
||
'ContentType': f'image/{file_extension}',
|
||
'Metadata': metadata
|
||
}
|
||
)
|
||
|
||
# S3 URLを生成
|
||
s3_url = f"https://{self.bucket_name}.s3.{settings.AWS_S3_REGION_NAME}.amazonaws.com/{s3_key}"
|
||
|
||
logger.info(f"Standard image uploaded to S3: {s3_url}")
|
||
return s3_url
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to upload standard image to S3: {e}")
|
||
raise
|
||
|
||
def get_standard_image_url(self, event_code, image_type):
|
||
"""
|
||
Get URL for standard image
|
||
|
||
Args:
|
||
event_code: Event code
|
||
image_type: Type of image (goal, start, checkpoint, etc.)
|
||
|
||
Returns:
|
||
str: S3 URL of standard image or None if not found
|
||
"""
|
||
# Try common image extensions
|
||
extensions = ['.jpg', '.jpeg', '.png', '.gif']
|
||
|
||
for ext in extensions:
|
||
s3_key = f"{event_code}/standards/{image_type}{ext}"
|
||
try:
|
||
# Check if object exists
|
||
self.s3_client.head_object(Bucket=self.bucket_name, Key=s3_key)
|
||
return f"https://{self.custom_domain}/{s3_key}"
|
||
except self.s3_client.exceptions.NoSuchKey:
|
||
continue
|
||
except Exception as e:
|
||
logger.error(f"Error checking standard image: {e}")
|
||
continue
|
||
|
||
return None
|
||
|
||
def delete_image(self, s3_url):
|
||
"""
|
||
Delete image from S3
|
||
|
||
Args:
|
||
s3_url: Full S3 URL of the image
|
||
|
||
Returns:
|
||
bool: True if deleted successfully
|
||
"""
|
||
try:
|
||
# Extract S3 key from URL
|
||
s3_key = self._extract_s3_key_from_url(s3_url)
|
||
if not s3_key:
|
||
logger.error(f"Invalid S3 URL: {s3_url}")
|
||
return False
|
||
|
||
# Delete from S3
|
||
self.s3_client.delete_object(Bucket=self.bucket_name, Key=s3_key)
|
||
logger.info(f"Image deleted from S3: {s3_url}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to delete image from S3: {e}")
|
||
return False
|
||
|
||
def list_event_images(self, event_code, limit=100):
|
||
"""
|
||
List all images for an event
|
||
|
||
Args:
|
||
event_code: Event code
|
||
limit: Maximum number of images to return
|
||
|
||
Returns:
|
||
list: List of S3 URLs
|
||
"""
|
||
try:
|
||
response = self.s3_client.list_objects_v2(
|
||
Bucket=self.bucket_name,
|
||
Prefix=f"{event_code}/",
|
||
MaxKeys=limit
|
||
)
|
||
|
||
urls = []
|
||
if 'Contents' in response:
|
||
for obj in response['Contents']:
|
||
url = f"https://{self.custom_domain}/{obj['Key']}"
|
||
urls.append(url)
|
||
|
||
return urls
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to list event images: {e}")
|
||
return []
|
||
|
||
def _get_file_extension(self, filename):
|
||
"""Get file extension from filename"""
|
||
if '.' in filename:
|
||
return '.' + filename.split('.')[-1].lower()
|
||
return '.jpg' # default extension
|
||
|
||
def _get_timestamp(self):
|
||
"""Get current timestamp string"""
|
||
from datetime import datetime
|
||
return datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
|
||
|
||
def _extract_s3_key_from_url(self, s3_url):
|
||
"""Extract S3 key from full S3 URL"""
|
||
try:
|
||
# Remove domain part to get key
|
||
if self.custom_domain in s3_url:
|
||
return s3_url.split(self.custom_domain + '/')[-1]
|
||
return None
|
||
except Exception:
|
||
return None
|