Files
rogaining_srv/rog/management/commands/register_teams_from_csv.py
2025-09-06 04:10:20 +09:00

620 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Django管理コマンド: CSVファイルからチーム情報をデータベースに登録
使用方法:
docker-compose exec app python manage.py register_teams_from_csv --event_code TEST2025 --dry_run
"""
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.contrib.auth import get_user_model
from django.utils import timezone
from datetime import datetime, date, timedelta
import csv
import os
from rog.models import (
CustomUser, NewEvent2, NewCategory, Team, Member, Entry, EntryMember
)
User = get_user_model()
class Command(BaseCommand):
help = 'CSVファイルからチーム情報をデータベースに登録'
def add_arguments(self, parser):
parser.add_argument(
'--event_code',
type=str,
required=True,
help='イベントコード'
)
parser.add_argument(
'--csv_file',
type=str,
default='CPLIST/input/teams2025.csv',
help='CSVファイルパス (デフォルト: CPLIST/input/teams2025.csv)'
)
parser.add_argument(
'--dry_run',
action='store_true',
help='ドライランモード実際のDB更新を行わない'
)
def handle(self, *args, **options):
event_code = options['event_code']
csv_file = options['csv_file']
dry_run = options['dry_run']
# CSVファイルの存在確認
if not os.path.exists(csv_file):
raise CommandError(f'CSVファイルが見つかりません: {csv_file}')
if dry_run:
self.stdout.write(
self.style.WARNING('DRY RUN MODE: データベースの変更は行いません')
)
try:
processor = TeamRegistrationProcessor(
event_code=event_code,
dry_run=dry_run,
stdout=self.stdout,
style=self.style
)
processor.initialize()
processor.process_csv_file(csv_file)
processor.print_stats()
if dry_run:
self.stdout.write(
self.style.SUCCESS(f'DRY RUN 完了: イベント {event_code}')
)
else:
self.stdout.write(
self.style.SUCCESS(f'処理完了: イベント {event_code}')
)
except Exception as e:
raise CommandError(f'処理エラー: {str(e)}')
class TeamRegistrationProcessor:
def __init__(self, event_code, dry_run=False, stdout=None, style=None):
self.event_code = event_code
self.dry_run = dry_run
self.stdout = stdout
self.style = style
self.event = None
self.categories = {}
self.stats = {
'users_created': 0,
'users_updated': 0,
'teams_created': 0,
'members_created': 0,
'entries_created': 0,
'participations_created': 0,
'errors': []
}
def log(self, message, level='INFO'):
"""ログ出力"""
if self.stdout:
if level == 'ERROR':
self.stdout.write(self.style.ERROR(message))
elif level == 'WARNING':
self.stdout.write(self.style.WARNING(message))
elif level == 'SUCCESS':
self.stdout.write(self.style.SUCCESS(message))
else:
self.stdout.write(message)
else:
print(message)
def initialize(self):
"""イベントとカテゴリの初期化"""
if self.dry_run:
self.log("DRY RUN MODE: データベースの変更は行いません", 'WARNING')
try:
self.event = NewEvent2.objects.get(event_code=self.event_code)
self.log(f"イベント取得: {self.event.event_name} ({self.event_code})")
except NewEvent2.DoesNotExist:
if self.dry_run:
self.log(f"DRY RUN: Event with code '{self.event_code}' would be searched")
# ダミーイベントオブジェクトを作成
class DummyEvent:
def __init__(self):
self.event_name = f"Dummy Event for {self.event_code}"
self.event_code = self.event_code
self.event = DummyEvent()
return
else:
raise ValueError(f"Event with code '{self.event_code}' not found")
# カテゴリ情報をプリロード
for category in NewCategory.objects.all():
hours = int(category.duration.total_seconds() // 3600)
key = (category.category_name, hours)
self.categories[key] = category
self.log(f"利用可能なカテゴリ: {list(self.categories.keys())}")
def parse_csv_row(self, row):
"""CSV行をパース"""
if len(row) < 20:
raise ValueError(f"不正な行形式: {len(row)} columns found, expected at least 20")
data = {
'department_count': row[0].strip(),
'hours': row[1].strip(),
'department': row[2].strip(),
'team_name': row[3].strip(),
'email': row[4].strip(),
'password': row[5].strip(),
'phone': row[6].strip(),
'members': []
}
# メンバー情報を解析最大7名
for i in range(7):
name_idx = 7 + i * 2
birth_idx = 8 + i * 2
if name_idx < len(row) and birth_idx < len(row):
name = row[name_idx].strip() if row[name_idx] else None
birth_str = row[birth_idx].strip() if row[birth_idx] else None
if name and birth_str:
try:
# 誕生日の解析(複数フォーマット対応)
birth_date = None
for fmt in ['%Y/%m/%d', '%Y-%m-%d', '%Y/%m/%d ']:
try:
birth_date = datetime.strptime(birth_str.strip(), fmt).date()
break
except ValueError:
continue
if birth_date:
data['members'].append({
'name': name,
'birth_date': birth_date
})
else:
self.log(f"警告: 誕生日の形式が不正です: {birth_str}", 'WARNING')
except Exception as e:
self.log(f"警告: メンバー情報の解析エラー: {e}", 'WARNING')
return data
def get_or_create_category(self, department, hours):
"""カテゴリを取得または作成"""
try:
hours_int = int(hours)
except ValueError:
hours_int = 5 # デフォルト
# 既存カテゴリから検索
key = (department, hours_int)
if key in self.categories:
return self.categories[key]
# 一般的なカテゴリ名でマッピング
category_mappings = {
'一般': 'General',
'ファミリー': 'Family',
'男性ソロ': 'Solo Male',
'女性ソロ': 'Solo Female',
}
mapped_name = category_mappings.get(department, department)
key_mapped = (mapped_name, hours_int)
if key_mapped in self.categories:
return self.categories[key_mapped]
# 時間だけでマッチング(一般カテゴリとして)
for (cat_name, cat_hours), category in self.categories.items():
if cat_hours == hours_int and cat_name in ['General', '一般']:
return category
# 新しいカテゴリを作成
self.log(f"新しいカテゴリを作成: {department} ({hours_int}時間)")
if self.dry_run:
self.log(f"DRY RUN: カテゴリ作成 - {department} ({hours_int}時間)")
# ダミーカテゴリオブジェクトを作成
class DummyCategory:
def __init__(self):
self.category_name = department
self.category_number = len(self.categories) + 1
self.duration = timedelta(hours=hours_int)
self.num_of_member = 7
self.family = (department == 'ファミリー')
self.female = (department == '女性ソロ')
self.trial = False
category = DummyCategory()
else:
category = NewCategory.objects.create(
category_name=department,
category_number=len(self.categories) + 1,
duration=timedelta(hours=hours_int),
num_of_member=7, # 最大7名
family=(department == 'ファミリー'),
female=(department == '女性ソロ'),
trial=False
)
self.categories[key] = category
return category
def process_user(self, data):
"""ユーザーの処理2-1"""
email = data['email']
password = data['password']
team_name = data['team_name']
# ゼッケン番号は部門別数を使用
zekken_number = data['department_count']
if self.dry_run:
self.log(f"DRY RUN: ユーザー処理 - {email}")
self.log(f" - チーム名: {team_name}")
self.log(f" - ゼッケン番号: {zekken_number}")
# ダミーユーザーオブジェクトを返す
class DummyUser:
def __init__(self):
self.email = email
self.firstname = data['members'][0]['name'] if data['members'] else 'Unknown'
self.lastname = ''
self.date_of_birth = data['members'][0]['birth_date'] if data['members'] else date.today()
self.female = False
self.zekken_number = zekken_number
self.event_code = self.event_code
self.team_name = team_name
self.stats['users_created'] += 1
return DummyUser()
try:
# 既存ユーザーを検索
user = CustomUser.objects.get(email=email)
# パスワードとその他の情報を更新
user.set_password(password)
user.event_code = self.event_code
user.zekken_number = zekken_number
user.team_name = team_name
user.is_rogaining = True
user.save()
self.log(f"ユーザー更新: {email}")
self.stats['users_updated'] += 1
except CustomUser.DoesNotExist:
# 新規ユーザー作成
# メンバー情報から代表者の情報を取得
first_member = data['members'][0] if data['members'] else None
user = CustomUser.objects.create(
email=email,
firstname=first_member['name'] if first_member else 'Unknown',
lastname='',
date_of_birth=first_member['birth_date'] if first_member else date.today(),
female=False, # デフォルト
group=data['department'],
is_active=True,
is_rogaining=True,
zekken_number=zekken_number,
event_code=self.event_code,
team_name=team_name
)
user.set_password(password)
user.save()
self.log(f"ユーザー作成: {email}")
self.stats['users_created'] += 1
return user
def create_dummy_users_for_members(self, data, main_user):
"""メンバー用ダミーユーザーを作成"""
dummy_users = []
for i, member_data in enumerate(data['members']):
# メインユーザーをスキップ
if i == 0:
dummy_users.append(main_user)
continue
# ダミーメールアドレス生成
dummy_email = f"dummy_{self.event_code}_{data['department_count']}_{i}@dummy.local"
if self.dry_run:
self.log(f"DRY RUN: ダミーユーザー作成 - {dummy_email}")
self.log(f" - 名前: {member_data['name']}")
self.log(f" - 誕生日: {member_data['birth_date']}")
# ダミーユーザーオブジェクトを作成
class DummyUser:
def __init__(self):
self.email = dummy_email
self.firstname = member_data['name']
self.lastname = ''
self.date_of_birth = member_data['birth_date']
self.female = False
self.event_code = self.event_code
self.team_name = data['team_name']
dummy_users.append(DummyUser())
continue
try:
# 既存のダミーユーザーを確認
dummy_user = CustomUser.objects.get(email=dummy_email)
except CustomUser.DoesNotExist:
# ダミーユーザー作成
dummy_user = CustomUser.objects.create(
email=dummy_email,
firstname=member_data['name'],
lastname='',
date_of_birth=member_data['birth_date'],
female=False, # 名前から推測するかデフォルト
group=data['department'],
is_active=False, # ダミーユーザーは非アクティブ
is_rogaining=True,
event_code=self.event_code,
team_name=data['team_name']
)
dummy_user.set_password('dummy123')
dummy_user.save()
self.log(f"ダミーユーザー作成: {dummy_email}")
dummy_users.append(dummy_user)
return dummy_users
def process_team(self, data, owner, category):
"""チーム登録2-2"""
team_name = data['team_name']
zekken_number = data['department_count']
if self.dry_run:
self.log(f"DRY RUN: チーム作成 - {team_name}")
self.log(f" - ゼッケン番号: {zekken_number}")
self.log(f" - カテゴリ: {category.category_name if hasattr(category, 'category_name') else 'Unknown'}")
self.log(f" - オーナー: {owner.email}")
# ダミーチームオブジェクトを作成
class DummyTeam:
def __init__(self, processor):
self.team_name = team_name
self.zekken_number = zekken_number
self.owner = owner
self.event = processor.event
self.password = data['password']
self.class_name = data['department']
self.stats['teams_created'] += 1
return DummyTeam(self)
# 既存チームを確認
try:
team = Team.objects.get(
team_name=team_name,
event=self.event,
zekken_number=zekken_number
)
self.log(f"既存チーム使用: {team_name}")
except Team.DoesNotExist:
# 新規チーム作成
team = Team.objects.create(
team_name=team_name,
owner=owner,
category=category,
zekken_number=zekken_number,
event=self.event,
password=data['password'],
class_name=data['department']
)
self.log(f"チーム作成: {team_name}")
self.stats['teams_created'] += 1
return team
def process_members(self, data, team, users):
"""メンバー登録"""
if self.dry_run:
self.log(f"DRY RUN: メンバー登録 - {team.team_name}")
for user in users:
self.log(f" - {user.firstname} ({user.email})")
self.stats['members_created'] += 1
return
# 既存メンバーを削除(更新の場合)
Member.objects.filter(team=team).delete()
for user in users:
member = Member.objects.create(
team=team,
user=user,
firstname=user.firstname,
lastname=user.lastname,
date_of_birth=user.date_of_birth,
female=user.female,
is_temporary=True if user.email.startswith('dummy_') else False
)
self.log(f"メンバー追加: {user.firstname} to {team.team_name}")
self.stats['members_created'] += 1
def process_entry(self, team, category):
"""エントリー登録2-3"""
if self.dry_run:
self.log(f"DRY RUN: エントリー作成 - {team.team_name}")
self.log(f" - カテゴリ: {category.category_name if hasattr(category, 'category_name') else 'Unknown'}")
self.log(f" - ゼッケン番号: {team.zekken_number}")
# ダミーエントリーオブジェクトを作成
class DummyEntry:
def __init__(self, processor):
self.team = team
self.event = processor.event
self.category = category
self.zekken_number = int(team.zekken_number)
self.is_active = True
self.stats['entries_created'] += 1
return DummyEntry(self)
try:
entry = Entry.objects.get(
team=team,
event=self.event,
category=category
)
self.log(f"既存エントリー使用: {team.team_name}")
except Entry.DoesNotExist:
entry = Entry.objects.create(
team=team,
event=self.event,
category=category,
owner=team.owner,
zekken_number=int(team.zekken_number),
is_active=True,
hasParticipated=False,
hasGoaled=False
)
self.log(f"エントリー作成: {team.team_name}")
self.stats['entries_created'] += 1
return entry
def process_participation(self, entry):
"""イベント参加2-4"""
if self.dry_run:
self.log(f"DRY RUN: 参加登録 - {entry.team.team_name}")
# ダミーメンバーリストを作成
for i in range(len(getattr(entry.team, 'dummy_members', [entry.team.owner]))):
self.log(f" - Member {i+1}")
self.stats['participations_created'] += 1
return
# エントリーメンバーを作成
EntryMember.objects.filter(entry=entry).delete()
for member in entry.team.members.all():
entry_member = EntryMember.objects.create(
entry=entry,
member=member,
is_temporary=member.is_temporary
)
self.log(f"参加登録: {member.user.firstname}")
self.stats['participations_created'] += 1
# エントリーを有効化
entry.is_active = True
entry.save()
def process_csv_file(self, csv_file_path):
"""CSVファイルを処理"""
self.log(f"CSV処理開始: {csv_file_path}")
with open(csv_file_path, 'r', encoding='utf-8') as file:
# ヘッダーをスキップ
csv_reader = csv.reader(file)
header = next(csv_reader)
self.log(f"CSVヘッダー: {header[:10]}...") # 最初の10列を表示
row_count = 0
for row in csv_reader:
row_count += 1
if not any(row): # 空行をスキップ
continue
try:
# DRY RUNの場合はトランザクションを使用しない
if self.dry_run:
self.log(f"\n--- Row {row_count}: {row[3] if len(row) > 3 else 'Unknown'} ---")
# CSV行をパース
data = self.parse_csv_row(row)
# カテゴリ取得
category = self.get_or_create_category(data['department'], data['hours'])
# 2-1. ユーザー処理
main_user = self.process_user(data)
# メンバー用ダミーユーザー作成
all_users = self.create_dummy_users_for_members(data, main_user)
# 2-2. チーム登録
team = self.process_team(data, main_user, category)
# メンバー登録
self.process_members(data, team, all_users)
# 2-3. エントリー登録
entry = self.process_entry(team, category)
# 2-4. イベント参加
self.process_participation(entry)
self.log(f"Row {row_count} 完了: {data['team_name']}")
else:
with transaction.atomic():
self.log(f"\n--- Row {row_count}: {row[3] if len(row) > 3 else 'Unknown'} ---")
# CSV行をパース
data = self.parse_csv_row(row)
# カテゴリ取得
category = self.get_or_create_category(data['department'], data['hours'])
# 2-1. ユーザー処理
main_user = self.process_user(data)
# メンバー用ダミーユーザー作成
all_users = self.create_dummy_users_for_members(data, main_user)
# 2-2. チーム登録
team = self.process_team(data, main_user, category)
# メンバー登録
self.process_members(data, team, all_users)
# 2-3. エントリー登録
entry = self.process_entry(team, category)
# 2-4. イベント参加
self.process_participation(entry)
self.log(f"Row {row_count} 完了: {data['team_name']}")
except Exception as e:
error_msg = f"Row {row_count} エラー: {str(e)}"
self.log(f"エラー: {error_msg}", 'ERROR')
self.stats['errors'].append(error_msg)
self.log(f"\nCSV処理完了: {row_count} 行処理")
def print_stats(self):
"""統計情報を表示"""
self.log("\n=== 処理結果統計 ===", 'SUCCESS')
self.log(f"作成されたユーザー: {self.stats['users_created']}")
self.log(f"更新されたユーザー: {self.stats['users_updated']}")
self.log(f"作成されたチーム: {self.stats['teams_created']}")
self.log(f"作成されたメンバー: {self.stats['members_created']}")
self.log(f"作成されたエントリー: {self.stats['entries_created']}")
self.log(f"作成された参加登録: {self.stats['participations_created']}")
self.log(f"エラー数: {len(self.stats['errors'])}")
if self.stats['errors']:
self.log("\n=== エラー詳細 ===", 'WARNING')
for error in self.stats['errors']:
self.log(f"- {error}", 'ERROR')