import csv import os import logging from django.core.management.base import BaseCommand, CommandError from django.db import transaction, connections from django.utils import timezone from django.conf import settings from django.contrib.auth import get_user_model from rog.models import Member, Team, NewEvent2, Entry, Location2025,NewCategory #, GpsLog CustomUser = get_user_model() logger = logging.getLogger(__name__) class Command(BaseCommand): help = 'CSVファイルからイベント参加者情報をインポートし、rogdbとgifurogeデータベースに登録します。' def add_arguments(self, parser): parser.add_argument('csv_file', type=str, help='インポートするCSVファイルのパス') parser.add_argument('event_code', type=str, help='登録するイベントコード') def handle(self, *args, **options): csv_file = options['csv_file'] event_code = options['event_code'] # 処理結果を保存するリストを初期化 self.processed_entries = [] if not os.path.exists(csv_file): raise CommandError(f'ファイルが見つかりません: {csv_file}') try: event = NewEvent2.objects.get(event_name=event_code) except NewEvent2.DoesNotExist: raise CommandError(f'イベントが見つかりません: {event_code}') self.stdout.write(self.style.SUCCESS(f'イベント "{event.event_name}" のデータをインポートします')) # CSVファイルを読み込み、rogdbデータベースに登録 with open(csv_file, 'r', encoding='utf-8') as file: reader = csv.reader(file) next(reader) # ヘッダー行をスキップ with transaction.atomic(): for i, row in enumerate(reader, 1): try: self.process_entry(row, event) except Exception as e: self.stdout.write(self.style.ERROR(f'行 {i} のデータ処理中にエラーが発生しました: {str(e)}')) self.stdout.write(self.style.WARNING(f'この行はスキップして続行します')) # gifurogeデータベースへの転送 self.transfer_to_gifuroge(event) # 結果をCSVファイルに出力 self.export_processed_entries(event_code) self.stdout.write(self.style.SUCCESS('データのインポートが完了しました')) def process_entry(self, row, event): """CSVの1行からエントリー情報を処理""" try: # 新しいCSVフォーマットに対応したインデックス participation_time = row[0] division = row[1] is_trial = row[2].strip() == 'お試し' # 「お試し」フラグ division_number = row[3] team_name = row[4] leader_name = row[5] leader_kana = row[6] leader_gender = row[7] # 新しく追加された性別フィールド password = row[8] # インデックスが1つずれる member_count = int(row[9]) # インデックスが1つずれる zekken_label = row[10] # インデックスが1つずれる zekken_number = row[11] # インデックスが1つずれる leader_email = row[12] # インデックスが1つずれる leader_birth_date = row[13] # インデックスが1つずれる name_parts = leader_name.split(' ', 1) lastname = name_parts[0] firstname = name_parts[1] if len(name_parts) > 1 else "" # 半角数字を全角数字に変換する関数 def to_fullwidth(s): """半角数字を全角数字に変換する""" result = "" for char in s: if char.isdigit(): # 半角数字を全角数字に変換 result += chr(ord(char) + 0xFEE0) else: result += char return result # 日付フォーマットを変換する関数 def format_date(date_str): """YYYY/MM/DD形式をYYYY-MM-DD形式に変換する""" if not date_str: return None try: # スラッシュやピリオドなどの区切り文字を処理 parts = None if '/' in date_str: parts = date_str.split('/') elif '.' in date_str: parts = date_str.split('.') elif '-' in date_str: # 既にハイフン形式の場合はそのまま返す return date_str if parts and len(parts) == 3: year, month, day = parts # 必要に応じて年を4桁に修正(例:'91' → '1991') if len(year) == 2: if int(year) > 50: # 50より大きい場合は1900年代と仮定 year = f"19{year}" else: year = f"20{year}" # 月と日が1桁の場合は2桁に揃える month = month.zfill(2) day = day.zfill(2) return f"{year}-{month}-{day}" return date_str # 変換できない場合は元の文字列を返す except Exception as e: self.stdout.write(self.style.ERROR(f'日付変換エラー: {date_str} - {str(e)}')) return None # 代表者の生年月日をフォーマット変換 formatted_leader_birth_date = format_date(leader_birth_date) # 参加時間を全角に変換 fullwidth_participation_time = to_fullwidth(participation_time) # 代表者の性別を設定(Femaleならtrue、それ以外ならfalse) is_female = leader_gender.strip().lower() == "female" # 1. CustomUserを検索または作成 new_user = False password_to_save = "" try: leader = CustomUser.objects.get(email=leader_email) # 既存ユーザー password_to_save = "(既存)" # 既存ユーザーの性別情報を更新 if leader.female != is_female: leader.female = is_female leader.save() self.stdout.write(f'既存ユーザーを代表者として使用します: {leader_email} (性別: {leader_gender})') except CustomUser.DoesNotExist: # 新規ユーザーの場合 # leader_nameを空白で分離 leader = CustomUser.objects.create_user( email=leader_email, password=password, firstname=firstname, lastname=lastname, date_of_birth=formatted_leader_birth_date, group=event.event_name, female=is_female, # 性別を設定 is_active=True ) password_to_save = password # 新規ユーザーの場合は実際のパスワード self.stdout.write(f'代表者を新規作成しました: {leader_email} (パスワード: {password}, 性別: {leader_gender})') # 処理した代表者情報をリストに追加 self.processed_entries.append({ 'leader_name': leader_name, 'team_name': team_name, 'email': leader_email, 'password': password_to_save }) # CSVの参加部門から対応するカテゴリーを検索 # division + "-" + participation_time + "時間" の形式で検索 category_name_with_time = f"{division}-{fullwidth_participation_time}時間" try: category = NewCategory.objects.get(category_name=category_name_with_time) except NewCategory.DoesNotExist: # カテゴリーが見つからない場合のエラーハンドリング self.stdout.write(self.style.ERROR(f'カテゴリーが見つかりません: {category_name_with_time}')) raise CommandError(f'カテゴリー "{category_name_with_time}" が存在しません。先にカテゴリーを作成してください。') # 2. チームの作成とメンバーの登録 team = Team.objects.create( team_name=team_name, owner=leader, category=category # eventではなくcategoryを使用 ) # メンバーの登録(代表者を含む) Member.objects.create( team=team, user=leader, firstname=firstname, lastname=lastname, date_of_birth=formatted_leader_birth_date, female=is_female, # 性別を設定 is_temporary=False # 代表者は一時的なメンバーではない ) # 追加メンバーの登録(CSVの14列目以降に存在する場合) for i in range(1, min(member_count, 5) + 1): # 最大5人まで # 各メンバーは3つのフィールド(名前、生年月日、性別)を持つ member_name_idx = 14 + (i-1) * 3 member_birth_idx = member_name_idx + 1 member_gender_idx = member_name_idx + 2 # 性別のインデックス if len(row) > member_name_idx and row[member_name_idx]: member_name = row[member_name_idx] member_birth = row[member_birth_idx] if len(row) > member_birth_idx else None # メンバーの生年月日もフォーマット変換 formatted_member_birth = format_date(member_birth) if member_birth else None member_gender = row[member_gender_idx] if len(row) > member_gender_idx else "Male" member_is_female = member_gender.strip().lower() == "female" # 名前を分割 name_parts = member_name.split(' ', 1) lastname = name_parts[0] firstname = name_parts[1] if len(name_parts) > 1 else "" # メンバー用のユーザー作成(メールアドレスは一時的なもの) temp_email = f"{team_name.replace(' ', '_')}_{i}@example.com" # 既存のメンバーチェック try: member_user = CustomUser.objects.filter(email=temp_email).first() if not member_user: raise CustomUser.DoesNotExist() # 既存ユーザーの性別情報を更新 if member_user.female != member_is_female: member_user.female = member_is_female member_user.save() except CustomUser.DoesNotExist: import secrets member_user = CustomUser.objects.create_user( email=temp_email, password=secrets.token_urlsafe(12), # メンバーにはランダムパスワード firstname=firstname, lastname=lastname, date_of_birth=formatted_member_birth, female=member_is_female, # 性別を設定 is_active=False # メンバーは直接ログインしないのでFalse ) Member.objects.create( team=team, user=member_user, firstname=firstname, lastname=lastname, date_of_birth=formatted_member_birth, female=member_is_female, # 性別を設定 is_temporary=True # 追加メンバーは一時的なメンバーとして設定 ) self.stdout.write(f' => メンバーを追加しました: {member_name} (性別: {member_gender})') # 3. エントリーの作成 # イベントの実施日をエントリーに割り当て # イベントの開始日時から日付部分のみを取得 entry_date = event.start_datetime.date() if event.start_datetime else None # イベントの実施日 entry = Entry.objects.create( team=team, event=event, date=entry_date, # イベントの実施日をエントリーに割り当て zekken_number=zekken_number, zekken_label=zekken_label, category=category, owner=leader, is_trial=is_trial # お試しフラグを設定 ) # エントリー登録完了のログ出力 self.stdout.write(f'チーム "{team_name}" をイベント "{event.event_name}" に登録しました (ゼッケン: {zekken_label}, お試し: {is_trial})') except Exception as e: self.stdout.write(self.style.ERROR(f'エラーが発生しました: {str(e)}')) # エラーが発生してもスキップして続行するため、例外を再スローしない except Exception as e: self.stdout.write(self.style.ERROR(f'エラーが発生しました: {str(e)}')) raise def export_processed_entries(self, event_code): """処理した代表者情報をCSVファイルに出力""" if not self.processed_entries: self.stdout.write('処理したエントリーがありません') return output_file = f"{event_code}_leaders_{timezone.now().strftime('%Y%m%d_%H%M%S')}.csv" with open(output_file, 'w', encoding='utf-8', newline='') as csvfile: fieldnames = ['代表者名', 'チーム名', 'メールアドレス', 'パスワード'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for entry in self.processed_entries: writer.writerow({ '代表者名': entry['leader_name'], 'チーム名': entry['team_name'], 'メールアドレス': entry['email'], 'パスワード': entry['password'] }) self.stdout.write(self.style.SUCCESS(f'代表者情報をCSVファイルに出力しました: {output_file}')) def process_entry_old(self, row, event): """CSVの1行からエントリー情報を処理""" self.stdout.write(self.style.SUCCESS(f'イベント "{event.event_name}", row="{row}"')) try: # 新しいCSVフォーマットに対応したインデックス participation_time = row[0] division = row[1] is_trial = row[2].strip() == 'お試し' # 「お試し」フラグ division_number = row[3] team_name = row[4] leader_name = row[5] leader_kana = row[6] password = row[7] # 新しいフィールド:パスワード member_count = int(row[8]) zekken_label = row[9] # ゼッケンラベル zekken_number = row[10] # ナンバー leader_email = row[11] leader_birth_date = row[12] # 1. CustomUserを検索または作成 try: leader = CustomUser.objects.get(email=leader_email) self.stdout.write(f'既存ユーザーを代表者として使用します: {leader_email}') except CustomUser.DoesNotExist: # 新規ユーザーの場合はランダムパスワードを生成 import secrets # leader_nameを空白で分離 name_parts = leader_name.split(' ', 1) lastname = name_parts[0] firstname = name_parts[1] if len(name_parts) > 1 else "" leader = CustomUser.objects.create_user( email=leader_email, password=password, firstname=firstname, # 名前の後半部分 lastname=lastname, # 名前の前半部分 birth_date=leader_birth_date, is_active=True ) self.stdout.write(f'代表者を新規作成しました: {leader_email} (パスワード: {password})') # CSVの参加部門から対応するカテゴリーを検索 try: category = NewCategory.objects.get(category_name=division) self.stdout.write(f'カテゴリーを見つけました: {category.category_name}') except NewCategory.DoesNotExist: # カテゴリーが見つからない場合のエラーハンドリング self.stdout.write(self.style.ERROR(f'カテゴリーが見つかりません: {division}')) raise CommandError(f'カテゴリー "{division}" が存在しません。先にカテゴリーを作成してください。') # 2. チームの作成とメンバーの登録 team = Team.objects.create( team_name=team_name, owner=leader, category=category ) Member.objects.create( team=team, user=leader, is_leader=True, firstname=leader.firstname, lastname=leader.lastname, date_of_birth=leader.date_of_birth, is_temporary=False # 代表者は一時的なメンバーではない ) # 追加メンバーの登録(CSVの13列目以降に存在する場合) for i in range(1, min(member_count, 5) + 1): # 最大5人まで member_name_idx = 13 + (i-1) * 2 member_birth_idx = member_name_idx + 1 if len(row) > member_name_idx and row[member_name_idx]: member_name = row[member_name_idx] member_birth = row[member_birth_idx] if len(row) > member_birth_idx else None # メンバー用のユーザー作成(メールアドレスは一時的なもの) temp_email = f"{team_name.replace(' ', '_')}_{i}@example.com" # 既存のメンバーチェック try: member_user = CustomUser.objects.filter(name=member_name).first() if not member_user: raise CustomUser.DoesNotExist() except CustomUser.DoesNotExist: import secrets member_user = CustomUser.objects.create_user( email=temp_email, password=secrets.token_urlsafe(12), # メンバーにはランダムパスワード name=member_name, birth_date=member_birth, is_active=False # メンバーは直接ログインしないのでFalse ) # 名前を分割(姓と名の分離) name_parts = member_name.split(' ', 1) firstname = name_parts[1] if len(name_parts) > 1 else "" lastname = name_parts[0] Member.objects.create( team=team, user=member_user, is_leader=False, firstname=firstname, lastname=lastname, date_of_birth=member_birth, is_temporary=True # 追加メンバーは一時的なメンバーとして設定 ) # 3. エントリーの作成 entry = Entry.objects.create( team=team, event=event, zekken_number=zekken_number, zekken_label=zekken_label, # 新しいフィールドに設定 class_type=division, leader=leader, ) # スタート記録の追加 #GpsLog.record_start(entry) self.stdout.write(f'チーム "{team_name}" を登録しました (お試し: {is_trial})') except Exception as e: self.stdout.write(self.style.ERROR(f'エラーが発生しました: {str(e)}')) raise def transfer_to_gifuroge(self, event): """rogdbからgifurogeデータベースへデータを転送""" self.stdout.write('gifurogeデータベースへのデータ転送を開始します') with connections['gifuroge'].cursor() as cursor: try: # 1. Event data transfer from NewEvent2 to event_table self.stdout.write('イベントデータを転送中...') # Extract fields from the event object event_code = event.event_name event_name = event.event_description or event.event_name start_datetime = event.start_datetime # Format start_datetime to get only the date part event_date = start_datetime.date() if start_datetime else None cursor.execute(""" INSERT INTO event_table (event_code, event_name, start_time, event_day) VALUES (%s, %s, %s, %s) ON CONFLICT (event_code) DO UPDATE SET event_name = %s, start_time = %s, event_day = %s """, [ event_code, event_name, start_datetime, event_date, event_name, start_datetime, event_date ]) self.stdout.write(f'イベント "{event_code}" を転送しました') # 4. Locationテーブルからcheckpoint_tableへの転送 self.stdout.write('checkpointデータを転送中...') locations = Location2025.objects.filter(group=event.event_name) # Print the number of location records location_count = locations.count() self.stdout.write(f'checkpointデータ: {location_count}件を転送中...') for location in locations: # Display the cp_number, event_code, and colabo_company_memo # self.stdout.write(f' CP: {location.cp}, Event: {event.event_name}, Memo: {"" or "(empty)"}') cursor.execute(""" INSERT INTO checkpoint_table (cp_number, event_code, cp_name, latitude, longitude, photo_point, buy_point, sample_photo, colabo_company_memo) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (cp_number, event_code,colabo_company_memo) DO UPDATE SET cp_name = %s, latitude = %s, longitude = %s, photo_point = %s, buy_point = %s, sample_photo = %s, colabo_company_memo = %s """, [ location.cp, event.event_name, location.location_name, location.latitude, location.longitude, location.checkin_point, location.buy_point, location.photos, '', location.location_name, location.latitude, location.longitude, location.checkin_point, location.buy_point, location.photos, '' ]) # If cp=-1, insert another record with cp=-2 if location.cp == -1: cursor.execute(""" INSERT INTO checkpoint_table (cp_number, event_code, cp_name, latitude, longitude, photo_point, buy_point, sample_photo, colabo_company_memo) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (cp_number, event_code,colabo_company_memo) DO UPDATE SET cp_name = %s, latitude = %s, longitude = %s, photo_point = %s, buy_point = %s, sample_photo = %s, colabo_company_memo = %s """, [ -2, event.event_name, location.location_name, location.latitude, location.longitude, location.checkin_point, location.buy_point, location.photos, '', location.location_name, location.latitude, location.longitude, location.checkin_point, location.buy_point, location.photos, '' ]) # 5. user_tableへの転送をスキップ self.stdout.write('ユーザーデータの転送をスキップします') # 6. Teamテーブルからteam_tableへの転送(修正版) entries = Entry.objects.filter(event__event_name=event.event_name) # Print the number of team entries entry_count = entries.count() self.stdout.write(f'チームデータ: {entry_count}件を転送中...') for entry in entries: team = entry.team # 「お試し」かどうかを判定 is_trial = False if hasattr(entry, 'zekken_label') and entry.zekken_label and 'お試し' in entry.zekken_label: is_trial = True # パスワード処理 leader = team.owner user_password = '' # リーダーが新規登録のユーザーかどうかを確認 if hasattr(leader, '_password') and leader._password: user_password = leader._password else: # 既存のユーザーの場合はパスワードを空にする user_password = '(existing)' cursor.execute(""" INSERT INTO team_table (zekken_number, event_code, team_name, class_name, password, trial) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (zekken_number, event_code) DO UPDATE SET team_name = %s, class_name = %s, password = %s, trial = %s """, [ entry.zekken_label, event.event_name, team.team_name, team.category.category_name, user_password, is_trial, team.team_name, team.category.category_name, user_password, is_trial ]) self.stdout.write(self.style.SUCCESS('gifurogeデータベースへの転送が完了しました')) except Exception as e: self.stdout.write(self.style.ERROR(f'転送中にエラーが発生しました: {str(e)}')) raise def transfer_to_gifuroge_old(self, event): """rogdbからgifurogeデータベースへデータを転送""" self.stdout.write('gifurogeデータベースへのデータ転送を開始します') with connections['gifuroge'].cursor() as cursor: try: # 4. Locationテーブルからcheckpoint_tableへの転送 self.stdout.write('checkpointデータを転送中...') locations = Location2025.objects.filter(event_id=event.id) for location in locations: cursor.execute(""" INSERT INTO checkpoint_table (checkpoint_id, checkpoint_name, point_value, latitude, longitude, event_code) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (checkpoint_id, event_code) DO UPDATE SET checkpoint_name = %s, point_value = %s, latitude = %s, longitude = %s """, [ location.id, location.name, location.point_value, location.latitude, location.longitude, event.event_name, location.name, location.point_value, location.latitude, location.longitude ]) # 5. CustomUserテーブルからuser_tableへの転送 self.stdout.write('ユーザーデータを転送中...') entries = Entry.objects.filter(event=event) users = CustomUser.objects.filter(entry__event=event).distinct() for user in users: cursor.execute(""" INSERT INTO user_table (user_id, name, email, event_code) VALUES (%s, %s, %s, %s) ON CONFLICT (user_id, event_code) DO UPDATE SET name = %s, email = %s """, [ user.id, user.name, user.email, event.event_name, user.name, user.email ]) # 6. Teamテーブルからteam_tableへの転送(trialフィールドを追加) self.stdout.write('チームデータを転送中...') teams = Team.objects.filter(entry__event=event).distinct() for team in teams: entry = Entry.objects.get(team=team, event=event) # CSVで「お試し」フラグがあったかどうかを確認 # ここでは仮にTeamモデルから判断できないので別途Entry.zekken_labelとの比較などで判断 is_trial = False try: # お試しフラグの判定ロジックを実装 # 実際のデータ構造に基づいて修正が必要 entries_with_trial = Entry.objects.filter( team=team, event=event ).first() if entries_with_trial: # ここでお試しフラグを設定する実装が必要 # 例えば特定のゼッケンラベルパターンでお試し判定など pass except: is_trial = False cursor.execute(""" INSERT INTO team_table (team_id, team_name, class_type, zekken_number, leader_id, event_code, trial) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (team_id, event_code) DO UPDATE SET team_name = %s, class_type = %s, zekken_number = %s, leader_id = %s, trial = %s """, [ team.id, team.team_name, entry.class_type, entry.zekken_number, team.leader.id, event.event_name, is_trial, team.team_name, entry.class_type, entry.zekken_number, team.leader.id, is_trial ]) self.stdout.write(self.style.SUCCESS('gifurogeデータベースへの転送が完了しました')) except Exception as e: self.stdout.write(self.style.ERROR(f'転送中にエラーが発生しました: {str(e)}')) raise