""" CSVファイルからチームエントリーをインポートするDjango管理コマンド CPLIST/input/teams2025.csv 本番用対応 Usage: docker compose exec app python manage.py import_teams --event_code=岐阜ロゲイニング2025 --csv_file=CPLIST/input/teams2025.csv Author: システム開発チーム Date: 2025-09-05 """ import csv import os import logging import re from datetime import datetime, timedelta from django.core.management.base import BaseCommand, CommandError from django.db import transaction, models from django.contrib.auth.hashers import make_password from django.utils import timezone from django.core.exceptions import ValidationError from rog.models import ( CustomUser, Team, Member, Entry, NewEvent2, NewCategory ) logger = logging.getLogger(__name__) class Command(BaseCommand): help = 'CSVファイルからチームエントリーをインポート' def add_arguments(self, parser): parser.add_argument( '--event_code', type=str, required=True, help='インポート先のイベントコード' ) parser.add_argument( '--csv_file', type=str, default='CPLIST/input/teams2025.csv', help='インポートするCSVファイルのパス (default: CPLIST/input/teams2025.csv)' ) parser.add_argument( '--dry_run', action='store_true', help='実際にはデータベースに書き込まずに処理を確認' ) def handle(self, *args, **options): event_code = options['event_code'] csv_file = options['csv_file'] dry_run = options['dry_run'] self.stdout.write( self.style.SUCCESS(f'CSVインポート開始: event_code={event_code}, csv_file={csv_file}, dry_run={dry_run}') ) # イベントの存在確認 try: event = NewEvent2.objects.get(event_name=event_code) self.stdout.write(f'イベント見つかりました: {event.event_name} (ID: {event.id})') except NewEvent2.DoesNotExist: raise CommandError(f'イベントが見つかりません: {event_code}') # CSVファイルの存在確認 if not os.path.exists(csv_file): raise CommandError(f'CSVファイルが見つかりません: {csv_file}') # 統計情報 stats = { 'total_rows': 0, 'users_created': 0, 'users_existing': 0, 'teams_created': 0, 'members_created': 0, 'entries_created': 0, 'errors': [] } try: with open(csv_file, 'r', encoding='utf-8-sig') as f: # BOM対応のためutf-8-sigを使用 reader = csv.DictReader(f) for row_num, row in enumerate(reader, start=2): # ヘッダー行の次から開始 stats['total_rows'] += 1 try: if dry_run: self.process_row_dry_run(row, event, stats, row_num) else: with transaction.atomic(): self.process_row(row, event, stats, row_num) except Exception as e: error_msg = f'行 {row_num}: {str(e)}' stats['errors'].append(error_msg) self.stdout.write(self.style.ERROR(error_msg)) continue except Exception as e: raise CommandError(f'CSVファイル読み込みエラー: {str(e)}') # 結果レポート self.print_stats(stats, dry_run) # CSV出力 if not dry_run: self.export_results_to_csv(event, options['csv_file']) def export_results_to_csv(self, event, input_csv_file): """インポート結果をCSVファイルに出力""" # 出力ファイル名を生成 input_dir = os.path.dirname(input_csv_file) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = os.path.join(input_dir, f'import_results_{event.event_code}_{timestamp}.csv') try: with open(output_file, 'w', newline='', encoding='utf-8-sig') as csvfile: fieldnames = [ 'チーム名', 'ゼッケン番号', 'カテゴリー', '時間', 'オーナーメール', 'リーダー', 'メンバー数', 'メンバー一覧', '参加登録状況', 'エントリーID', '作成日時' ] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() # イベントのエントリーを取得 entries = Entry.objects.filter(event=event).select_related( 'team', 'category', 'owner' ).prefetch_related('team__members__user') for entry in entries: # メンバー一覧を作成 member_list = [] for member in entry.team.members.all(): member_info = f"{member.user.firstname}" if member.user.date_of_birth: member_info += f"({member.user.date_of_birth})" member_list.append(member_info) # リーダー情報を取得(チームオーナー) leader_name = "" if entry.team.owner: leader_name = f"{entry.team.owner.firstname}" if entry.team.owner.date_of_birth: leader_name += f"({entry.team.owner.date_of_birth})" writer.writerow({ 'チーム名': entry.team.team_name, 'ゼッケン番号': entry.zekken_number, 'カテゴリー': entry.category.category_name if entry.category else '', '時間': f"{entry.category.duration.total_seconds() // 3600}時間" if entry.category else '', 'オーナーメール': entry.owner.email, 'リーダー': leader_name, 'メンバー数': entry.team.members.count(), 'メンバー一覧': '; '.join(member_list), '参加登録状況': '完了', 'エントリーID': entry.id, '作成日時': entry.date.strftime('%Y-%m-%d %H:%M:%S') }) self.stdout.write( self.style.SUCCESS(f'インポート結果をCSVに出力しました: {output_file}') ) except Exception as e: self.stdout.write( self.style.ERROR(f'CSV出力エラー: {str(e)}') ) def process_row(self, row, event, stats, row_num): """CSVの1行を処理(実際にデータベースに書き込み)""" team_name = row.get('チーム名', '').strip() self.stdout.write(f'行 {row_num} 処理中: チーム={team_name}') # 2-1. カスタムユーザー登録 user = self.get_or_create_user(row, stats) # 2-2. チーム登録 team = self.create_team(row, user, event, stats) # メンバー登録(最大7名) members = self.create_members(row, team, stats) # 2-3. エントリー登録 entry = self.create_entry(row, team, event, stats) self.stdout.write(self.style.SUCCESS(f'行 {row_num} 完了: チーム={team.team_name}')) def process_row_dry_run(self, row, event, stats, row_num): """CSVの1行を処理(ドライラン - データベースには書き込まない)""" team_name = row.get('チーム名', '').strip() self.stdout.write(f'[DRY RUN] 行 {row_num}: チーム={team_name}') # ユーザーの存在確認のみ email = row.get('メール', '').strip() password = row.get('パスワード', '').strip() if email: existing_user = CustomUser.objects.filter(email=email).first() if existing_user: self.stdout.write(f' ユーザー既存: {email} パスワード:既存') stats['users_existing'] += 1 else: display_password = password if password else 'defaultpassword123' self.stdout.write(f' ユーザー新規作成予定: {email} パスワード:{display_password}') stats['users_created'] += 1 stats['teams_created'] += 1 stats['entries_created'] += 1 # エントリー情報の表示 category_name = row.get('部門', '').strip() duration_str = row.get('時間', '').strip() # 予想されるゼッケン番号を計算(ドライラン用) max_zekken = Entry.objects.filter(event=event).aggregate( max_zekken=models.Max('zekken_number') )['max_zekken'] or 0 predicted_zekken = max_zekken + stats['entries_created'] self.stdout.write(f' エントリー: ゼッケン{predicted_zekken}, カテゴリー:{category_name}, 時間:{duration_str}時間') # 参加登録状況の確認 existing_entry = Entry.objects.filter(team__team_name=team_name, event=event).first() if existing_entry: self.stdout.write(f' 参加登録: 済み(既存エントリーID: {existing_entry.id})') else: self.stdout.write(f' 参加登録: 新規作成予定') # メンバー数カウント member_count = 0 member_names = [] for i in range(1, 8): # 最大7名 # 全角数字を使用 name_key = f'氏名{chr(0xFF10 + i)}' # 全角数字123...を生成 name = row.get(name_key, '').strip() if name: member_count += 1 birthday_key = f'誕生日{chr(0xFF10 + i)}' birthday = row.get(birthday_key, '').strip() member_names.append(f'{name}({birthday})') stats['members_created'] += member_count self.stdout.write(f' メンバー: {member_count}名 [{", ".join(member_names)}]') def get_or_create_user(self, row, stats): """カスタムユーザーの取得または作成""" email = row.get('メール', '').strip() password = row.get('パスワード', '').strip() phone = row.get('電話番号', '').strip() if not email: raise ValueError('メールアドレスが必要です') # 特殊文字のクリーンアップ email = email.replace('@', '@') phone = self.clean_phone_number(phone) # 既存ユーザーの検索 user = CustomUser.objects.filter(email=email).first() if user: self.stdout.write(f' 既存ユーザー: {email}') stats['users_existing'] += 1 return user # 新規ユーザー作成 user = CustomUser.objects.create_user( email=email, password=password if password else 'defaultpassword123', is_active=True ) self.stdout.write(f' 新規ユーザー作成: {email}') stats['users_created'] += 1 return user def clean_phone_number(self, phone): """電話番号のクリーンアップ""" if not phone: return '' # 全角文字を半角に変換 phone = phone.replace('-', '-').replace('÷', '-') # 余分な文字を削除 phone = re.sub(r'[^\d\-]', '', phone) return phone def create_team(self, row, user, event, stats): """チームの作成""" team_name = row.get('チーム名', '').strip() category_name = row.get('部門', '').strip() if not team_name: raise ValueError('チーム名が必要です') # チームの重複チェック(同一ユーザー・同一チーム名) existing_team = Team.objects.filter(team_name=team_name, owner=user).first() if existing_team: self.stdout.write(f' 既存チーム使用: {team_name}') return existing_team team = Team.objects.create( team_name=team_name, owner=user, class_name=category_name, event=event, # イベントを設定 created_at=timezone.now() ) self.stdout.write(f' 新規チーム作成: {team_name}') stats['teams_created'] += 1 return team def create_members(self, row, team, stats): """メンバーの作成(最大7名)""" members = [] for i in range(1, 8): # 最大7名 # 全角数字を使用 name_key = f'氏名{chr(0xFF10 + i)}' birthday_key = f'誕生日{chr(0xFF10 + i)}' name = row.get(name_key, '').strip() birthday_str = row.get(birthday_key, '').strip() if not name: continue # 誕生日の解析 birthday = self.parse_birthday(birthday_str) # ダミーメールアドレス生成 dummy_email = f"{team.team_name.replace(' ', '_').replace('!', '').replace('?', '').lower()}_member_{i}@dummy.local" # 既存のダミーユーザーチェック existing_dummy = CustomUser.objects.filter(email=dummy_email).first() if existing_dummy: dummy_user = existing_dummy else: # ダミーユーザー作成 dummy_user = CustomUser.objects.create_user( email=dummy_email, password='dummypassword123', firstname=name, date_of_birth=birthday, is_active=True ) # 既存メンバーチェック existing_member = Member.objects.filter(team=team, user=dummy_user).first() if existing_member: member = existing_member else: # メンバー作成(1番目の人がリーダー) member = Member.objects.create( team=team, user=dummy_user, firstname=name, date_of_birth=birthday ) # 1番目の人をチームのオーナーとして設定(リーダーの概念) if i == 1 and team.owner != dummy_user: team.owner = dummy_user team.save() stats['members_created'] += 1 members.append(member) self.stdout.write(f' メンバー{i}作成: {name}') return members def parse_birthday(self, birthday_str): """誕生日文字列の解析""" if not birthday_str or birthday_str in ['不明', '']: return None # 年齢のみの場合(例:71歳) if '歳' in birthday_str: try: age = int(birthday_str.replace('歳', '')) # 現在年から年齢を引いて生年を推定 birth_year = datetime.now().year - age return datetime(birth_year, 1, 1).date() except ValueError: return None # 日付形式の解析 try: # YYYY/MM/DD形式 if '/' in birthday_str: return datetime.strptime(birthday_str, '%Y/%m/%d').date() # YYYYMMDD形式 elif len(birthday_str) == 8 and birthday_str.isdigit(): return datetime.strptime(birthday_str, '%Y%m%d').date() # YYYY-MM-DD形式 elif '-' in birthday_str: return datetime.strptime(birthday_str, '%Y-%m-%d').date() except ValueError: pass self.stdout.write(self.style.WARNING(f' 誕生日形式エラー: {birthday_str}')) return None def create_entry(self, row, team, event, stats): """エントリーの作成""" category_name = row.get('部門', '').strip() duration_str = row.get('時間', '').strip() # メンバー数を取得 member_count = team.members.count() # カテゴリーの取得または作成 category = None if category_name and duration_str: # 既存のカテゴリーから最適なものを選択 duration_hours = int(duration_str) if duration_str.isdigit() else 3 # お試し部門の判定 is_trial = 'お試し' in category_name if is_trial: # お試しカテゴリーを検索(時間が一致するもの) trial_categories = NewCategory.objects.filter( trial=True, duration__exact=timedelta(hours=duration_hours) ).order_by('-num_of_member') # メンバー数が多い順 # メンバー数に適したカテゴリーを選択(お試しは1名でも可) for cat in trial_categories: if cat.num_of_member >= member_count: category = cat break if not category and trial_categories.exists(): # 適切なサイズがない場合は最大のお試しカテゴリーを使用 category = trial_categories.first() else: # 1. 完全一致するカテゴリーを探す full_category_name = f"{category_name}-{duration_hours}時間" category = NewCategory.objects.filter(category_name=full_category_name).first() if not category: # 2. 部分一致するカテゴリーを探す(時間が一致するもの) categories = NewCategory.objects.filter( category_name__icontains=category_name, duration__exact=timedelta(hours=duration_hours), trial=False # お試しではないもの ).order_by('-num_of_member') # メンバー数が多い順 # メンバー数に適したカテゴリーを選択 for cat in categories: if cat.num_of_member >= member_count: category = cat break if not category: # 3. どれも見つからない場合は新規作成(メンバー数を考慮) if is_trial: # お試しの場合は1名から最大7名まで許可 max_members = max(7, member_count) else: # 通常カテゴリーは既存の制限に従う max_members = max(7, member_count) full_category_name = f"{category_name}-{duration_hours}時間" category = NewCategory.objects.create( category_name=full_category_name, duration=timedelta(hours=duration_hours), num_of_member=max_members, trial=is_trial, # お試し判定を反映 family='ファミリー' in category_name, # ファミリー判定 category_number=0 ) trial_text = f", お試し={is_trial}" if is_trial else "" self.stdout.write(f' 新規カテゴリー作成: {full_category_name} (最大{max_members}名{trial_text})') else: trial_text = ", お試し" if category.trial else "" self.stdout.write(f' 使用カテゴリー: {category.category_name} (最大{category.num_of_member}名{trial_text})') # 重複エントリーチェック existing_entry = Entry.objects.filter(team=team, event=event).first() if existing_entry: self.stdout.write(f' 既存エントリー使用: {team.team_name}') return existing_entry # 最大ゼッケン番号を取得 max_zekken = Entry.objects.filter(event=event).aggregate( max_zekken=models.Max('zekken_number') )['max_zekken'] or 0 entry = Entry.objects.create( team=team, event=event, category=category, owner=team.owner, zekken_number=max_zekken + 1, date=timezone.now(), is_active=True ) self.stdout.write(f' エントリー作成: ゼッケン{entry.zekken_number}') stats['entries_created'] += 1 return entry # 重複エントリーチェック existing_entry = Entry.objects.filter(team=team, event=event).first() if existing_entry: self.stdout.write(f' 既存エントリー使用: {team.team_name}') return existing_entry # 最大ゼッケン番号を取得 max_zekken = Entry.objects.filter(event=event).aggregate( max_zekken=models.Max('zekken_number') )['max_zekken'] or 0 entry = Entry.objects.create( team=team, event=event, category=category, owner=team.owner, zekken_number=max_zekken + 1, date=timezone.now(), is_active=True ) self.stdout.write(f' エントリー作成: ゼッケン{entry.zekken_number}') stats['entries_created'] += 1 return entry def print_stats(self, stats, dry_run): """統計情報の表示""" mode = '[DRY RUN] ' if dry_run else '' self.stdout.write(self.style.SUCCESS('\n' + '='*50)) self.stdout.write(self.style.SUCCESS(f'{mode}インポート結果')) self.stdout.write(self.style.SUCCESS('='*50)) self.stdout.write(f'処理行数: {stats["total_rows"]}') self.stdout.write(f'ユーザー新規作成: {stats["users_created"]}') self.stdout.write(f'ユーザー既存利用: {stats["users_existing"]}') self.stdout.write(f'チーム作成: {stats["teams_created"]}') self.stdout.write(f'メンバー作成: {stats["members_created"]}') self.stdout.write(f'エントリー作成: {stats["entries_created"]}') if stats['errors']: self.stdout.write(self.style.ERROR(f'\nエラー数: {len(stats["errors"])}')) for error in stats['errors']: self.stdout.write(self.style.ERROR(f' {error}')) if dry_run: self.stdout.write(self.style.WARNING('\n※ ドライランのため、実際のデータは作成されていません')) else: self.stdout.write(self.style.SUCCESS('\nインポート完了!'))