253 lines
9.8 KiB
Python
253 lines
9.8 KiB
Python
# services/csv_processor.py
|
|
from typing import Dict, Any
|
|
from django.shortcuts import render, redirect
|
|
from django.contrib import messages
|
|
from django.contrib.auth.hashers import make_password
|
|
from django.core.exceptions import ValidationError
|
|
from django.db import transaction
|
|
from datetime import timedelta
|
|
import csv
|
|
|
|
from ..models import CustomUser, Team, Member, NewCategory, Entry, NewEvent2
|
|
from ..utils_code.date_converter import DateConverter
|
|
from ..utils_code.name_splitter import NameSplitter
|
|
|
|
class EntryCSVProcessor:
|
|
def __init__(self):
|
|
self.date_converter = DateConverter()
|
|
self.name_splitter = NameSplitter()
|
|
|
|
def process_upload(self, request):
|
|
"""
|
|
CSVファイルのアップロードとデータ処理を行う
|
|
"""
|
|
if request.method == 'POST':
|
|
try:
|
|
if 'csv_file' not in request.FILES:
|
|
messages.error(request, 'No file was uploaded.')
|
|
return redirect('..')
|
|
|
|
csv_file = request.FILES['csv_file']
|
|
if not csv_file.name.endswith('.csv'):
|
|
messages.error(request, 'File is not CSV type')
|
|
return redirect('..')
|
|
|
|
# BOMを考慮してファイルを読み込む
|
|
file_content = csv_file.read()
|
|
if file_content.startswith(b'\xef\xbb\xbf'):
|
|
file_content = file_content[3:]
|
|
|
|
decoded_file = file_content.decode('utf-8').splitlines()
|
|
reader = csv.DictReader(decoded_file)
|
|
|
|
for row in reader:
|
|
try:
|
|
self.process_csv_row(row)
|
|
except Exception as e:
|
|
print(f'Error processing upload: {str(e)}')
|
|
messages.error(request, f'Error in row: {str(e)}')
|
|
return redirect('..')
|
|
|
|
messages.success(request, 'CSV file processed successfully')
|
|
return redirect('..')
|
|
|
|
except Exception as e:
|
|
messages.error(request, f'Error processing CSV: {str(e)}')
|
|
return redirect('..')
|
|
|
|
return render(request, 'admin/entry/upload_csv.html')
|
|
|
|
def process_csv_row(self, row: Dict[str, Any]) -> None:
|
|
"""
|
|
CSVの1行のデータを処理する
|
|
"""
|
|
try:
|
|
with transaction.atomic():
|
|
# 1) ユーザーの作成/取得
|
|
user = self._get_or_create_user(row)
|
|
if not user:
|
|
raise ValidationError("Failed to create/get user")
|
|
|
|
# 2) チームの作成/取得とカテゴリの設定
|
|
team = self._get_or_create_team(row, user)
|
|
if not team:
|
|
raise ValidationError("Failed to create/get team")
|
|
|
|
# 3) メンバーの作成/更新
|
|
self._process_team_members(row, team, user)
|
|
|
|
# 4) エントリーの作成
|
|
self._create_entry(row, team, user)
|
|
|
|
except Exception as e:
|
|
print(f'Error on process_csv_row : {str(e)}')
|
|
raise ValidationError(f"Error processing row: {str(e)}")
|
|
|
|
def _get_or_create_user(self, row: Dict[str, Any]) -> CustomUser:
|
|
"""
|
|
メールアドレスでユーザーを検索し、存在しない場合は新規作成
|
|
"""
|
|
print(f'_get_or_create_user.row={row}')
|
|
user = CustomUser.objects.filter(email=row['email']).first()
|
|
if not user:
|
|
print('New User...')
|
|
last_name, first_name = self.name_splitter.split_full_name(row['owner_name'])
|
|
birth_date = self.date_converter.convert_date(row['owner_birthday'])
|
|
is_female = row['owner_sex'] in ['女性', '女', '女子', 'female']
|
|
|
|
user = CustomUser.objects.create(
|
|
email=row['email'],
|
|
password=make_password(row['password']),
|
|
firstname=first_name,
|
|
lastname=last_name,
|
|
date_of_birth=birth_date,
|
|
female=is_female,
|
|
is_active=True
|
|
)
|
|
return user
|
|
|
|
def _get_or_create_team(self, row: Dict[str, Any], user: CustomUser) -> Team:
|
|
"""
|
|
チーム名でチームを検索し、存在しない場合は新規作成
|
|
既存チームの場合はメンバー構成を確認し、必要に応じて新バージョンを作成
|
|
"""
|
|
team_name = row['team_name']
|
|
base_team_name = team_name
|
|
version = 1
|
|
print(f'チーム作成:{team_name}')
|
|
|
|
while Team.objects.filter(team_name=team_name).exists():
|
|
existing_team = Team.objects.get(team_name=team_name)
|
|
print(f'既存チーム:{existing_team.team_name}')
|
|
if self._check_same_members(existing_team, row, user):
|
|
print('既存チームとメンバー構成が同じ')
|
|
return existing_team
|
|
version += 1
|
|
team_name = f"{base_team_name}_v{version}"
|
|
print(f'新規チーム名:{team_name}')
|
|
|
|
# 新規チームを作成
|
|
try:
|
|
category = self._get_or_create_category(row)
|
|
print(f'カテゴリからゼッケン取得:{row["zekken_number"]}')
|
|
team = Team.objects.create(
|
|
team_name=team_name,
|
|
owner=user,
|
|
category=category
|
|
)
|
|
return team
|
|
except NewCategory.DoesNotExist:
|
|
print(f'Error on _get_or_create_team')
|
|
raise ValidationError(f"Category with name {category} for team {team} does not exist")
|
|
|
|
|
|
def _get_or_create_category(self, row: Dict[str, Any]) -> NewCategory:
|
|
"""
|
|
時間とデパートメントに基づいてカテゴリを取得
|
|
"""
|
|
time = row['time']
|
|
if time:
|
|
timeclass = ''.join(chr(ord(c) + 0xFEE0) if '0' <= c <= '9' else c for c in time)
|
|
timeclass = timeclass+'時間'
|
|
else:
|
|
timeclass = ''
|
|
|
|
sexclass = ''
|
|
if row['sex'] in ['男', '男子']:
|
|
sexclass = '男子'
|
|
elif row['sex'] in ['女', '女子']:
|
|
sexclass = '女子'
|
|
|
|
category_name = f"{row['department']}{sexclass}-{timeclass}"
|
|
print(f'設定カテゴリ名:{category_name}')
|
|
try:
|
|
category = NewCategory.objects.get(category_name=category_name)
|
|
row['zekken_number'] = category.category_number
|
|
category.category_number = category.category_number + 1
|
|
category.save()
|
|
|
|
except NewCategory.DoesNotExist:
|
|
print(f'Error on _get_or_create_category : {category_name}')
|
|
|
|
raise ValidationError(f"Category with name {category_name} does not exist")
|
|
|
|
return category
|
|
|
|
def _check_same_members(self, team: Team, row: Dict[str, Any], owner: CustomUser) -> bool:
|
|
"""
|
|
既存チームと新しいメンバー構成が同じかどうかをチェック
|
|
"""
|
|
existing_members = set(member.user.email for member in team.members.all())
|
|
new_members = {owner.email}
|
|
|
|
for i in range(2, int(row['members_count']) + 1):
|
|
if row.get(f'member{i}'):
|
|
new_members.add(f"dummy_{team.team_name}_{i}@example.com")
|
|
|
|
return existing_members == new_members
|
|
|
|
def _process_team_members(self, row: Dict[str, Any], team: Team, owner: CustomUser) -> None:
|
|
"""
|
|
チームメンバーを処理(オーナーとその他のメンバー)
|
|
"""
|
|
# オーナーをメンバーとして追加
|
|
Member.objects.get_or_create(
|
|
team=team,
|
|
user=owner,
|
|
defaults={'is_temporary': False}
|
|
)
|
|
|
|
# 追加メンバーの処理
|
|
for i in range(2, int(row['members_count']) + 1):
|
|
if row.get(f'member{i}'):
|
|
self._create_team_member(row, team, i)
|
|
|
|
def _create_team_member(self, row: Dict[str, Any], team: Team, member_num: int) -> None:
|
|
"""
|
|
チームの追加メンバーを作成
|
|
"""
|
|
last_name, first_name = self.name_splitter.split_full_name(row[f'member{member_num}'])
|
|
birth_date = self.date_converter.convert_date(row[f'birthday{member_num}'])
|
|
is_female = row.get(f'sex{member_num}', '') in ['女性', '女', '女子', 'female']
|
|
|
|
dummy_email = f"dummy_{team.team_name}_{member_num}@example.com"
|
|
dummy_user, _ = CustomUser.objects.get_or_create(
|
|
email=dummy_email,
|
|
defaults={
|
|
'password': make_password('dummy_password'),
|
|
'firstname': first_name,
|
|
'lastname': last_name,
|
|
'date_of_birth': birfth_date,
|
|
'female': is_female
|
|
}
|
|
)
|
|
|
|
Member.objects.get_or_create(
|
|
team=team,
|
|
user=dummy_user,
|
|
defaults={'is_temporary': True}
|
|
)
|
|
print(f'メンバー作成:{dummy_user.email}')
|
|
|
|
def _create_entry(self, row: Dict[str, Any], team: Team, owner: CustomUser) -> None:
|
|
"""
|
|
エントリーを作成
|
|
"""
|
|
try:
|
|
zekken_no = row['zekken_number']
|
|
event = NewEvent2.objects.get(event_name=row['event_code'])
|
|
Entry.objects.create(
|
|
team=team,
|
|
event=event,
|
|
category=team.category,
|
|
date=event.start_datetime,
|
|
owner=owner,
|
|
zekken_number=zekken_no,
|
|
is_active=False
|
|
)
|
|
print(f'エントリー作成:{team.team_name} - {event.event_name}')
|
|
except NewEvent2.DoesNotExist:
|
|
raise ValidationError(f"Event with code {row['event_code']} does not exist")
|
|
|
|
|