add automatic entry script

This commit is contained in:
2025-09-05 16:57:18 +09:00
parent 4a5f6273ed
commit 4e1ef7c230
16 changed files with 1051 additions and 0 deletions

View File

@ -0,0 +1,548 @@
"""
CSVファイルからチームエントリーをインポートするDjango管理コマンド
CPLIST/input/teams2025.csv 本番用対応
Usage:
docker compose exec app python manage.py import_teams --event_code=岐阜ロゲイニング2025 --csv_file=CPLIST/input/teams2025.csv
Author: システム開発チーム
Date: 2025-09-05
"""
import csv
import os
import logging
import re
from datetime import datetime, timedelta
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, models
from django.contrib.auth.hashers import make_password
from django.utils import timezone
from django.core.exceptions import ValidationError
from rog.models import (
CustomUser, Team, Member, Entry, NewEvent2, NewCategory
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'CSVファイルからチームエントリーをインポート'
def add_arguments(self, parser):
parser.add_argument(
'--event_code',
type=str,
required=True,
help='インポート先のイベントコード'
)
parser.add_argument(
'--csv_file',
type=str,
default='CPLIST/input/teams2025.csv',
help='インポートするCSVファイルのパス (default: CPLIST/input/teams2025.csv)'
)
parser.add_argument(
'--dry_run',
action='store_true',
help='実際にはデータベースに書き込まずに処理を確認'
)
def handle(self, *args, **options):
event_code = options['event_code']
csv_file = options['csv_file']
dry_run = options['dry_run']
self.stdout.write(
self.style.SUCCESS(f'CSVインポート開始: event_code={event_code}, csv_file={csv_file}, dry_run={dry_run}')
)
# イベントの存在確認
try:
event = NewEvent2.objects.get(event_name=event_code)
self.stdout.write(f'イベント見つかりました: {event.event_name} (ID: {event.id})')
except NewEvent2.DoesNotExist:
raise CommandError(f'イベントが見つかりません: {event_code}')
# CSVファイルの存在確認
if not os.path.exists(csv_file):
raise CommandError(f'CSVファイルが見つかりません: {csv_file}')
# 統計情報
stats = {
'total_rows': 0,
'users_created': 0,
'users_existing': 0,
'teams_created': 0,
'members_created': 0,
'entries_created': 0,
'errors': []
}
try:
with open(csv_file, 'r', encoding='utf-8-sig') as f: # BOM対応のためutf-8-sigを使用
reader = csv.DictReader(f)
for row_num, row in enumerate(reader, start=2): # ヘッダー行の次から開始
stats['total_rows'] += 1
try:
if dry_run:
self.process_row_dry_run(row, event, stats, row_num)
else:
with transaction.atomic():
self.process_row(row, event, stats, row_num)
except Exception as e:
error_msg = f'{row_num}: {str(e)}'
stats['errors'].append(error_msg)
self.stdout.write(self.style.ERROR(error_msg))
continue
except Exception as e:
raise CommandError(f'CSVファイル読み込みエラー: {str(e)}')
# 結果レポート
self.print_stats(stats, dry_run)
# CSV出力
if not dry_run:
self.export_results_to_csv(event, options['csv_file'])
def export_results_to_csv(self, event, input_csv_file):
"""インポート結果をCSVファイルに出力"""
# 出力ファイル名を生成
input_dir = os.path.dirname(input_csv_file)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = os.path.join(input_dir, f'import_results_{event.event_code}_{timestamp}.csv')
try:
with open(output_file, 'w', newline='', encoding='utf-8-sig') as csvfile:
fieldnames = [
'チーム名', 'ゼッケン番号', 'カテゴリー', '時間',
'オーナーメール', 'リーダー', 'メンバー数', 'メンバー一覧',
'参加登録状況', 'エントリーID', '作成日時'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
# イベントのエントリーを取得
entries = Entry.objects.filter(event=event).select_related(
'team', 'category', 'owner'
).prefetch_related('team__members__user')
for entry in entries:
# メンバー一覧を作成
member_list = []
for member in entry.team.members.all():
member_info = f"{member.user.firstname}"
if member.user.date_of_birth:
member_info += f"({member.user.date_of_birth})"
member_list.append(member_info)
# リーダー情報を取得(チームオーナー)
leader_name = ""
if entry.team.owner:
leader_name = f"{entry.team.owner.firstname}"
if entry.team.owner.date_of_birth:
leader_name += f"({entry.team.owner.date_of_birth})"
writer.writerow({
'チーム名': entry.team.team_name,
'ゼッケン番号': entry.zekken_number,
'カテゴリー': entry.category.category_name if entry.category else '',
'時間': f"{entry.category.duration.total_seconds() // 3600}時間" if entry.category else '',
'オーナーメール': entry.owner.email,
'リーダー': leader_name,
'メンバー数': entry.team.members.count(),
'メンバー一覧': '; '.join(member_list),
'参加登録状況': '完了',
'エントリーID': entry.id,
'作成日時': entry.date.strftime('%Y-%m-%d %H:%M:%S')
})
self.stdout.write(
self.style.SUCCESS(f'インポート結果をCSVに出力しました: {output_file}')
)
except Exception as e:
self.stdout.write(
self.style.ERROR(f'CSV出力エラー: {str(e)}')
)
def process_row(self, row, event, stats, row_num):
"""CSVの1行を処理実際にデータベースに書き込み"""
team_name = row.get('チーム名', '').strip()
self.stdout.write(f'{row_num} 処理中: チーム={team_name}')
# 2-1. カスタムユーザー登録
user = self.get_or_create_user(row, stats)
# 2-2. チーム登録
team = self.create_team(row, user, event, stats)
# メンバー登録最大7名
members = self.create_members(row, team, stats)
# 2-3. エントリー登録
entry = self.create_entry(row, team, event, stats)
self.stdout.write(self.style.SUCCESS(f'{row_num} 完了: チーム={team.team_name}'))
def process_row_dry_run(self, row, event, stats, row_num):
"""CSVの1行を処理ドライラン - データベースには書き込まない)"""
team_name = row.get('チーム名', '').strip()
self.stdout.write(f'[DRY RUN] 行 {row_num}: チーム={team_name}')
# ユーザーの存在確認のみ
email = row.get('メール', '').strip()
password = row.get('パスワード', '').strip()
if email:
existing_user = CustomUser.objects.filter(email=email).first()
if existing_user:
self.stdout.write(f' ユーザー既存: {email} パスワード:既存')
stats['users_existing'] += 1
else:
display_password = password if password else 'defaultpassword123'
self.stdout.write(f' ユーザー新規作成予定: {email} パスワード:{display_password}')
stats['users_created'] += 1
stats['teams_created'] += 1
stats['entries_created'] += 1
# エントリー情報の表示
category_name = row.get('部門', '').strip()
duration_str = row.get('時間', '').strip()
# 予想されるゼッケン番号を計算(ドライラン用)
max_zekken = Entry.objects.filter(event=event).aggregate(
max_zekken=models.Max('zekken_number')
)['max_zekken'] or 0
predicted_zekken = max_zekken + stats['entries_created']
self.stdout.write(f' エントリー: ゼッケン{predicted_zekken}, カテゴリー:{category_name}, 時間:{duration_str}時間')
# 参加登録状況の確認
existing_entry = Entry.objects.filter(team__team_name=team_name, event=event).first()
if existing_entry:
self.stdout.write(f' 参加登録: 済み既存エントリーID: {existing_entry.id}')
else:
self.stdout.write(f' 参加登録: 新規作成予定')
# メンバー数カウント
member_count = 0
member_names = []
for i in range(1, 8): # 最大7名
# 全角数字を使用
name_key = f'氏名{chr(0xFF10 + i)}' # 全角数字123...を生成
name = row.get(name_key, '').strip()
if name:
member_count += 1
birthday_key = f'誕生日{chr(0xFF10 + i)}'
birthday = row.get(birthday_key, '').strip()
member_names.append(f'{name}({birthday})')
stats['members_created'] += member_count
self.stdout.write(f' メンバー: {member_count}名 [{", ".join(member_names)}]')
def get_or_create_user(self, row, stats):
"""カスタムユーザーの取得または作成"""
email = row.get('メール', '').strip()
password = row.get('パスワード', '').strip()
phone = row.get('電話番号', '').strip()
if not email:
raise ValueError('メールアドレスが必要です')
# 特殊文字のクリーンアップ
email = email.replace('', '@')
phone = self.clean_phone_number(phone)
# 既存ユーザーの検索
user = CustomUser.objects.filter(email=email).first()
if user:
self.stdout.write(f' 既存ユーザー: {email}')
stats['users_existing'] += 1
return user
# 新規ユーザー作成
user = CustomUser.objects.create_user(
email=email,
password=password if password else 'defaultpassword123',
is_active=True
)
self.stdout.write(f' 新規ユーザー作成: {email}')
stats['users_created'] += 1
return user
def clean_phone_number(self, phone):
"""電話番号のクリーンアップ"""
if not phone:
return ''
# 全角文字を半角に変換
phone = phone.replace('', '-').replace('÷', '-')
# 余分な文字を削除
phone = re.sub(r'[^\d\-]', '', phone)
return phone
def create_team(self, row, user, event, stats):
"""チームの作成"""
team_name = row.get('チーム名', '').strip()
category_name = row.get('部門', '').strip()
if not team_name:
raise ValueError('チーム名が必要です')
# チームの重複チェック(同一ユーザー・同一チーム名)
existing_team = Team.objects.filter(team_name=team_name, owner=user).first()
if existing_team:
self.stdout.write(f' 既存チーム使用: {team_name}')
return existing_team
team = Team.objects.create(
team_name=team_name,
owner=user,
class_name=category_name,
event=event, # イベントを設定
created_at=timezone.now()
)
self.stdout.write(f' 新規チーム作成: {team_name}')
stats['teams_created'] += 1
return team
def create_members(self, row, team, stats):
"""メンバーの作成最大7名"""
members = []
for i in range(1, 8): # 最大7名
# 全角数字を使用
name_key = f'氏名{chr(0xFF10 + i)}'
birthday_key = f'誕生日{chr(0xFF10 + i)}'
name = row.get(name_key, '').strip()
birthday_str = row.get(birthday_key, '').strip()
if not name:
continue
# 誕生日の解析
birthday = self.parse_birthday(birthday_str)
# ダミーメールアドレス生成
dummy_email = f"{team.team_name.replace(' ', '_').replace('!', '').replace('?', '').lower()}_member_{i}@dummy.local"
# 既存のダミーユーザーチェック
existing_dummy = CustomUser.objects.filter(email=dummy_email).first()
if existing_dummy:
dummy_user = existing_dummy
else:
# ダミーユーザー作成
dummy_user = CustomUser.objects.create_user(
email=dummy_email,
password='dummypassword123',
firstname=name,
date_of_birth=birthday,
is_active=True
)
# 既存メンバーチェック
existing_member = Member.objects.filter(team=team, user=dummy_user).first()
if existing_member:
member = existing_member
else:
# メンバー作成1番目の人がリーダー
member = Member.objects.create(
team=team,
user=dummy_user,
firstname=name,
date_of_birth=birthday
)
# 1番目の人をチームのオーナーとして設定リーダーの概念
if i == 1 and team.owner != dummy_user:
team.owner = dummy_user
team.save()
stats['members_created'] += 1
members.append(member)
self.stdout.write(f' メンバー{i}作成: {name}')
return members
def parse_birthday(self, birthday_str):
"""誕生日文字列の解析"""
if not birthday_str or birthday_str in ['不明', '']:
return None
# 年齢のみの場合71歳
if '' in birthday_str:
try:
age = int(birthday_str.replace('', ''))
# 現在年から年齢を引いて生年を推定
birth_year = datetime.now().year - age
return datetime(birth_year, 1, 1).date()
except ValueError:
return None
# 日付形式の解析
try:
# YYYY/MM/DD形式
if '/' in birthday_str:
return datetime.strptime(birthday_str, '%Y/%m/%d').date()
# YYYYMMDD形式
elif len(birthday_str) == 8 and birthday_str.isdigit():
return datetime.strptime(birthday_str, '%Y%m%d').date()
# YYYY-MM-DD形式
elif '-' in birthday_str:
return datetime.strptime(birthday_str, '%Y-%m-%d').date()
except ValueError:
pass
self.stdout.write(self.style.WARNING(f' 誕生日形式エラー: {birthday_str}'))
return None
def create_entry(self, row, team, event, stats):
"""エントリーの作成"""
category_name = row.get('部門', '').strip()
duration_str = row.get('時間', '').strip()
# メンバー数を取得
member_count = team.members.count()
# カテゴリーの取得または作成
category = None
if category_name and duration_str:
# 既存のカテゴリーから最適なものを選択
duration_hours = int(duration_str) if duration_str.isdigit() else 3
# お試し部門の判定
is_trial = 'お試し' in category_name
if is_trial:
# お試しカテゴリーを検索(時間が一致するもの)
trial_categories = NewCategory.objects.filter(
trial=True,
duration__exact=timedelta(hours=duration_hours)
).order_by('-num_of_member') # メンバー数が多い順
# メンバー数に適したカテゴリーを選択
for cat in trial_categories:
if cat.num_of_member >= member_count:
category = cat
break
if not category and trial_categories.exists():
# 適切なサイズがない場合は最大のお試しカテゴリーを使用
category = trial_categories.first()
else:
# 1. 完全一致するカテゴリーを探す
full_category_name = f"{category_name}-{duration_hours}時間"
category = NewCategory.objects.filter(category_name=full_category_name).first()
if not category:
# 2. 部分一致するカテゴリーを探す(時間が一致するもの)
categories = NewCategory.objects.filter(
category_name__icontains=category_name,
duration__exact=timedelta(hours=duration_hours),
trial=False # お試しではないもの
).order_by('-num_of_member') # メンバー数が多い順
# メンバー数に適したカテゴリーを選択
for cat in categories:
if cat.num_of_member >= member_count:
category = cat
break
if not category:
# 3. どれも見つからない場合は新規作成(メンバー数を考慮)
max_members = max(7, member_count) # 最低でも現在のメンバー数は受け入れる
full_category_name = f"{category_name}-{duration_hours}時間"
category = NewCategory.objects.create(
category_name=full_category_name,
duration=timedelta(hours=duration_hours),
num_of_member=max_members,
trial=is_trial, # お試し判定を反映
family='ファミリー' in category_name, # ファミリー判定
category_number=0
)
self.stdout.write(f' 新規カテゴリー作成: {full_category_name} (最大{max_members}名, お試し={is_trial})')
else:
trial_text = ", お試し" if category.trial else ""
self.stdout.write(f' 使用カテゴリー: {category.category_name} (最大{category.num_of_member}{trial_text})')
# 重複エントリーチェック
existing_entry = Entry.objects.filter(team=team, event=event).first()
if existing_entry:
self.stdout.write(f' 既存エントリー使用: {team.team_name}')
return existing_entry
# 最大ゼッケン番号を取得
max_zekken = Entry.objects.filter(event=event).aggregate(
max_zekken=models.Max('zekken_number')
)['max_zekken'] or 0
entry = Entry.objects.create(
team=team,
event=event,
category=category,
owner=team.owner,
zekken_number=max_zekken + 1,
date=timezone.now(),
is_active=True
)
self.stdout.write(f' エントリー作成: ゼッケン{entry.zekken_number}')
stats['entries_created'] += 1
return entry
# 重複エントリーチェック
existing_entry = Entry.objects.filter(team=team, event=event).first()
if existing_entry:
self.stdout.write(f' 既存エントリー使用: {team.team_name}')
return existing_entry
# 最大ゼッケン番号を取得
max_zekken = Entry.objects.filter(event=event).aggregate(
max_zekken=models.Max('zekken_number')
)['max_zekken'] or 0
entry = Entry.objects.create(
team=team,
event=event,
category=category,
owner=team.owner,
zekken_number=max_zekken + 1,
date=timezone.now(),
is_active=True
)
self.stdout.write(f' エントリー作成: ゼッケン{entry.zekken_number}')
stats['entries_created'] += 1
return entry
def print_stats(self, stats, dry_run):
"""統計情報の表示"""
mode = '[DRY RUN] ' if dry_run else ''
self.stdout.write(self.style.SUCCESS('\n' + '='*50))
self.stdout.write(self.style.SUCCESS(f'{mode}インポート結果'))
self.stdout.write(self.style.SUCCESS('='*50))
self.stdout.write(f'処理行数: {stats["total_rows"]}')
self.stdout.write(f'ユーザー新規作成: {stats["users_created"]}')
self.stdout.write(f'ユーザー既存利用: {stats["users_existing"]}')
self.stdout.write(f'チーム作成: {stats["teams_created"]}')
self.stdout.write(f'メンバー作成: {stats["members_created"]}')
self.stdout.write(f'エントリー作成: {stats["entries_created"]}')
if stats['errors']:
self.stdout.write(self.style.ERROR(f'\nエラー数: {len(stats["errors"])}'))
for error in stats['errors']:
self.stdout.write(self.style.ERROR(f' {error}'))
if dry_run:
self.stdout.write(self.style.WARNING('\n※ ドライランのため、実際のデータは作成されていません'))
else:
self.stdout.write(self.style.SUCCESS('\nインポート完了!'))