Files
rogaining_srv/rog/management/commands/import_teams.py
2025-09-05 17:29:11 +09:00

556 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
CSVファイルからチームエントリーをインポートするDjango管理コマンド
CPLIST/input/teams2025.csv 本番用対応
Usage:
docker compose exec app python manage.py import_teams --event_code=岐阜ロゲイニング2025 --csv_file=CPLIST/input/teams2025.csv
Author: システム開発チーム
Date: 2025-09-05
"""
import csv
import os
import logging
import re
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, models
from django.contrib.auth.hashers import make_password
from django.utils import timezone
from django.core.exceptions import ValidationError
from rog.models import (
CustomUser, Team, Member, Entry, NewEvent2, NewCategory
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'CSVファイルからチームエントリーをインポート'
def add_arguments(self, parser):
parser.add_argument(
'--event_code',
type=str,
required=True,
help='インポート先のイベントコード'
)
parser.add_argument(
'--csv_file',
type=str,
default='CPLIST/input/teams2025.csv',
help='インポートするCSVファイルのパス (default: CPLIST/input/teams2025.csv)'
)
parser.add_argument(
'--dry_run',
action='store_true',
help='実際にはデータベースに書き込まずに処理を確認'
)
def handle(self, *args, **options):
event_code = options['event_code']
csv_file = options['csv_file']
dry_run = options['dry_run']
self.stdout.write(
self.style.SUCCESS(f'CSVインポート開始: event_code={event_code}, csv_file={csv_file}, dry_run={dry_run}')
)
# イベントの存在確認
try:
event = NewEvent2.objects.get(event_name=event_code)
self.stdout.write(f'イベント見つかりました: {event.event_name} (ID: {event.id})')
except NewEvent2.DoesNotExist:
raise CommandError(f'イベントが見つかりません: {event_code}')
# CSVファイルの存在確認
if not os.path.exists(csv_file):
raise CommandError(f'CSVファイルが見つかりません: {csv_file}')
# 統計情報
stats = {
'total_rows': 0,
'users_created': 0,
'users_existing': 0,
'teams_created': 0,
'members_created': 0,
'entries_created': 0,
'errors': []
}
try:
with open(csv_file, 'r', encoding='utf-8-sig') as f: # BOM対応のためutf-8-sigを使用
reader = csv.DictReader(f)
for row_num, row in enumerate(reader, start=2): # ヘッダー行の次から開始
stats['total_rows'] += 1
try:
if dry_run:
self.process_row_dry_run(row, event, stats, row_num)
else:
with transaction.atomic():
self.process_row(row, event, stats, row_num)
except Exception as e:
error_msg = f'{row_num}: {str(e)}'
stats['errors'].append(error_msg)
self.stdout.write(self.style.ERROR(error_msg))
continue
except Exception as e:
raise CommandError(f'CSVファイル読み込みエラー: {str(e)}')
# 結果レポート
self.print_stats(stats, dry_run)
# CSV出力
if not dry_run:
self.export_results_to_csv(event, options['csv_file'])
def export_results_to_csv(self, event, input_csv_file):
"""インポート結果をCSVファイルに出力"""
# 出力ファイル名を生成
input_dir = os.path.dirname(input_csv_file)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = os.path.join(input_dir, f'import_results_{event.event_code}_{timestamp}.csv')
try:
with open(output_file, 'w', newline='', encoding='utf-8-sig') as csvfile:
fieldnames = [
'チーム名', 'ゼッケン番号', 'カテゴリー', '時間',
'オーナーメール', 'リーダー', 'メンバー数', 'メンバー一覧',
'参加登録状況', 'エントリーID', '作成日時'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
# イベントのエントリーを取得
entries = Entry.objects.filter(event=event).select_related(
'team', 'category', 'owner'
).prefetch_related('team__members__user')
for entry in entries:
# メンバー一覧を作成
member_list = []
for member in entry.team.members.all():
member_info = f"{member.user.firstname}"
if member.user.date_of_birth:
member_info += f"({member.user.date_of_birth})"
member_list.append(member_info)
# リーダー情報を取得(チームオーナー)
leader_name = ""
if entry.team.owner:
leader_name = f"{entry.team.owner.firstname}"
if entry.team.owner.date_of_birth:
leader_name += f"({entry.team.owner.date_of_birth})"
writer.writerow({
'チーム名': entry.team.team_name,
'ゼッケン番号': entry.zekken_number,
'カテゴリー': entry.category.category_name if entry.category else '',
'時間': f"{entry.category.duration.total_seconds() // 3600}時間" if entry.category else '',
'オーナーメール': entry.owner.email,
'リーダー': leader_name,
'メンバー数': entry.team.members.count(),
'メンバー一覧': '; '.join(member_list),
'参加登録状況': '完了',
'エントリーID': entry.id,
'作成日時': entry.date.strftime('%Y-%m-%d %H:%M:%S')
})
self.stdout.write(
self.style.SUCCESS(f'インポート結果をCSVに出力しました: {output_file}')
)
except Exception as e:
self.stdout.write(
self.style.ERROR(f'CSV出力エラー: {str(e)}')
)
def process_row(self, row, event, stats, row_num):
"""CSVの1行を処理実際にデータベースに書き込み"""
team_name = row.get('チーム名', '').strip()
self.stdout.write(f'{row_num} 処理中: チーム={team_name}')
# 2-1. カスタムユーザー登録
user = self.get_or_create_user(row, stats)
# 2-2. チーム登録
team = self.create_team(row, user, event, stats)
# メンバー登録最大7名
members = self.create_members(row, team, stats)
# 2-3. エントリー登録
entry = self.create_entry(row, team, event, stats)
self.stdout.write(self.style.SUCCESS(f'{row_num} 完了: チーム={team.team_name}'))
def process_row_dry_run(self, row, event, stats, row_num):
"""CSVの1行を処理ドライラン - データベースには書き込まない)"""
team_name = row.get('チーム名', '').strip()
self.stdout.write(f'[DRY RUN] 行 {row_num}: チーム={team_name}')
# ユーザーの存在確認のみ
email = row.get('メール', '').strip()
password = row.get('パスワード', '').strip()
if email:
existing_user = CustomUser.objects.filter(email=email).first()
if existing_user:
self.stdout.write(f' ユーザー既存: {email} パスワード:既存')
stats['users_existing'] += 1
else:
display_password = password if password else 'defaultpassword123'
self.stdout.write(f' ユーザー新規作成予定: {email} パスワード:{display_password}')
stats['users_created'] += 1
stats['teams_created'] += 1
stats['entries_created'] += 1
# エントリー情報の表示
category_name = row.get('部門', '').strip()
duration_str = row.get('時間', '').strip()
# 予想されるゼッケン番号を計算(ドライラン用)
max_zekken = Entry.objects.filter(event=event).aggregate(
max_zekken=models.Max('zekken_number')
)['max_zekken'] or 0
predicted_zekken = max_zekken + stats['entries_created']
self.stdout.write(f' エントリー: ゼッケン{predicted_zekken}, カテゴリー:{category_name}, 時間:{duration_str}時間')
# 参加登録状況の確認
existing_entry = Entry.objects.filter(team__team_name=team_name, event=event).first()
if existing_entry:
self.stdout.write(f' 参加登録: 済み既存エントリーID: {existing_entry.id}')
else:
self.stdout.write(f' 参加登録: 新規作成予定')
# メンバー数カウント
member_count = 0
member_names = []
for i in range(1, 8): # 最大7名
# 全角数字を使用
name_key = f'氏名{chr(0xFF10 + i)}' # 全角数字123...を生成
name = row.get(name_key, '').strip()
if name:
member_count += 1
birthday_key = f'誕生日{chr(0xFF10 + i)}'
birthday = row.get(birthday_key, '').strip()
member_names.append(f'{name}({birthday})')
stats['members_created'] += member_count
self.stdout.write(f' メンバー: {member_count}名 [{", ".join(member_names)}]')
def get_or_create_user(self, row, stats):
"""カスタムユーザーの取得または作成"""
email = row.get('メール', '').strip()
password = row.get('パスワード', '').strip()
phone = row.get('電話番号', '').strip()
if not email:
raise ValueError('メールアドレスが必要です')
# 特殊文字のクリーンアップ
email = email.replace('', '@')
phone = self.clean_phone_number(phone)
# 既存ユーザーの検索
user = CustomUser.objects.filter(email=email).first()
if user:
self.stdout.write(f' 既存ユーザー: {email}')
stats['users_existing'] += 1
return user
# 新規ユーザー作成
user = CustomUser.objects.create_user(
email=email,
password=password if password else 'defaultpassword123',
is_active=True
)
self.stdout.write(f' 新規ユーザー作成: {email}')
stats['users_created'] += 1
return user
def clean_phone_number(self, phone):
"""電話番号のクリーンアップ"""
if not phone:
return ''
# 全角文字を半角に変換
phone = phone.replace('', '-').replace('÷', '-')
# 余分な文字を削除
phone = re.sub(r'[^\d\-]', '', phone)
return phone
def create_team(self, row, user, event, stats):
"""チームの作成"""
team_name = row.get('チーム名', '').strip()
category_name = row.get('部門', '').strip()
if not team_name:
raise ValueError('チーム名が必要です')
# チームの重複チェック(同一ユーザー・同一チーム名)
existing_team = Team.objects.filter(team_name=team_name, owner=user).first()
if existing_team:
self.stdout.write(f' 既存チーム使用: {team_name}')
return existing_team
team = Team.objects.create(
team_name=team_name,
owner=user,
class_name=category_name,
event=event, # イベントを設定
created_at=timezone.now()
)
self.stdout.write(f' 新規チーム作成: {team_name}')
stats['teams_created'] += 1
return team
def create_members(self, row, team, stats):
"""メンバーの作成最大7名"""
members = []
for i in range(1, 8): # 最大7名
# 全角数字を使用
name_key = f'氏名{chr(0xFF10 + i)}'
birthday_key = f'誕生日{chr(0xFF10 + i)}'
name = row.get(name_key, '').strip()
birthday_str = row.get(birthday_key, '').strip()
if not name:
continue
# 誕生日の解析
birthday = self.parse_birthday(birthday_str)
# ダミーメールアドレス生成
dummy_email = f"{team.team_name.replace(' ', '_').replace('!', '').replace('?', '').lower()}_member_{i}@dummy.local"
# 既存のダミーユーザーチェック
existing_dummy = CustomUser.objects.filter(email=dummy_email).first()
if existing_dummy:
dummy_user = existing_dummy
else:
# ダミーユーザー作成
dummy_user = CustomUser.objects.create_user(
email=dummy_email,
password='dummypassword123',
firstname=name,
date_of_birth=birthday,
is_active=True
)
# 既存メンバーチェック
existing_member = Member.objects.filter(team=team, user=dummy_user).first()
if existing_member:
member = existing_member
else:
# メンバー作成1番目の人がリーダー
member = Member.objects.create(
team=team,
user=dummy_user,
firstname=name,
date_of_birth=birthday
)
# 1番目の人をチームのオーナーとして設定リーダーの概念
if i == 1 and team.owner != dummy_user:
team.owner = dummy_user
team.save()
stats['members_created'] += 1
members.append(member)
self.stdout.write(f' メンバー{i}作成: {name}')
return members
def parse_birthday(self, birthday_str):
"""誕生日文字列の解析"""
if not birthday_str or birthday_str in ['不明', '']:
return None
# 年齢のみの場合71歳
if '' in birthday_str:
try:
age = int(birthday_str.replace('', ''))
# 現在年から年齢を引いて生年を推定
birth_year = datetime.now().year - age
return datetime(birth_year, 1, 1).date()
except ValueError:
return None
# 日付形式の解析
try:
# YYYY/MM/DD形式
if '/' in birthday_str:
return datetime.strptime(birthday_str, '%Y/%m/%d').date()
# YYYYMMDD形式
elif len(birthday_str) == 8 and birthday_str.isdigit():
return datetime.strptime(birthday_str, '%Y%m%d').date()
# YYYY-MM-DD形式
elif '-' in birthday_str:
return datetime.strptime(birthday_str, '%Y-%m-%d').date()
except ValueError:
pass
self.stdout.write(self.style.WARNING(f' 誕生日形式エラー: {birthday_str}'))
return None
def create_entry(self, row, team, event, stats):
"""エントリーの作成"""
category_name = row.get('部門', '').strip()
duration_str = row.get('時間', '').strip()
# メンバー数を取得
member_count = team.members.count()
# カテゴリーの取得または作成
category = None
if category_name and duration_str:
# 既存のカテゴリーから最適なものを選択
duration_hours = int(duration_str) if duration_str.isdigit() else 3
# お試し部門の判定
is_trial = 'お試し' in category_name
if is_trial:
# お試しカテゴリーを検索(時間が一致するもの)
trial_categories = NewCategory.objects.filter(
trial=True,
duration__exact=timedelta(hours=duration_hours)
).order_by('-num_of_member') # メンバー数が多い順
# メンバー数に適したカテゴリーを選択お試しは1名でも可
for cat in trial_categories:
if cat.num_of_member >= member_count:
category = cat
break
if not category and trial_categories.exists():
# 適切なサイズがない場合は最大のお試しカテゴリーを使用
category = trial_categories.first()
else:
# 1. 完全一致するカテゴリーを探す
full_category_name = f"{category_name}-{duration_hours}時間"
category = NewCategory.objects.filter(category_name=full_category_name).first()
if not category:
# 2. 部分一致するカテゴリーを探す(時間が一致するもの)
categories = NewCategory.objects.filter(
category_name__icontains=category_name,
duration__exact=timedelta(hours=duration_hours),
trial=False # お試しではないもの
).order_by('-num_of_member') # メンバー数が多い順
# メンバー数に適したカテゴリーを選択
for cat in categories:
if cat.num_of_member >= member_count:
category = cat
break
if not category:
# 3. どれも見つからない場合は新規作成(メンバー数を考慮)
if is_trial:
# お試しの場合は1名から最大7名まで許可
max_members = max(7, member_count)
else:
# 通常カテゴリーは既存の制限に従う
max_members = max(7, member_count)
full_category_name = f"{category_name}-{duration_hours}時間"
category = NewCategory.objects.create(
category_name=full_category_name,
duration=timedelta(hours=duration_hours),
num_of_member=max_members,
trial=is_trial, # お試し判定を反映
family='ファミリー' in category_name, # ファミリー判定
category_number=0
)
trial_text = f", お試し={is_trial}" if is_trial else ""
self.stdout.write(f' 新規カテゴリー作成: {full_category_name} (最大{max_members}{trial_text})')
else:
trial_text = ", お試し" if category.trial else ""
self.stdout.write(f' 使用カテゴリー: {category.category_name} (最大{category.num_of_member}{trial_text})')
# 重複エントリーチェック
existing_entry = Entry.objects.filter(team=team, event=event).first()
if existing_entry:
self.stdout.write(f' 既存エントリー使用: {team.team_name}')
return existing_entry
# 最大ゼッケン番号を取得
max_zekken = Entry.objects.filter(event=event).aggregate(
max_zekken=models.Max('zekken_number')
)['max_zekken'] or 0
entry = Entry.objects.create(
team=team,
event=event,
category=category,
owner=team.owner,
zekken_number=max_zekken + 1,
date=timezone.now(),
is_active=True
)
self.stdout.write(f' エントリー作成: ゼッケン{entry.zekken_number}')
stats['entries_created'] += 1
return entry
# 重複エントリーチェック
existing_entry = Entry.objects.filter(team=team, event=event).first()
if existing_entry:
self.stdout.write(f' 既存エントリー使用: {team.team_name}')
return existing_entry
# 最大ゼッケン番号を取得
max_zekken = Entry.objects.filter(event=event).aggregate(
max_zekken=models.Max('zekken_number')
)['max_zekken'] or 0
entry = Entry.objects.create(
team=team,
event=event,
category=category,
owner=team.owner,
zekken_number=max_zekken + 1,
date=timezone.now(),
is_active=True
)
self.stdout.write(f' エントリー作成: ゼッケン{entry.zekken_number}')
stats['entries_created'] += 1
return entry
def print_stats(self, stats, dry_run):
"""統計情報の表示"""
mode = '[DRY RUN] ' if dry_run else ''
self.stdout.write(self.style.SUCCESS('\n' + '='*50))
self.stdout.write(self.style.SUCCESS(f'{mode}インポート結果'))
self.stdout.write(self.style.SUCCESS('='*50))
self.stdout.write(f'処理行数: {stats["total_rows"]}')
self.stdout.write(f'ユーザー新規作成: {stats["users_created"]}')
self.stdout.write(f'ユーザー既存利用: {stats["users_existing"]}')
self.stdout.write(f'チーム作成: {stats["teams_created"]}')
self.stdout.write(f'メンバー作成: {stats["members_created"]}')
self.stdout.write(f'エントリー作成: {stats["entries_created"]}')
if stats['errors']:
self.stdout.write(self.style.ERROR(f'\nエラー数: {len(stats["errors"])}'))
for error in stats['errors']:
self.stdout.write(self.style.ERROR(f' {error}'))
if dry_run:
self.stdout.write(self.style.WARNING('\n※ ドライランのため、実際のデータは作成されていません'))
else:
self.stdout.write(self.style.SUCCESS('\nインポート完了!'))