import os from aiohttp import ClientError from django.template.loader import render_to_string from django.conf import settings import logging import boto3 from django.core.mail import send_mail from django.urls import reverse import uuid import environ logger = logging.getLogger(__name__) def load_email_template(template_name, context): template_path = os.path.join('email', template_name) email_content = render_to_string(template_path, context) # 件名と本文を分離 subject, _, body = email_content.partition('\n\n') subject = subject.replace('件名: ', '').strip() # 件名と本文を分離し、件名から改行を削除 subject, _, body = email_content.partition('\n\n') subject = subject.replace('件名: ', '').strip().replace('\n', ' ') return subject, body def share_send_email(subject, body, recipient_email): try: send_mail(subject, body, settings.DEFAULT_FROM_EMAIL, [recipient_email], fail_silently=False) logger.info(f"メールを送信しました。 受信者: {recipient_email}") except Exception as e: logger.error(f"メールの送信に失敗しました。 受信者: {recipient_email}, エラー: {str(e)}") raise # エラーを再度発生させて、呼び出し元で処理できるようにします # 自らユーザー登録した際に、メールの確認メールを送る。 # def send_verification_email(user, activation_link): context = { 'name': user.firstname or user.email, 'activation_link': activation_link, } logger.info(f"send_verification_email : {context}") subject, body = load_email_template('verification_email.txt', context) share_send_email(subject,body,user.email) def send_reset_password_email(email,activation_link): context = { 'name': email, 'activation_link': activation_link, } logger.info(f"send_reset_password_email : {context}") subject, body = load_email_template('reset_password_email.txt', context) share_send_email(subject,body,email) # 既にユーザーになっている人にチームへの参加要請メールを出す。 # def send_team_join_email(request,sender,user,team): activation_link = request.build_absolute_uri( reverse('activate-member', args=[user.id, team.id]) ) logger.info(f"request: {request}") context = { 'name': user.lastname or user.email, 'invitor': sender.lastname, 'activation_link': activation_link, 'team_name': team.team_name, } subject, body = load_email_template('invitation_existing_email.txt', context) share_send_email(subject,body,user.email) # まだユーザーでない人にチームメンバー招待メールを送る # その人がユーザー登録して、ユーザー登録されるとメンバーになる。 # アプリからユーザー登録するため、アプリのダウンロードリンクも送る。 # def send_invitation_email(sender,request,user_email,team): verification_code = uuid.uuid4() # UUIDを生成 activation_link = request.build_absolute_uri( reverse('activate-new-member', args=[verification_code, team.id]) ) context = { 'name': user_email, 'invitor': sender.lastname, 'team_name': team.team_name, 'activation_link': activation_link, 'app_download_link': settings.APP_DOWNLOAD_LINK, 'android_download_link': settings.ANDROID_DOWNLOAD_LINK, } subject, body = load_email_template('invitation_new_email.txt', context) share_send_email(subject,body,user_email) # 招待された後にユーザー登録された場合、ヴェリフィケーションでチーム参加登録される。 # def send_invitaion_and_verification_email(user, team, activation_link): context = { 'name': user.firstname or user.email, 'activation_link': activation_link, 'team_name': team.team_name, } logger.info(f"send_invitation_and_verification_email : {context}") subject, body = load_email_template('invitation_and_verification_email.txt', context) share_send_email(subject,body,user.email) class S3Bucket: def __init__(self, bucket_name=None, aws_access_key_id=None, aws_secret_access_key=None, region_name=None): self.aws_access_key_id = aws_access_key_id self.aws_secret_access_key = aws_secret_access_key self.region_name = region_name self.bucket_name = bucket_name self.s3_client = self.connect(bucket_name,aws_access_key_id, aws_secret_access_key, region_name) def __str__(self): return f"s3://{self.bucket_name}" def __repr__(self): return f"S3File(bucket_name={self.bucket_name})" # AWS S3 への接続 def connect(self,bucket_name=None, aws_access_key_id=None, aws_secret_access_key=None, region_name=None): """ S3クライアントの作成 Args: .env から取得 aws_access_key_id (str, optional): AWSアクセスキーID aws_secret_access_key (str, optional): AWSシークレットアクセスキー region_name (str): AWSリージョン名 Returns: boto3.client: S3クライアント """ try: if aws_access_key_id and aws_secret_access_key: s3_client = boto3.client( 's3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name ) else: env = environ.Env(DEBUG=(bool, False)) environ.Env.read_env(env_file=".env") if bucket_name==None: bucket_name = env("S3_BUCKET_NAME") aws_access_key_id = env("AWS_ACCESS_KEY") aws_secret_access_key = env("AWS_SECRET_ACCESS_KEY") region_name = env("S3_REGION") s3_client = boto3.client( 's3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name ) return s3_client except Exception as e: logger.error(f"S3クライアントの作成に失敗しました: {str(e)}") raise def upload_file(self, file_path, s3_key=None): """ ファイルをS3バケットにアップロード Args: file_path (str): アップロードするローカルファイルのパス bucket_name (str): アップロード先のS3バケット名 s3_key (str, optional): S3内でのファイルパス(指定がない場合はファイル名を使用) s3_client (boto3.client, optional): S3クライアント Returns: bool: アップロードの成功・失敗 """ logger = logging.getLogger(__name__) try: # S3キーが指定されていない場合は、ファイル名を使用 if s3_key is None: s3_key = os.path.basename(file_path) # S3クライアントが指定されていない場合は新規作成 if s3_client is None: s3_client = self.connect() # ファイルのアップロード logger.info(f"アップロード開始: {file_path} → s3://{self.bucket_name}/{s3_key}") s3_client.upload_file(file_path, self.bucket_name, s3_key) logger.info("アップロード完了") return True except FileNotFoundError: logger.error(f"ファイルが見つかりません: {file_path}") return False except ClientError as e: logger.error(f"S3アップロードエラー: {str(e)}") return False except Exception as e: logger.error(f"予期しないエラーが発生しました: {str(e)}") return False def upload_directory(self, directory_path, prefix=''): """ ディレクトリ内のすべてのファイルをS3バケットにアップロード Args: directory_path (str): アップロードするローカルディレクトリのパス bucket_name (str): アップロード先のS3バケット名 prefix (str, optional): S3内でのプレフィックス(フォルダパス) s3_client (boto3.client, optional): S3クライアント Returns: tuple: (成功したファイル数, 失敗したファイル数) """ logger = logging.getLogger(__name__) success_count = 0 failure_count = 0 try: # S3クライアントが指定されていない場合は新規作成 if self.s3_client is None: self.s3_client = self.connect() # ディレクトリ内のすべてのファイルを処理 for root, _, files in os.walk(directory_path): for file in files: local_path = os.path.join(root, file) # S3キーの作成(相対パスを維持) relative_path = os.path.relpath(local_path, directory_path) s3_key = os.path.join(prefix, relative_path).replace('\\', '/') # ファイルのアップロード if self.upload_file(local_path, s3_key): success_count += 1 else: failure_count += 1 logger.info(f"アップロード完了: 成功 {success_count} 件, 失敗 {failure_count} 件") return success_count, failure_count except Exception as e: logger.error(f"ディレクトリのアップロードに失敗しました: {str(e)}") return success_count, failure_count def download_file(self, s3_key, file_path): """ S3バケットからファイルをダウンロード Args: bucket_name (str): ダウンロード元のS3バケット名 s3_key (str): ダウンロードするファイルのS3キー file_path (str): ダウンロード先のローカルファイルパス s3_client (boto3.client, optional): S3クライアント Returns: bool: ダウンロードの成功・失敗 """ logger = logging.getLogger(__name__) try: # S3クライアントが指定されていない場合は新規作成 if self.s3_client is None: self.s3_client = self.connect_to_s3() # ファイルのダウンロード logger.info(f"ダウンロード開始: s3://{self.bucket_name}/{s3_key} → {file_path}") self.s3_client.download_file(self.bucket_name, s3_key, file_path) logger.info("ダウンロード完了") return True except FileNotFoundError: logger.error(f"ファイルが見つかりません: s3://{self.bucket_name}/{s3_key}") return False except ClientError as e: logger.error(f"S3ダウンロードエラー: {str(e)}") return False except Exception as e: logger.error(f"予期しないエラーが発生しました: {str(e)}") return False def download_directory(self, prefix, directory_path): """ S3バケットからディレクトリをダウンロード Args: bucket_name (str): ダウンロード元のS3バケット名 prefix (str): ダウンロードするディレクトリのプレフィックス(フォルダパス) directory_path (str): ダウンロード先のローカルディレクトリパス s3_client (boto3.client, optional): S3クライアント Returns: tuple: (成功したファイル数, 失敗したファイル数) """ logger = logging.getLogger(__name__) success_count = 0 failure_count = 0 try: # S3クライアントが指定されていない場合は新規作成 if s3_client is None: s3_client = self.connect() # プレフィックスに一致するオブジェクトをリスト paginator = s3_client.get_paginator('list_objects_v2') pages = paginator.paginate(Bucket=self.bucket_name, Prefix=prefix) for page in pages: if 'Contents' in page: for obj in page['Contents']: s3_key = obj['Key'] relative_path = os.path.relpath(s3_key, prefix) local_path = os.path.join(directory_path, relative_path) # ローカルディレクトリが存在しない場合は作成 local_dir = os.path.dirname(local_path) if not os.path.exists(local_dir): os.makedirs(local_dir) # ファイルのダウンロード if self.download_file(self.bucket_name, s3_key, local_path): success_count += 1 else: failure_count += 1 logger.info(f"ダウンロード完了: 成功 {success_count} 件, 失敗 {failure_count} 件") return success_count, failure_count except Exception as e: logger.error(f"ディレクトリのダウンロードに失敗しました: {str(e)}") return success_count, failure_count def delete_object(self, s3_key): """ S3バケットからオブジェクトを削除 Args: bucket_name (str): 削除するオブジェクトが存在するS3バケット名 s3_key (str): 削除するオブジェクトのS3キー s3_client (boto3.client, optional): S3クライアント Returns: bool: 削除の成功・失敗 """ logger = logging.getLogger(__name__) try: # S3クライアントが指定されていない場合は新規作成 if s3_client is None: s3_client = self.connect() # オブジェクトの削除 logger.info(f"削除開始: s3://{self.bucket_name}/{s3_key}") s3_client.delete_object(Bucket=self.bucket_name, Key=s3_key) logger.info("削除完了") return True except ClientError as e: logger.error(f"S3削除エラー: {str(e)}") return False except Exception as e: logger.error(f"予期しないエラーが発生しました: {str(e)}") return False