Files
rogaining_srv/rog/services/csv_processor.py
2025-01-23 15:27:20 +09:00

269 lines
10 KiB
Python

# services/csv_processor.py
import csv
import io
from datetime import timedelta
from typing import Dict, Any

from django.shortcuts import render, redirect
from django.contrib import messages
from django.contrib.auth.hashers import make_password
from django.core.exceptions import ValidationError
from django.db import transaction

from ..models import CustomUser, Team, Member, NewCategory, Entry, NewEvent2
from ..utils_code.date_converter import DateConverter
from ..utils_code.name_splitter import NameSplitter


class EntryCSVProcessor:
    """Import team entries from an uploaded CSV file.

    For every CSV row the processor gets or creates the owner user, gets or
    creates the team (with its category), creates the team members and
    finally creates the entry for the event named in the row.
    """

    # Values of a "sex" column that mark a person as female.
    # NOTE(review): the empty string is part of this set, so a blank sex
    # column is treated as female. It may be a character that was lost in
    # transcription -- confirm the intended label before changing it.
    _FEMALE_LABELS = ('女性', '', '女子', 'female')

    # Translation table mapping ASCII digits to their full-width forms,
    # which is how the hour count is spelled in category names.
    _FULLWIDTH_DIGITS = {ord(d): ord(d) + 0xFEE0 for d in '0123456789'}

    def __init__(self):
        self.date_converter = DateConverter()
        self.name_splitter = NameSplitter()

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _redact_row(row: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``row`` that is safe to log (password masked)."""
        safe_row = dict(row)
        if 'password' in safe_row:
            safe_row['password'] = '***'
        return safe_row

    @staticmethod
    def _dummy_email(team_name: str, member_num: int) -> str:
        """Return the placeholder e-mail used for a non-owner team member."""
        return f"dummy_{team_name}_{member_num}@example.com"

    @staticmethod
    def _build_category_name(row: Dict[str, Any]) -> str:
        """Build the category name from the row's department, sex and time.

        The result has the form ``<department><sex>-<hours>時間`` where the
        ASCII digits of the ``time`` column are converted to full-width
        digits. A blank ``time`` leaves the part after the dash empty.
        """
        hours = row['time']
        if hours:
            timeclass = hours.translate(EntryCSVProcessor._FULLWIDTH_DIGITS) + '時間'
        else:
            timeclass = ''

        # NOTE(review): the empty string appears in both lists, so a blank
        # sex column always resolves to the first branch ('男子') and the ''
        # in the second list is never reached. Possibly a lost character --
        # confirm the intended labels.
        sexclass = ''
        if row['sex'] in ['', '男子']:
            sexclass = '男子'
        elif row['sex'] in ['', '女子']:
            sexclass = '女子'

        return f"{row['department']}{sexclass}-{timeclass}"

    @staticmethod
    def _allocate_zekken_number(category):
        """Take the category's current number and advance its counter.

        Returns:
            The value of ``category.category_number`` before the increment.
        """
        number = category.category_number
        category.category_number = number + 1
        category.save()
        return number

    # ------------------------------------------------------------------
    # Upload entry point
    # ------------------------------------------------------------------
    def process_upload(self, request):
        """Handle the CSV upload view.

        On GET (or any non-POST method) the upload form is rendered. On POST
        the uploaded ``csv_file`` is decoded as UTF-8 (a leading BOM is
        ignored) and every row is passed to :meth:`process_csv_row`.

        Each row is committed in its own transaction; processing stops at
        the first failing row, so rows before it stay imported.
        NOTE(review): confirm that a partial import is the intended
        behaviour on failure.

        Returns:
            A redirect to ``'..'`` after a POST (with a success or error
            message attached), otherwise the rendered upload form.
        """
        if request.method != 'POST':
            return render(request, 'admin/entry/upload_csv.html')

        try:
            if 'csv_file' not in request.FILES:
                messages.error(request, 'No file was uploaded.')
                return redirect('..')

            csv_file = request.FILES['csv_file']
            # Case-insensitive so that e.g. "ENTRIES.CSV" is accepted too.
            if not csv_file.name.lower().endswith('.csv'):
                messages.error(request, 'File is not CSV type')
                return redirect('..')

            # 'utf-8-sig' drops a leading BOM if there is one.
            text = csv_file.read().decode('utf-8-sig')
            # newline='' lets the csv module do its own line handling, so
            # quoted fields keep embedded line breaks and characters such
            # as U+2028 are not mistaken for record separators.
            reader = csv.DictReader(io.StringIO(text, newline=''))

            for row in reader:
                try:
                    self.process_csv_row(row)
                except Exception as e:
                    print(f'Error processing upload: {str(e)}')
                    messages.error(request, f'Error in row: {str(e)}')
                    return redirect('..')

            messages.success(request, 'CSV file processed successfully')
            return redirect('..')
        except Exception as e:
            # View boundary: report anything unexpected (bad encoding,
            # malformed CSV, ...) to the user instead of a server error.
            messages.error(request, f'Error processing CSV: {str(e)}')
            return redirect('..')

    def process_csv_row(self, row: Dict[str, Any]) -> None:
        """Process a single CSV row inside one database transaction.

        Args:
            row: Mapping of CSV column name to value. ``zekken_number`` is
                written into it as a side effect.

        Raises:
            ValidationError: If any step fails; the original exception is
                attached as the cause and the row's changes are rolled back.
        """
        try:
            with transaction.atomic():
                # 1) Get or create the owner user.
                user = self._get_or_create_user(row)
                if not user:
                    raise ValidationError("Failed to create/get user")

                # 2) Get or create the team and set its category.
                team = self._get_or_create_team(row, user)
                if not team:
                    raise ValidationError("Failed to create/get team")

                # 3) Create/update the members.
                self._process_team_members(row, team, user)

                # 4) Create the entry.
                self._create_entry(row, team, user)
        except Exception as e:
            print(f'Error on process_csv_row : {str(e)}')
            raise ValidationError(f"Error processing row: {str(e)}") from e

    # ------------------------------------------------------------------
    # Row steps
    # ------------------------------------------------------------------
    def _get_or_create_user(self, row: Dict[str, Any]) -> CustomUser:
        """Look the owner up by e-mail and create the user if missing.

        Raises:
            ValidationError: If the ``email`` column is blank.
        """
        # The row carries the plaintext password, so never log it verbatim.
        print(f'_get_or_create_user.row={self._redact_row(row)}')

        email = row['email']
        if not email:
            raise ValidationError("email is required")

        user = CustomUser.objects.filter(email=email).first()
        if user:
            return user

        print('New User...')
        last_name, first_name = self.name_splitter.split_full_name(row['owner_name'])
        birth_date = self.date_converter.convert_date(row['owner_birthday'])
        is_female = row['owner_sex'] in self._FEMALE_LABELS

        return CustomUser.objects.create(
            email=email,
            password=make_password(row['password']),
            firstname=first_name,
            lastname=last_name,
            date_of_birth=birth_date,
            female=is_female,
            is_active=True
        )

    def _get_or_create_team(self, row: Dict[str, Any], user: CustomUser) -> Team:
        """Find the team by name, or create it.

        When a team with the requested name already exists and
        :meth:`_check_same_members` reports the same member set, that team
        is returned. Otherwise the name is suffixed with ``_v2``, ``_v3``,
        ... until a matching team or a free name is found, and a new team
        is created under the free name.

        Raises:
            ValidationError: If the row's category does not exist.
        """
        team_name = row['team_name']
        base_team_name = team_name
        version = 1
        print(f'チーム作成:{team_name}')

        # first() instead of exists()+get(): one query per candidate name
        # and no MultipleObjectsReturned if a name is not unique.
        existing_team = Team.objects.filter(team_name=team_name).first()
        while existing_team is not None:
            print(f'既存チーム:{existing_team.team_name}')
            if self._check_same_members(existing_team, row, user):
                print('既存チームとメンバー構成が同じ')
                return existing_team
            version += 1
            team_name = f"{base_team_name}_v{version}"
            print(f'新規チーム名:{team_name}')
            existing_team = Team.objects.filter(team_name=team_name).first()

        # No reusable team: create a new one. A missing category surfaces
        # as ValidationError from _get_or_create_category.
        category = self._get_or_create_category(row)
        print(f'カテゴリからゼッケン取得:{row["zekken_number"]}')
        return Team.objects.create(
            team_name=team_name,
            owner=user,
            category=category
        )

    def _get_or_create_category(self, row: Dict[str, Any]) -> NewCategory:
        """Fetch the category for the row and reserve its next number.

        Despite the name, the category is never created. The category's
        current ``category_number`` is stored in ``row['zekken_number']``
        and the counter is advanced by one.

        Raises:
            ValidationError: If no category with the derived name exists.
        """
        category_name = self._build_category_name(row)
        print(f'設定カテゴリ名:{category_name}')
        try:
            category = NewCategory.objects.get(category_name=category_name)
        except NewCategory.DoesNotExist as exc:
            print(f'Error on _get_or_create_category : {category_name}')
            raise ValidationError(
                f"Category with name {category_name} does not exist"
            ) from exc

        row['zekken_number'] = self._allocate_zekken_number(category)
        return category

    def _check_same_members(self, team: Team, row: Dict[str, Any], owner: CustomUser) -> bool:
        """Report whether ``team`` has the member set described by ``row``.

        Members are compared by e-mail. Non-owner members are represented by
        placeholder e-mails derived from the team name and the member slot,
        so this compares the owner and the filled member slots, not the
        members' names.
        """
        existing_members = {member.user.email for member in team.members.all()}
        new_members = {owner.email}
        for i in range(2, int(row['members_count']) + 1):
            if row.get(f'member{i}'):
                new_members.add(self._dummy_email(team.team_name, i))
        return existing_members == new_members

    def _process_team_members(self, row: Dict[str, Any], team: Team, owner: CustomUser) -> None:
        """Attach the owner and the additional members to the team."""
        # The owner is a regular (non-temporary) member.
        Member.objects.get_or_create(
            team=team,
            user=owner,
            defaults={'is_temporary': False}
        )

        # Additional members live in columns member2 .. member<members_count>.
        for i in range(2, int(row['members_count']) + 1):
            if row.get(f'member{i}'):
                self._create_team_member(row, team, i)

    def _create_team_member(self, row: Dict[str, Any], team: Team, member_num: int) -> None:
        """Create one additional team member backed by a placeholder user."""
        last_name, first_name = self.name_splitter.split_full_name(row[f'member{member_num}'])
        birth_date = self.date_converter.convert_date(row[f'birthday{member_num}'])
        is_female = row.get(f'sex{member_num}', '') in self._FEMALE_LABELS
        print(f'メンバー作成:{first_name} {last_name}')

        dummy_email = self._dummy_email(team.team_name, member_num)
        # NOTE(review): every placeholder account shares the same known
        # password. If these accounts can log in this is a security risk;
        # consider an unusable password (make_password(None)) once it is
        # confirmed that nothing relies on 'dummy_password'.
        dummy_user, _ = CustomUser.objects.get_or_create(
            email=dummy_email,
            defaults={
                'password': make_password('dummy_password'),
                'firstname': first_name,
                'lastname': last_name,
                'date_of_birth': birth_date,
                'female': is_female
            }
        )
        Member.objects.get_or_create(
            team=team,
            user=dummy_user,
            defaults={'is_temporary': True}
        )
        print(f'メンバー作成完了:{dummy_user.email}')

    def _create_entry(self, row: Dict[str, Any], team: Team, owner: CustomUser) -> None:
        """Create the (inactive) entry of ``team`` for the row's event.

        Raises:
            ValidationError: If the event does not exist, has no start
                date, or no zekken number can be determined.
        """
        print(f'エントリー作成:{team.team_name}')
        try:
            event = NewEvent2.objects.get(event_name=row['event_code'])
        except NewEvent2.DoesNotExist as exc:
            # Mask the password before logging the offending row.
            print(f'Error on _create_entry: {self._redact_row(row)}')
            raise ValidationError(
                f"Event with code {row['event_code']} does not exist"
            ) from exc
        print(f'イベント取得:{event.event_name}')

        # The event start is the fallback entry date, so it must be set.
        if not event.start_datetime:
            raise ValidationError(f"Event {event.event_name} has no start date")

        # When an existing team was reused, no category lookup ran for this
        # row and 'zekken_number' may be absent; reserve one from the team's
        # category instead of failing with KeyError.
        # NOTE(review): confirm that a reused team should receive a fresh
        # number from its category counter.
        zekken_no = row.get('zekken_number')
        if zekken_no in (None, ''):
            if team.category is None:
                raise ValidationError(
                    f"Team {team.team_name} has no category to take a zekken number from"
                )
            zekken_no = self._allocate_zekken_number(team.category)
            row['zekken_number'] = zekken_no

        entry_date = row.get('entry_date')
        print(f'エントリー日付:{entry_date}')

        entry = Entry.objects.create(
            team=team,
            event=event,
            category=team.category,
            date=entry_date or event.start_datetime,
            owner=owner,
            zekken_number=zekken_no,
            is_active=False
        )
        # Validation runs after the insert; when called through
        # process_csv_row a failure here rolls the insert back.
        entry.clean()
        entry.save()
        print(f'エントリー作成完了:{team.team_name} - {event.event_name}')