# services/csv_processor.py from typing import Dict, Any from django.shortcuts import render, redirect from django.contrib import messages from django.contrib.auth.hashers import make_password from django.core.exceptions import ValidationError from django.db import transaction from datetime import timedelta import csv from ..models import CustomUser, Team, Member, NewCategory, Entry, NewEvent2 from ..utils.date_converter import DateConverter from ..utils.name_splitter import NameSplitter class EntryCSVProcessor: def __init__(self): self.date_converter = DateConverter() self.name_splitter = NameSplitter() def process_upload(self, request): """ CSVファイルのアップロードとデータ処理を行う """ if request.method == 'POST': try: if 'csv_file' not in request.FILES: messages.error(request, 'No file was uploaded.') return redirect('..') csv_file = request.FILES['csv_file'] if not csv_file.name.endswith('.csv'): messages.error(request, 'File is not CSV type') return redirect('..') # BOMを考慮してファイルを読み込む file_content = csv_file.read() if file_content.startswith(b'\xef\xbb\xbf'): file_content = file_content[3:] decoded_file = file_content.decode('utf-8').splitlines() reader = csv.DictReader(decoded_file) for row in reader: try: self.process_csv_row(row) except Exception as e: messages.error(request, f'Error in row: {str(e)}') return redirect('..') messages.success(request, 'CSV file processed successfully') return redirect('..') except Exception as e: messages.error(request, f'Error processing CSV: {str(e)}') return redirect('..') return render(request, 'admin/entry/upload_csv.html') def process_csv_row(self, row: Dict[str, Any]) -> None: """ CSVの1行のデータを処理する """ try: with transaction.atomic(): # 1) ユーザーの作成/取得 user = self._get_or_create_user(row) if not user: raise ValidationError("Failed to create/get user") # 2) チームの作成/取得とカテゴリの設定 team = self._get_or_create_team(row, user) if not team: raise ValidationError("Failed to create/get team") # 3) メンバーの作成/更新 self._process_team_members(row, team, user) # 4) エントリーの作成 self._create_entry(row, team, user) except Exception as e: raise ValidationError(f"Error processing row: {str(e)}") def _get_or_create_user(self, row: Dict[str, Any]) -> CustomUser: """ メールアドレスでユーザーを検索し、存在しない場合は新規作成 """ user = CustomUser.objects.filter(email=row['email']).first() if not user: last_name, first_name = self.name_splitter.split_full_name(row['owner_name']) birth_date = self.date_converter.convert_date(row['owner_birthday']) is_female = row['owner_sex'] in ['女性', '女', '女子', 'female'] user = CustomUser.objects.create( email=row['email'], password=make_password(row['password']), firstname=first_name, lastname=last_name, date_of_birth=birth_date, female=is_female, is_active=True ) return user def _get_or_create_team(self, row: Dict[str, Any], user: CustomUser) -> Team: """ チーム名でチームを検索し、存在しない場合は新規作成 既存チームの場合はメンバー構成を確認し、必要に応じて新バージョンを作成 """ team_name = row['team_name'] base_team_name = team_name version = 1 while Team.objects.filter(team_name=team_name).exists(): existing_team = Team.objects.get(team_name=team_name) if self._check_same_members(existing_team, row, user): return existing_team version += 1 team_name = f"{base_team_name}_v{version}" # 新規チームを作成 category = self._get_or_create_category(row) team = Team.objects.create( team_name=team_name, owner=user, category=category ) return team def _get_or_create_category(self, row: Dict[str, Any]) -> NewCategory: """ 時間とデパートメントに基づいてカテゴリを取得または作成 """ category_name = f"{row['department']}_{row['time']}h" category, _ = NewCategory.objects.get_or_create( category_name=category_name, defaults={ 'duration': timedelta(hours=int(row['time'])), 'num_of_member': int(row['members_count']) } ) return category def _check_same_members(self, team: Team, row: Dict[str, Any], owner: CustomUser) -> bool: """ 既存チームと新しいメンバー構成が同じかどうかをチェック """ existing_members = set(member.user.email for member in team.members.all()) new_members = {owner.email} for i in range(2, int(row['members_count']) + 1): if row.get(f'member{i}'): new_members.add(f"dummy_{team.team_name}_{i}@example.com") return existing_members == new_members def _process_team_members(self, row: Dict[str, Any], team: Team, owner: CustomUser) -> None: """ チームメンバーを処理(オーナーとその他のメンバー) """ # オーナーをメンバーとして追加 Member.objects.get_or_create( team=team, user=owner, defaults={'is_temporary': False} ) # 追加メンバーの処理 for i in range(2, int(row['members_count']) + 1): if row.get(f'member{i}'): self._create_team_member(row, team, i) def _create_team_member(self, row: Dict[str, Any], team: Team, member_num: int) -> None: """ チームの追加メンバーを作成 """ last_name, first_name = self.name_splitter.split_full_name(row[f'member{member_num}']) birth_date = self.date_converter.convert_date(row[f'birthday{member_num}']) is_female = row.get(f'sex{member_num}', '') in ['女性', '女', '女子', 'female'] dummy_email = f"dummy_{team.team_name}_{member_num}@example.com" dummy_user, _ = CustomUser.objects.get_or_create( email=dummy_email, defaults={ 'password': make_password('dummy_password'), 'firstname': first_name, 'lastname': last_name, 'date_of_birth': birth_date, 'female': is_female } ) Member.objects.get_or_create( team=team, user=dummy_user, defaults={'is_temporary': True} ) def _create_entry(self, row: Dict[str, Any], team: Team, owner: CustomUser) -> None: """ エントリーを作成 """ try: event = NewEvent2.objects.get(event_name=row['event_code']) Entry.objects.create( team=team, event=event, category=team.category, date=event.start_datetime, owner=owner, is_active=False ) except NewEvent2.DoesNotExist: raise ValidationError(f"Event with code {row['event_code']} does not exist")