""" Django管理コマンド: CSVファイルからチーム情報をデータベースに登録 使用方法: docker-compose exec app python manage.py register_teams_from_csv --event_code TEST2025 --dry_run """ from django.core.management.base import BaseCommand, CommandError from django.db import transaction from django.contrib.auth import get_user_model from django.utils import timezone from datetime import datetime, date, timedelta import csv import os from rog.models import ( CustomUser, NewEvent2, NewCategory, Team, Member, Entry, EntryMember ) User = get_user_model() class Command(BaseCommand): help = 'CSVファイルからチーム情報をデータベースに登録' def add_arguments(self, parser): parser.add_argument( '--event_code', type=str, required=True, help='イベントコード' ) parser.add_argument( '--csv_file', type=str, default='CPLIST/input/teams2025.csv', help='CSVファイルパス (デフォルト: CPLIST/input/teams2025.csv)' ) parser.add_argument( '--dry_run', action='store_true', help='ドライランモード(実際のDB更新を行わない)' ) def handle(self, *args, **options): event_code = options['event_code'] csv_file = options['csv_file'] dry_run = options['dry_run'] # CSVファイルの存在確認 if not os.path.exists(csv_file): raise CommandError(f'CSVファイルが見つかりません: {csv_file}') if dry_run: self.stdout.write( self.style.WARNING('DRY RUN MODE: データベースの変更は行いません') ) try: processor = TeamRegistrationProcessor( event_code=event_code, dry_run=dry_run, stdout=self.stdout, style=self.style ) processor.initialize() processor.process_csv_file(csv_file) processor.print_stats() if dry_run: self.stdout.write( self.style.SUCCESS(f'DRY RUN 完了: イベント {event_code}') ) else: self.stdout.write( self.style.SUCCESS(f'処理完了: イベント {event_code}') ) except Exception as e: raise CommandError(f'処理エラー: {str(e)}') class TeamRegistrationProcessor: def __init__(self, event_code, dry_run=False, stdout=None, style=None): self.event_code = event_code self.dry_run = dry_run self.stdout = stdout self.style = style self.event = None self.categories = {} self.stats = { 'users_created': 0, 'users_updated': 0, 'teams_created': 0, 'members_created': 0, 'entries_created': 0, 'participations_created': 0, 'errors': [] } def log(self, message, level='INFO'): """ログ出力""" if self.stdout: if level == 'ERROR': self.stdout.write(self.style.ERROR(message)) elif level == 'WARNING': self.stdout.write(self.style.WARNING(message)) elif level == 'SUCCESS': self.stdout.write(self.style.SUCCESS(message)) else: self.stdout.write(message) else: print(message) def initialize(self): """イベントとカテゴリの初期化""" if self.dry_run: self.log("DRY RUN MODE: データベースの変更は行いません", 'WARNING') try: self.event = NewEvent2.objects.get(event_code=self.event_code) self.log(f"イベント取得: {self.event.event_name} ({self.event_code})") except NewEvent2.DoesNotExist: if self.dry_run: self.log(f"DRY RUN: Event with code '{self.event_code}' would be searched") # ダミーイベントオブジェクトを作成 class DummyEvent: def __init__(self, event_code): self.event_name = f"Dummy Event for {event_code}" self.event_code = event_code self.event = DummyEvent(self.event_code) return else: raise ValueError(f"Event with code '{self.event_code}' not found") # カテゴリ情報をプリロード if not self.dry_run: for category in NewCategory.objects.all(): hours = int(category.duration.total_seconds() // 3600) key = (category.category_name, hours) self.categories[key] = category else: # DRY RUNの場合はダミーカテゴリを作成 dummy_categories = [ ('一般', 3), ('一般', 5), ('ファミリー', 3), ('ファミリー', 5), ('男性ソロ', 3), ('男性ソロ', 5), ('女性ソロ', 3), ('女性ソロ', 5) ] for cat_name, hours in dummy_categories: class DummyCategory: def __init__(self, name, hours): self.category_name = name self.category_number = len(self.categories) + 1 self.duration = timedelta(hours=hours) self.num_of_member = 7 self.family = (name == 'ファミリー') self.female = (name == '女性ソロ') self.trial = False self.categories[(cat_name, hours)] = DummyCategory(cat_name, hours) self.log(f"利用可能なカテゴリ: {list(self.categories.keys())}") def parse_csv_row(self, row): """CSV行をパース""" if len(row) < 20: raise ValueError(f"不正な行形式: {len(row)} columns found, expected at least 20") data = { 'department_count': row[0].strip(), 'hours': row[1].strip(), 'department': row[2].strip(), 'team_name': row[3].strip(), 'email': row[4].strip(), 'password': row[5].strip(), 'phone': row[6].strip(), 'members': [] } # メンバー情報を解析(最大7名) for i in range(7): name_idx = 7 + i * 2 birth_idx = 8 + i * 2 if name_idx < len(row) and birth_idx < len(row): name = row[name_idx].strip() if row[name_idx] else None birth_str = row[birth_idx].strip() if row[birth_idx] else None if name and birth_str: try: # 誕生日の解析(複数フォーマット対応) birth_date = None for fmt in ['%Y/%m/%d', '%Y-%m-%d', '%Y/%m/%d ']: try: birth_date = datetime.strptime(birth_str.strip(), fmt).date() break except ValueError: continue if birth_date: data['members'].append({ 'name': name, 'birth_date': birth_date }) else: self.log(f"警告: 誕生日の形式が不正です: {birth_str}", 'WARNING') except Exception as e: self.log(f"警告: メンバー情報の解析エラー: {e}", 'WARNING') return data def get_or_create_category(self, department, hours): """カテゴリを取得または作成""" try: hours_int = int(hours) except ValueError: hours_int = 5 # デフォルト # 既存カテゴリから検索 key = (department, hours_int) if key in self.categories: return self.categories[key] # 一般的なカテゴリ名でマッピング category_mappings = { '一般': 'General', 'ファミリー': 'Family', '男性ソロ': 'Solo Male', '女性ソロ': 'Solo Female', } mapped_name = category_mappings.get(department, department) key_mapped = (mapped_name, hours_int) if key_mapped in self.categories: return self.categories[key_mapped] # 時間だけでマッチング(一般カテゴリとして) for (cat_name, cat_hours), category in self.categories.items(): if cat_hours == hours_int and cat_name in ['General', '一般']: return category # 新しいカテゴリを作成 self.log(f"新しいカテゴリを作成: {department} ({hours_int}時間)") if self.dry_run: self.log(f"DRY RUN: カテゴリ作成 - {department} ({hours_int}時間)") # ダミーカテゴリオブジェクトを作成 class DummyCategory: def __init__(self, processor): self.category_name = department self.category_number = len(processor.categories) + 1 self.duration = timedelta(hours=hours_int) self.num_of_member = 7 self.family = (department == 'ファミリー') self.female = (department == '女性ソロ') self.trial = False category = DummyCategory(self) else: category = NewCategory.objects.create( category_name=department, category_number=len(self.categories) + 1, duration=timedelta(hours=hours_int), num_of_member=7, # 最大7名 family=(department == 'ファミリー'), female=(department == '女性ソロ'), trial=False ) self.categories[key] = category return category def process_user(self, data): """ユーザーの処理(2-1)""" email = data['email'] password = data['password'] team_name = data['team_name'] # ゼッケン番号は部門別数を使用 zekken_number = data['department_count'] if self.dry_run: self.log(f"DRY RUN: ユーザー処理 - {email}") self.log(f" - チーム名: {team_name}") self.log(f" - ゼッケン番号: {zekken_number}") # ダミーユーザーオブジェクトを返す class DummyUser: def __init__(self): self.email = email self.firstname = data['members'][0]['name'] if data['members'] else 'Unknown' self.lastname = '' self.date_of_birth = data['members'][0]['birth_date'] if data['members'] else date.today() self.female = False self.zekken_number = zekken_number self.event_code = self.event_code self.team_name = team_name self.stats['users_created'] += 1 return DummyUser() try: # 既存ユーザーを検索 user = CustomUser.objects.get(email=email) # パスワードとその他の情報を更新 user.set_password(password) user.event_code = self.event_code user.zekken_number = zekken_number user.team_name = team_name user.is_rogaining = True user.save() self.log(f"ユーザー更新: {email}") self.stats['users_updated'] += 1 except CustomUser.DoesNotExist: # 新規ユーザー作成 # メンバー情報から代表者の情報を取得 first_member = data['members'][0] if data['members'] else None user = CustomUser.objects.create( email=email, firstname=first_member['name'] if first_member else 'Unknown', lastname='', date_of_birth=first_member['birth_date'] if first_member else date.today(), female=False, # デフォルト group=data['department'], is_active=True, is_rogaining=True, zekken_number=zekken_number, event_code=self.event_code, team_name=team_name ) user.set_password(password) user.save() self.log(f"ユーザー作成: {email}") self.stats['users_created'] += 1 return user def create_dummy_users_for_members(self, data, main_user): """メンバー用ダミーユーザーを作成""" dummy_users = [] for i, member_data in enumerate(data['members']): # メインユーザーをスキップ if i == 0: dummy_users.append(main_user) continue # ダミーメールアドレス生成 dummy_email = f"dummy_{self.event_code}_{data['department_count']}_{i}@dummy.local" if self.dry_run: self.log(f"DRY RUN: ダミーユーザー作成 - {dummy_email}") self.log(f" - 名前: {member_data['name']}") self.log(f" - 誕生日: {member_data['birth_date']}") # ダミーユーザーオブジェクトを作成 class DummyUser: def __init__(self): self.email = dummy_email self.firstname = member_data['name'] self.lastname = '' self.date_of_birth = member_data['birth_date'] self.female = False self.event_code = self.event_code self.team_name = data['team_name'] dummy_users.append(DummyUser()) continue try: # 既存のダミーユーザーを確認 dummy_user = CustomUser.objects.get(email=dummy_email) except CustomUser.DoesNotExist: # ダミーユーザー作成 dummy_user = CustomUser.objects.create( email=dummy_email, firstname=member_data['name'], lastname='', date_of_birth=member_data['birth_date'], female=False, # 名前から推測するかデフォルト group=data['department'], is_active=False, # ダミーユーザーは非アクティブ is_rogaining=True, event_code=self.event_code, team_name=data['team_name'] ) dummy_user.set_password('dummy123') dummy_user.save() self.log(f"ダミーユーザー作成: {dummy_email}") dummy_users.append(dummy_user) return dummy_users def process_team(self, data, owner, category): """チーム登録(2-2)""" team_name = data['team_name'] zekken_number = data['department_count'] if self.dry_run: self.log(f"DRY RUN: チーム作成 - {team_name}") self.log(f" - ゼッケン番号: {zekken_number}") self.log(f" - カテゴリ: {category.category_name if hasattr(category, 'category_name') else 'Unknown'}") self.log(f" - オーナー: {owner.email}") # ダミーチームオブジェクトを作成 class DummyTeam: def __init__(self, processor): self.team_name = team_name self.zekken_number = zekken_number self.owner = owner self.event = processor.event self.password = data['password'] self.class_name = data['department'] self.stats['teams_created'] += 1 return DummyTeam(self) # 既存チームを確認 try: team = Team.objects.get( team_name=team_name, event=self.event, zekken_number=zekken_number ) self.log(f"既存チーム使用: {team_name}") except Team.DoesNotExist: # 新規チーム作成 team = Team.objects.create( team_name=team_name, owner=owner, category=category, zekken_number=zekken_number, event=self.event, password=data['password'], class_name=data['department'] ) self.log(f"チーム作成: {team_name}") self.stats['teams_created'] += 1 return team def process_members(self, data, team, users): """メンバー登録""" if self.dry_run: self.log(f"DRY RUN: メンバー登録 - {team.team_name}") for user in users: self.log(f" - {user.firstname} ({user.email})") self.stats['members_created'] += 1 return # 既存メンバーを削除(更新の場合) Member.objects.filter(team=team).delete() for user in users: member = Member.objects.create( team=team, user=user, firstname=user.firstname, lastname=user.lastname, date_of_birth=user.date_of_birth, female=user.female, is_temporary=True if hasattr(user, 'email') and user.email.startswith('dummy_') else False ) self.log(f"メンバー追加: {user.firstname} to {team.team_name}") self.stats['members_created'] += 1 def process_entry(self, team, category): """エントリー登録(2-3)""" if self.dry_run: self.log(f"DRY RUN: エントリー作成 - {team.team_name}") self.log(f" - カテゴリ: {category.category_name if hasattr(category, 'category_name') else 'Unknown'}") self.log(f" - ゼッケン番号: {team.zekken_number}") # ダミーエントリーオブジェクトを作成 class DummyEntry: def __init__(self, processor): self.team = team self.event = processor.event self.category = category self.zekken_number = int(team.zekken_number) self.is_active = True self.stats['entries_created'] += 1 return DummyEntry(self) try: entry = Entry.objects.get( team=team, event=self.event, category=category ) self.log(f"既存エントリー使用: {team.team_name}") except Entry.DoesNotExist: entry = Entry.objects.create( team=team, event=self.event, category=category, owner=team.owner, zekken_number=int(team.zekken_number), is_active=True, hasParticipated=False, hasGoaled=False ) self.log(f"エントリー作成: {team.team_name}") self.stats['entries_created'] += 1 return entry def process_participation(self, entry): """イベント参加(2-4)""" if self.dry_run: self.log(f"DRY RUN: 参加登録 - {entry.team.team_name}") # ダミーメンバーリストを作成 for i in range(len(getattr(entry.team, 'dummy_members', [entry.team.owner]))): self.log(f" - Member {i+1}") self.stats['participations_created'] += 1 return # エントリーメンバーを作成 EntryMember.objects.filter(entry=entry).delete() for member in entry.team.members.all(): entry_member = EntryMember.objects.create( entry=entry, member=member, is_temporary=getattr(member, 'is_temporary', False) ) self.log(f"参加登録: {member.user.firstname}") self.stats['participations_created'] += 1 # エントリーを有効化 entry.is_active = True entry.save() def process_csv_file(self, csv_file_path): """CSVファイルを処理""" self.log(f"CSV処理開始: {csv_file_path}") with open(csv_file_path, 'r', encoding='utf-8') as file: # ヘッダーをスキップ csv_reader = csv.reader(file) header = next(csv_reader) self.log(f"CSVヘッダー: {header[:10]}...") # 最初の10列を表示 row_count = 0 for row in csv_reader: row_count += 1 if not any(row): # 空行をスキップ continue try: # DRY RUNの場合はトランザクションを使用しない if self.dry_run: self.log(f"\n--- Row {row_count}: {row[3] if len(row) > 3 else 'Unknown'} ---") # CSV行をパース data = self.parse_csv_row(row) # カテゴリ取得 category = self.get_or_create_category(data['department'], data['hours']) # 2-1. ユーザー処理 main_user = self.process_user(data) # メンバー用ダミーユーザー作成 all_users = self.create_dummy_users_for_members(data, main_user) # 2-2. チーム登録 team = self.process_team(data, main_user, category) # メンバー登録 self.process_members(data, team, all_users) # 2-3. エントリー登録 entry = self.process_entry(team, category) # 2-4. イベント参加 self.process_participation(entry) self.log(f"Row {row_count} 完了: {data['team_name']}") else: with transaction.atomic(): self.log(f"\n--- Row {row_count}: {row[3] if len(row) > 3 else 'Unknown'} ---") # CSV行をパース data = self.parse_csv_row(row) # カテゴリ取得 category = self.get_or_create_category(data['department'], data['hours']) # 2-1. ユーザー処理 main_user = self.process_user(data) # メンバー用ダミーユーザー作成 all_users = self.create_dummy_users_for_members(data, main_user) # 2-2. チーム登録 team = self.process_team(data, main_user, category) # メンバー登録 self.process_members(data, team, all_users) # 2-3. エントリー登録 entry = self.process_entry(team, category) # 2-4. イベント参加 self.process_participation(entry) self.log(f"Row {row_count} 完了: {data['team_name']}") except Exception as e: error_msg = f"Row {row_count} エラー: {str(e)}" self.log(f"エラー: {error_msg}", 'ERROR') self.stats['errors'].append(error_msg) self.log(f"\nCSV処理完了: {row_count} 行処理") def print_stats(self): """統計情報を表示""" self.log("\n=== 処理結果統計 ===", 'SUCCESS') self.log(f"作成されたユーザー: {self.stats['users_created']}") self.log(f"更新されたユーザー: {self.stats['users_updated']}") self.log(f"作成されたチーム: {self.stats['teams_created']}") self.log(f"作成されたメンバー: {self.stats['members_created']}") self.log(f"作成されたエントリー: {self.stats['entries_created']}") self.log(f"作成された参加登録: {self.stats['participations_created']}") self.log(f"エラー数: {len(self.stats['errors'])}") if self.stats['errors']: self.log("\n=== エラー詳細 ===", 'WARNING') for error in self.stats['errors']: self.log(f"- {error}", 'ERROR')