556 lines
23 KiB
Python
556 lines
23 KiB
Python
"""
|
||
CSVファイルからチームエントリーをインポートするDjango管理コマンド
|
||
CPLIST/input/teams2025.csv 本番用対応
|
||
|
||
Usage:
|
||
docker compose exec app python manage.py import_teams --event_code=岐阜ロゲイニング2025 --csv_file=CPLIST/input/teams2025.csv
|
||
|
||
Author: システム開発チーム
|
||
Date: 2025-09-05
|
||
"""
|
||
|
||
import csv
|
||
import os
|
||
import logging
|
||
import re
|
||
from datetime import datetime, timedelta
|
||
from django.core.management.base import BaseCommand, CommandError
|
||
from django.db import transaction, models
|
||
from django.contrib.auth.hashers import make_password
|
||
from django.utils import timezone
|
||
from django.core.exceptions import ValidationError
|
||
|
||
from rog.models import (
|
||
CustomUser, Team, Member, Entry, NewEvent2, NewCategory
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class Command(BaseCommand):
|
||
help = 'CSVファイルからチームエントリーをインポート'
|
||
|
||
def add_arguments(self, parser):
|
||
parser.add_argument(
|
||
'--event_code',
|
||
type=str,
|
||
required=True,
|
||
help='インポート先のイベントコード'
|
||
)
|
||
parser.add_argument(
|
||
'--csv_file',
|
||
type=str,
|
||
default='CPLIST/input/teams2025.csv',
|
||
help='インポートするCSVファイルのパス (default: CPLIST/input/teams2025.csv)'
|
||
)
|
||
parser.add_argument(
|
||
'--dry_run',
|
||
action='store_true',
|
||
help='実際にはデータベースに書き込まずに処理を確認'
|
||
)
|
||
|
||
def handle(self, *args, **options):
|
||
event_code = options['event_code']
|
||
csv_file = options['csv_file']
|
||
dry_run = options['dry_run']
|
||
|
||
self.stdout.write(
|
||
self.style.SUCCESS(f'CSVインポート開始: event_code={event_code}, csv_file={csv_file}, dry_run={dry_run}')
|
||
)
|
||
|
||
# イベントの存在確認
|
||
try:
|
||
event = NewEvent2.objects.get(event_name=event_code)
|
||
self.stdout.write(f'イベント見つかりました: {event.event_name} (ID: {event.id})')
|
||
except NewEvent2.DoesNotExist:
|
||
raise CommandError(f'イベントが見つかりません: {event_code}')
|
||
|
||
# CSVファイルの存在確認
|
||
if not os.path.exists(csv_file):
|
||
raise CommandError(f'CSVファイルが見つかりません: {csv_file}')
|
||
|
||
# 統計情報
|
||
stats = {
|
||
'total_rows': 0,
|
||
'users_created': 0,
|
||
'users_existing': 0,
|
||
'teams_created': 0,
|
||
'members_created': 0,
|
||
'entries_created': 0,
|
||
'errors': []
|
||
}
|
||
|
||
try:
|
||
with open(csv_file, 'r', encoding='utf-8-sig') as f: # BOM対応のためutf-8-sigを使用
|
||
reader = csv.DictReader(f)
|
||
|
||
for row_num, row in enumerate(reader, start=2): # ヘッダー行の次から開始
|
||
stats['total_rows'] += 1
|
||
|
||
try:
|
||
if dry_run:
|
||
self.process_row_dry_run(row, event, stats, row_num)
|
||
else:
|
||
with transaction.atomic():
|
||
self.process_row(row, event, stats, row_num)
|
||
except Exception as e:
|
||
error_msg = f'行 {row_num}: {str(e)}'
|
||
stats['errors'].append(error_msg)
|
||
self.stdout.write(self.style.ERROR(error_msg))
|
||
continue
|
||
|
||
except Exception as e:
|
||
raise CommandError(f'CSVファイル読み込みエラー: {str(e)}')
|
||
|
||
# 結果レポート
|
||
self.print_stats(stats, dry_run)
|
||
|
||
# CSV出力
|
||
if not dry_run:
|
||
self.export_results_to_csv(event, options['csv_file'])
|
||
|
||
def export_results_to_csv(self, event, input_csv_file):
|
||
"""インポート結果をCSVファイルに出力"""
|
||
# 出力ファイル名を生成
|
||
input_dir = os.path.dirname(input_csv_file)
|
||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||
output_file = os.path.join(input_dir, f'import_results_{event.event_code}_{timestamp}.csv')
|
||
|
||
try:
|
||
with open(output_file, 'w', newline='', encoding='utf-8-sig') as csvfile:
|
||
fieldnames = [
|
||
'チーム名', 'ゼッケン番号', 'カテゴリー', '時間',
|
||
'オーナーメール', 'リーダー', 'メンバー数', 'メンバー一覧',
|
||
'参加登録状況', 'エントリーID', '作成日時'
|
||
]
|
||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||
writer.writeheader()
|
||
|
||
# イベントのエントリーを取得
|
||
entries = Entry.objects.filter(event=event).select_related(
|
||
'team', 'category', 'owner'
|
||
).prefetch_related('team__members__user')
|
||
|
||
for entry in entries:
|
||
# メンバー一覧を作成
|
||
member_list = []
|
||
for member in entry.team.members.all():
|
||
member_info = f"{member.user.firstname}"
|
||
if member.user.date_of_birth:
|
||
member_info += f"({member.user.date_of_birth})"
|
||
member_list.append(member_info)
|
||
|
||
# リーダー情報を取得(チームオーナー)
|
||
leader_name = ""
|
||
if entry.team.owner:
|
||
leader_name = f"{entry.team.owner.firstname}"
|
||
if entry.team.owner.date_of_birth:
|
||
leader_name += f"({entry.team.owner.date_of_birth})"
|
||
|
||
writer.writerow({
|
||
'チーム名': entry.team.team_name,
|
||
'ゼッケン番号': entry.zekken_number,
|
||
'カテゴリー': entry.category.category_name if entry.category else '',
|
||
'時間': f"{entry.category.duration.total_seconds() // 3600}時間" if entry.category else '',
|
||
'オーナーメール': entry.owner.email,
|
||
'リーダー': leader_name,
|
||
'メンバー数': entry.team.members.count(),
|
||
'メンバー一覧': '; '.join(member_list),
|
||
'参加登録状況': '完了',
|
||
'エントリーID': entry.id,
|
||
'作成日時': entry.date.strftime('%Y-%m-%d %H:%M:%S')
|
||
})
|
||
|
||
self.stdout.write(
|
||
self.style.SUCCESS(f'インポート結果をCSVに出力しました: {output_file}')
|
||
)
|
||
|
||
except Exception as e:
|
||
self.stdout.write(
|
||
self.style.ERROR(f'CSV出力エラー: {str(e)}')
|
||
)
|
||
|
||
def process_row(self, row, event, stats, row_num):
|
||
"""CSVの1行を処理(実際にデータベースに書き込み)"""
|
||
team_name = row.get('チーム名', '').strip()
|
||
self.stdout.write(f'行 {row_num} 処理中: チーム={team_name}')
|
||
|
||
# 2-1. カスタムユーザー登録
|
||
user = self.get_or_create_user(row, stats)
|
||
|
||
# 2-2. チーム登録
|
||
team = self.create_team(row, user, event, stats)
|
||
|
||
# メンバー登録(最大7名)
|
||
members = self.create_members(row, team, stats)
|
||
|
||
# 2-3. エントリー登録
|
||
entry = self.create_entry(row, team, event, stats)
|
||
|
||
self.stdout.write(self.style.SUCCESS(f'行 {row_num} 完了: チーム={team.team_name}'))
|
||
|
||
def process_row_dry_run(self, row, event, stats, row_num):
|
||
"""CSVの1行を処理(ドライラン - データベースには書き込まない)"""
|
||
team_name = row.get('チーム名', '').strip()
|
||
self.stdout.write(f'[DRY RUN] 行 {row_num}: チーム={team_name}')
|
||
|
||
# ユーザーの存在確認のみ
|
||
email = row.get('メール', '').strip()
|
||
password = row.get('パスワード', '').strip()
|
||
if email:
|
||
existing_user = CustomUser.objects.filter(email=email).first()
|
||
if existing_user:
|
||
self.stdout.write(f' ユーザー既存: {email} パスワード:既存')
|
||
stats['users_existing'] += 1
|
||
else:
|
||
display_password = password if password else 'defaultpassword123'
|
||
self.stdout.write(f' ユーザー新規作成予定: {email} パスワード:{display_password}')
|
||
stats['users_created'] += 1
|
||
|
||
stats['teams_created'] += 1
|
||
stats['entries_created'] += 1
|
||
|
||
# エントリー情報の表示
|
||
category_name = row.get('部門', '').strip()
|
||
duration_str = row.get('時間', '').strip()
|
||
|
||
# 予想されるゼッケン番号を計算(ドライラン用)
|
||
max_zekken = Entry.objects.filter(event=event).aggregate(
|
||
max_zekken=models.Max('zekken_number')
|
||
)['max_zekken'] or 0
|
||
predicted_zekken = max_zekken + stats['entries_created']
|
||
|
||
self.stdout.write(f' エントリー: ゼッケン{predicted_zekken}, カテゴリー:{category_name}, 時間:{duration_str}時間')
|
||
|
||
# 参加登録状況の確認
|
||
existing_entry = Entry.objects.filter(team__team_name=team_name, event=event).first()
|
||
if existing_entry:
|
||
self.stdout.write(f' 参加登録: 済み(既存エントリーID: {existing_entry.id})')
|
||
else:
|
||
self.stdout.write(f' 参加登録: 新規作成予定')
|
||
|
||
# メンバー数カウント
|
||
member_count = 0
|
||
member_names = []
|
||
for i in range(1, 8): # 最大7名
|
||
# 全角数字を使用
|
||
name_key = f'氏名{chr(0xFF10 + i)}' # 全角数字123...を生成
|
||
name = row.get(name_key, '').strip()
|
||
if name:
|
||
member_count += 1
|
||
birthday_key = f'誕生日{chr(0xFF10 + i)}'
|
||
birthday = row.get(birthday_key, '').strip()
|
||
member_names.append(f'{name}({birthday})')
|
||
stats['members_created'] += member_count
|
||
|
||
self.stdout.write(f' メンバー: {member_count}名 [{", ".join(member_names)}]')
|
||
|
||
def get_or_create_user(self, row, stats):
|
||
"""カスタムユーザーの取得または作成"""
|
||
email = row.get('メール', '').strip()
|
||
password = row.get('パスワード', '').strip()
|
||
phone = row.get('電話番号', '').strip()
|
||
|
||
if not email:
|
||
raise ValueError('メールアドレスが必要です')
|
||
|
||
# 特殊文字のクリーンアップ
|
||
email = email.replace('@', '@')
|
||
phone = self.clean_phone_number(phone)
|
||
|
||
# 既存ユーザーの検索
|
||
user = CustomUser.objects.filter(email=email).first()
|
||
if user:
|
||
self.stdout.write(f' 既存ユーザー: {email}')
|
||
stats['users_existing'] += 1
|
||
return user
|
||
|
||
# 新規ユーザー作成
|
||
user = CustomUser.objects.create_user(
|
||
email=email,
|
||
password=password if password else 'defaultpassword123',
|
||
is_active=True
|
||
)
|
||
self.stdout.write(f' 新規ユーザー作成: {email}')
|
||
stats['users_created'] += 1
|
||
return user
|
||
|
||
def clean_phone_number(self, phone):
|
||
"""電話番号のクリーンアップ"""
|
||
if not phone:
|
||
return ''
|
||
|
||
# 全角文字を半角に変換
|
||
phone = phone.replace('-', '-').replace('÷', '-')
|
||
|
||
# 余分な文字を削除
|
||
phone = re.sub(r'[^\d\-]', '', phone)
|
||
|
||
return phone
|
||
|
||
def create_team(self, row, user, event, stats):
|
||
"""チームの作成"""
|
||
team_name = row.get('チーム名', '').strip()
|
||
category_name = row.get('部門', '').strip()
|
||
|
||
if not team_name:
|
||
raise ValueError('チーム名が必要です')
|
||
|
||
# チームの重複チェック(同一ユーザー・同一チーム名)
|
||
existing_team = Team.objects.filter(team_name=team_name, owner=user).first()
|
||
if existing_team:
|
||
self.stdout.write(f' 既存チーム使用: {team_name}')
|
||
return existing_team
|
||
|
||
team = Team.objects.create(
|
||
team_name=team_name,
|
||
owner=user,
|
||
class_name=category_name,
|
||
event=event, # イベントを設定
|
||
created_at=timezone.now()
|
||
)
|
||
self.stdout.write(f' 新規チーム作成: {team_name}')
|
||
stats['teams_created'] += 1
|
||
return team
|
||
|
||
def create_members(self, row, team, stats):
|
||
"""メンバーの作成(最大7名)"""
|
||
members = []
|
||
|
||
for i in range(1, 8): # 最大7名
|
||
# 全角数字を使用
|
||
name_key = f'氏名{chr(0xFF10 + i)}'
|
||
birthday_key = f'誕生日{chr(0xFF10 + i)}'
|
||
|
||
name = row.get(name_key, '').strip()
|
||
birthday_str = row.get(birthday_key, '').strip()
|
||
|
||
if not name:
|
||
continue
|
||
|
||
# 誕生日の解析
|
||
birthday = self.parse_birthday(birthday_str)
|
||
|
||
# ダミーメールアドレス生成
|
||
dummy_email = f"{team.team_name.replace(' ', '_').replace('!', '').replace('?', '').lower()}_member_{i}@dummy.local"
|
||
|
||
# 既存のダミーユーザーチェック
|
||
existing_dummy = CustomUser.objects.filter(email=dummy_email).first()
|
||
if existing_dummy:
|
||
dummy_user = existing_dummy
|
||
else:
|
||
# ダミーユーザー作成
|
||
dummy_user = CustomUser.objects.create_user(
|
||
email=dummy_email,
|
||
password='dummypassword123',
|
||
firstname=name,
|
||
date_of_birth=birthday,
|
||
is_active=True
|
||
)
|
||
|
||
# 既存メンバーチェック
|
||
existing_member = Member.objects.filter(team=team, user=dummy_user).first()
|
||
if existing_member:
|
||
member = existing_member
|
||
else:
|
||
# メンバー作成(1番目の人がリーダー)
|
||
member = Member.objects.create(
|
||
team=team,
|
||
user=dummy_user,
|
||
firstname=name,
|
||
date_of_birth=birthday
|
||
)
|
||
# 1番目の人をチームのオーナーとして設定(リーダーの概念)
|
||
if i == 1 and team.owner != dummy_user:
|
||
team.owner = dummy_user
|
||
team.save()
|
||
|
||
stats['members_created'] += 1
|
||
|
||
members.append(member)
|
||
self.stdout.write(f' メンバー{i}作成: {name}')
|
||
|
||
return members
|
||
|
||
def parse_birthday(self, birthday_str):
|
||
"""誕生日文字列の解析"""
|
||
if not birthday_str or birthday_str in ['不明', '']:
|
||
return None
|
||
|
||
# 年齢のみの場合(例:71歳)
|
||
if '歳' in birthday_str:
|
||
try:
|
||
age = int(birthday_str.replace('歳', ''))
|
||
# 現在年から年齢を引いて生年を推定
|
||
birth_year = datetime.now().year - age
|
||
return datetime(birth_year, 1, 1).date()
|
||
except ValueError:
|
||
return None
|
||
|
||
# 日付形式の解析
|
||
try:
|
||
# YYYY/MM/DD形式
|
||
if '/' in birthday_str:
|
||
return datetime.strptime(birthday_str, '%Y/%m/%d').date()
|
||
# YYYYMMDD形式
|
||
elif len(birthday_str) == 8 and birthday_str.isdigit():
|
||
return datetime.strptime(birthday_str, '%Y%m%d').date()
|
||
# YYYY-MM-DD形式
|
||
elif '-' in birthday_str:
|
||
return datetime.strptime(birthday_str, '%Y-%m-%d').date()
|
||
except ValueError:
|
||
pass
|
||
|
||
self.stdout.write(self.style.WARNING(f' 誕生日形式エラー: {birthday_str}'))
|
||
return None
|
||
|
||
def create_entry(self, row, team, event, stats):
|
||
"""エントリーの作成"""
|
||
category_name = row.get('部門', '').strip()
|
||
duration_str = row.get('時間', '').strip()
|
||
|
||
# メンバー数を取得
|
||
member_count = team.members.count()
|
||
|
||
# カテゴリーの取得または作成
|
||
category = None
|
||
if category_name and duration_str:
|
||
# 既存のカテゴリーから最適なものを選択
|
||
duration_hours = int(duration_str) if duration_str.isdigit() else 3
|
||
|
||
# お試し部門の判定
|
||
is_trial = 'お試し' in category_name
|
||
|
||
if is_trial:
|
||
# お試しカテゴリーを検索(時間が一致するもの)
|
||
trial_categories = NewCategory.objects.filter(
|
||
trial=True,
|
||
duration__exact=timedelta(hours=duration_hours)
|
||
).order_by('-num_of_member') # メンバー数が多い順
|
||
|
||
# メンバー数に適したカテゴリーを選択(お試しは1名でも可)
|
||
for cat in trial_categories:
|
||
if cat.num_of_member >= member_count:
|
||
category = cat
|
||
break
|
||
|
||
if not category and trial_categories.exists():
|
||
# 適切なサイズがない場合は最大のお試しカテゴリーを使用
|
||
category = trial_categories.first()
|
||
else:
|
||
# 1. 完全一致するカテゴリーを探す
|
||
full_category_name = f"{category_name}-{duration_hours}時間"
|
||
category = NewCategory.objects.filter(category_name=full_category_name).first()
|
||
|
||
if not category:
|
||
# 2. 部分一致するカテゴリーを探す(時間が一致するもの)
|
||
categories = NewCategory.objects.filter(
|
||
category_name__icontains=category_name,
|
||
duration__exact=timedelta(hours=duration_hours),
|
||
trial=False # お試しではないもの
|
||
).order_by('-num_of_member') # メンバー数が多い順
|
||
|
||
# メンバー数に適したカテゴリーを選択
|
||
for cat in categories:
|
||
if cat.num_of_member >= member_count:
|
||
category = cat
|
||
break
|
||
|
||
if not category:
|
||
# 3. どれも見つからない場合は新規作成(メンバー数を考慮)
|
||
if is_trial:
|
||
# お試しの場合は1名から最大7名まで許可
|
||
max_members = max(7, member_count)
|
||
else:
|
||
# 通常カテゴリーは既存の制限に従う
|
||
max_members = max(7, member_count)
|
||
|
||
full_category_name = f"{category_name}-{duration_hours}時間"
|
||
category = NewCategory.objects.create(
|
||
category_name=full_category_name,
|
||
duration=timedelta(hours=duration_hours),
|
||
num_of_member=max_members,
|
||
trial=is_trial, # お試し判定を反映
|
||
family='ファミリー' in category_name, # ファミリー判定
|
||
category_number=0
|
||
)
|
||
trial_text = f", お試し={is_trial}" if is_trial else ""
|
||
self.stdout.write(f' 新規カテゴリー作成: {full_category_name} (最大{max_members}名{trial_text})')
|
||
else:
|
||
trial_text = ", お試し" if category.trial else ""
|
||
self.stdout.write(f' 使用カテゴリー: {category.category_name} (最大{category.num_of_member}名{trial_text})')
|
||
|
||
# 重複エントリーチェック
|
||
existing_entry = Entry.objects.filter(team=team, event=event).first()
|
||
if existing_entry:
|
||
self.stdout.write(f' 既存エントリー使用: {team.team_name}')
|
||
return existing_entry
|
||
|
||
# 最大ゼッケン番号を取得
|
||
max_zekken = Entry.objects.filter(event=event).aggregate(
|
||
max_zekken=models.Max('zekken_number')
|
||
)['max_zekken'] or 0
|
||
|
||
entry = Entry.objects.create(
|
||
team=team,
|
||
event=event,
|
||
category=category,
|
||
owner=team.owner,
|
||
zekken_number=max_zekken + 1,
|
||
date=timezone.now(),
|
||
is_active=True
|
||
)
|
||
|
||
self.stdout.write(f' エントリー作成: ゼッケン{entry.zekken_number}')
|
||
stats['entries_created'] += 1
|
||
return entry
|
||
|
||
# 重複エントリーチェック
|
||
existing_entry = Entry.objects.filter(team=team, event=event).first()
|
||
if existing_entry:
|
||
self.stdout.write(f' 既存エントリー使用: {team.team_name}')
|
||
return existing_entry
|
||
|
||
# 最大ゼッケン番号を取得
|
||
max_zekken = Entry.objects.filter(event=event).aggregate(
|
||
max_zekken=models.Max('zekken_number')
|
||
)['max_zekken'] or 0
|
||
|
||
entry = Entry.objects.create(
|
||
team=team,
|
||
event=event,
|
||
category=category,
|
||
owner=team.owner,
|
||
zekken_number=max_zekken + 1,
|
||
date=timezone.now(),
|
||
is_active=True
|
||
)
|
||
|
||
self.stdout.write(f' エントリー作成: ゼッケン{entry.zekken_number}')
|
||
stats['entries_created'] += 1
|
||
return entry
|
||
|
||
def print_stats(self, stats, dry_run):
|
||
"""統計情報の表示"""
|
||
mode = '[DRY RUN] ' if dry_run else ''
|
||
|
||
self.stdout.write(self.style.SUCCESS('\n' + '='*50))
|
||
self.stdout.write(self.style.SUCCESS(f'{mode}インポート結果'))
|
||
self.stdout.write(self.style.SUCCESS('='*50))
|
||
self.stdout.write(f'処理行数: {stats["total_rows"]}')
|
||
self.stdout.write(f'ユーザー新規作成: {stats["users_created"]}')
|
||
self.stdout.write(f'ユーザー既存利用: {stats["users_existing"]}')
|
||
self.stdout.write(f'チーム作成: {stats["teams_created"]}')
|
||
self.stdout.write(f'メンバー作成: {stats["members_created"]}')
|
||
self.stdout.write(f'エントリー作成: {stats["entries_created"]}')
|
||
|
||
if stats['errors']:
|
||
self.stdout.write(self.style.ERROR(f'\nエラー数: {len(stats["errors"])}'))
|
||
for error in stats['errors']:
|
||
self.stdout.write(self.style.ERROR(f' {error}'))
|
||
|
||
if dry_run:
|
||
self.stdout.write(self.style.WARNING('\n※ ドライランのため、実際のデータは作成されていません'))
|
||
else:
|
||
self.stdout.write(self.style.SUCCESS('\nインポート完了!'))
|