# services/csv_processor.py from typing import Dict, Any from django.shortcuts import render, redirect from django.contrib import messages from django.contrib.auth.hashers import make_password from django.core.exceptions import ValidationError from django.db import transaction from datetime import datetime, timedelta, timezone import csv from ..models import CustomUser, Team, Member, NewCategory, Entry, NewEvent2 from ..utils_code.date_converter import DateConverter from ..utils_code.name_splitter import NameSplitter class EntryCSVProcessor: def __init__(self): self.date_converter = DateConverter() self.name_splitter = NameSplitter() def process_upload(self, request): """ CSVファイルのアップロードとデータ処理を行う """ if request.method == 'POST': try: if 'csv_file' not in request.FILES: messages.error(request, 'No file was uploaded.') return redirect('..') csv_file = request.FILES['csv_file'] if not csv_file.name.endswith('.csv'): messages.error(request, 'File is not CSV type') return redirect('..') # BOMを考慮してファイルを読み込む file_content = csv_file.read() if file_content.startswith(b'\xef\xbb\xbf'): file_content = file_content[3:] decoded_file = file_content.decode('utf-8').splitlines() reader = csv.DictReader(decoded_file) for row in reader: try: self.process_csv_row(row) except Exception as e: print(f'Error processing upload: {str(e)}') messages.error(request, f'Error in row: {str(e)}') return redirect('..') messages.success(request, 'CSV file processed successfully') return redirect('..') except Exception as e: messages.error(request, f'Error processing CSV: {str(e)}') return redirect('..') return render(request, 'admin/entry/upload_csv.html') def process_csv_row(self, row: Dict[str, Any]) -> None: """ CSVの1行のデータを処理する """ try: with transaction.atomic(): # 1) ユーザーの作成/取得 user = self._get_or_create_user(row) if not user: raise ValidationError("Failed to create/get user") # 2) チームの作成/取得とカテゴリの設定 team = self._get_or_create_team(row, user) if not team: raise ValidationError("Failed to create/get team") # 3) メンバーの作成/更新 self._process_team_members(row, team, user) # 4) エントリーの作成 self._create_entry(row, team, user) except Exception as e: print(f'Error on process_csv_row : {str(e)}') raise ValidationError(f"Error processing row: {str(e)}") def _get_or_create_user(self, row: Dict[str, Any]) -> CustomUser: """ メールアドレスでユーザーを検索し、存在しない場合は新規作成 """ print(f'_get_or_create_user.row={row}') user = CustomUser.objects.filter(email=row['email']).first() if not user: print('New User...') last_name, first_name = self.name_splitter.split_full_name(row['owner_name']) birth_date = self.date_converter.convert_date(row['owner_birthday']) is_female = row['owner_sex'] in ['女性', '女', '女子', 'female'] user = CustomUser.objects.create( email=row['email'], password=make_password(row['password']), firstname=first_name, lastname=last_name, date_of_birth=birth_date, female=is_female, is_active=True ) return user def _get_or_create_team(self, row: Dict[str, Any], user: CustomUser) -> Team: """ チーム名でチームを検索し、存在しない場合は新規作成 既存チームの場合はメンバー構成を確認し、必要に応じて新バージョンを作成 """ team_name = row['team_name'] base_team_name = team_name version = 1 print(f'チーム作成:{team_name}') while Team.objects.filter(team_name=team_name).exists(): existing_team = Team.objects.get(team_name=team_name) print(f'既存チーム:{existing_team.team_name}') if self._check_same_members(existing_team, row, user): print('既存チームとメンバー構成が同じ') return existing_team version += 1 team_name = f"{base_team_name}_v{version}" print(f'新規チーム名:{team_name}') # 新規チームを作成 try: category = self._get_or_create_category(row) print(f'カテゴリからゼッケン取得:{row["zekken_number"]}') team = Team.objects.create( team_name=team_name, owner=user, category=category ) return team except NewCategory.DoesNotExist: print(f'Error on _get_or_create_team') raise ValidationError(f"Category with name {category} for team {team} does not exist") def _get_or_create_category(self, row: Dict[str, Any]) -> NewCategory: """ 時間とデパートメントに基づいてカテゴリを取得 """ time = row['time'] if time: timeclass = ''.join(chr(ord(c) + 0xFEE0) if '0' <= c <= '9' else c for c in time) timeclass = timeclass+'時間' else: timeclass = '' sexclass = '' if row['sex'] in ['男', '男子']: sexclass = '男子' elif row['sex'] in ['女', '女子']: sexclass = '女子' category_name = f"{row['department']}{sexclass}-{timeclass}" print(f'設定カテゴリ名:{category_name}') try: category = NewCategory.objects.get(category_name=category_name) row['zekken_number'] = category.category_number category.category_number = category.category_number + 1 category.save() except NewCategory.DoesNotExist: print(f'Error on _get_or_create_category : {category_name}') raise ValidationError(f"Category with name {category_name} does not exist") return category def _check_same_members(self, team: Team, row: Dict[str, Any], owner: CustomUser) -> bool: """ 既存チームと新しいメンバー構成が同じかどうかをチェック """ existing_members = set(member.user.email for member in team.members.all()) new_members = {owner.email} for i in range(2, int(row['members_count']) + 1): if row.get(f'member{i}'): new_members.add(f"dummy_{team.team_name}_{i}@example.com") return existing_members == new_members def _process_team_members(self, row: Dict[str, Any], team: Team, owner: CustomUser) -> None: """ チームメンバーを処理(オーナーとその他のメンバー) """ # オーナーをメンバーとして追加 Member.objects.get_or_create( team=team, user=owner, defaults={'is_temporary': False} ) # 追加メンバーの処理 for i in range(2, int(row['members_count']) + 1): if row.get(f'member{i}'): self._create_team_member(row, team, i) def _create_team_member(self, row: Dict[str, Any], team: Team, member_num: int) -> None: """ チームの追加メンバーを作成 """ last_name, first_name = self.name_splitter.split_full_name(row[f'member{member_num}']) birth_date = self.date_converter.convert_date(row[f'birthday{member_num}']) is_female = row.get(f'sex{member_num}', '') in ['女性', '女', '女子', 'female'] print(f'メンバー作成:{first_name} {last_name}') dummy_email = f"dummy_{team.team_name}_{member_num}@example.com" dummy_user, _ = CustomUser.objects.get_or_create( email=dummy_email, defaults={ 'password': make_password('dummy_password'), 'firstname': first_name, 'lastname': last_name, 'date_of_birth': birth_date, 'female': is_female } ) Member.objects.get_or_create( team=team, user=dummy_user, defaults={'is_temporary': True} ) print(f'メンバー作成完了:{dummy_user.email}') def _create_entry(self, row: Dict[str, Any], team: Team, owner: CustomUser) -> None: """ エントリーを作成 """ print(f'エントリー作成:{team.team_name}') try: zekken_no = row['zekken_number'] event = NewEvent2.objects.get(event_name=row['event_code']) print(f'イベント取得:{event.event_name}') # start_datetimeの検証を追加 if not event.start_datetime: raise ValidationError(f"Event {event.event_name} has no start date") if not event.start_datetime or not event.end_datetime: print(f"Event dates: start={event.start_datetime}, end={event.end_datetime}") event.start_datetime = timezone.make_aware(datetime(2025, 1, 25, 9, 0)) event.end_datetime = timezone.make_aware(datetime(2025, 1, 25, 17, 0)) event.save() entry_date = self.date_converter.convert_date(row['entry_date']) entry_datetime = timezone.make_aware(datetime.combine(entry_date, datetime.min.time())) # 日付が必要な場合のフォールバック entry_date = self.date_converter.convert_date(row['entry_date']) if entry_date: entry_date = timezone.make_aware( datetime.combine(entry_date, datetime.min.time()) ) else: entry_date = event.start_datetime print(f'エントリー日付:{entry_date}') entry = Entry.objects.create( team=team, event=event, category=team.category, date=entry_datetime, owner=owner, zekken_number=zekken_no, is_active=False ) entry.clean() # バリデーション実行 entry.save() print(f'エントリー作成完了:{team.team_name} - {event.event_name}') except NewEvent2.DoesNotExist: print(f'Error on _create_entry: {row}') raise ValidationError(f"Event with code {row['event_code']} does not exist")