initial setting at 20-Aug-2025

This commit is contained in:
2025-08-20 19:15:19 +09:00
parent eab529bd3b
commit 1ba305641e
149 changed files with 170449 additions and 1802 deletions

View File

@ -0,0 +1,644 @@
import csv
import os
import logging
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, connections
from django.utils import timezone
from django.conf import settings
from django.contrib.auth import get_user_model
from rog.models import Member, Team, NewEvent2, Entry, Location,NewCategory #, GpsLog
CustomUser = get_user_model()
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'CSVファイルからイベント参加者情報をインポートし、rogdbとgifurogeデータベースに登録します。'
def add_arguments(self, parser):
parser.add_argument('csv_file', type=str, help='インポートするCSVファイルのパス')
parser.add_argument('event_code', type=str, help='登録するイベントコード')
def handle(self, *args, **options):
csv_file = options['csv_file']
event_code = options['event_code']
# 処理結果を保存するリストを初期化
self.processed_entries = []
if not os.path.exists(csv_file):
raise CommandError(f'ファイルが見つかりません: {csv_file}')
try:
event = NewEvent2.objects.get(event_name=event_code)
except NewEvent2.DoesNotExist:
raise CommandError(f'イベントが見つかりません: {event_code}')
self.stdout.write(self.style.SUCCESS(f'イベント "{event.event_name}" のデータをインポートします'))
# CSVファイルを読み込み、rogdbデータベースに登録
with open(csv_file, 'r', encoding='utf-8') as file:
reader = csv.reader(file)
next(reader) # ヘッダー行をスキップ
with transaction.atomic():
for i, row in enumerate(reader, 1):
try:
self.process_entry(row, event)
except Exception as e:
self.stdout.write(self.style.ERROR(f'{i} のデータ処理中にエラーが発生しました: {str(e)}'))
self.stdout.write(self.style.WARNING(f'この行はスキップして続行します'))
# gifurogeデータベースへの転送
self.transfer_to_gifuroge(event)
# 結果をCSVファイルに出力
self.export_processed_entries(event_code)
self.stdout.write(self.style.SUCCESS('データのインポートが完了しました'))
def process_entry(self, row, event):
"""CSVの1行からエントリー情報を処理"""
try:
# 新しいCSVフォーマットに対応したインデックス
participation_time = row[0]
division = row[1]
is_trial = row[2].strip() == 'お試し' # 「お試し」フラグ
division_number = row[3]
team_name = row[4]
leader_name = row[5]
leader_kana = row[6]
leader_gender = row[7] # 新しく追加された性別フィールド
password = row[8] # インデックスが1つずれる
member_count = int(row[9]) # インデックスが1つずれる
zekken_label = row[10] # インデックスが1つずれる
zekken_number = row[11] # インデックスが1つずれる
leader_email = row[12] # インデックスが1つずれる
leader_birth_date = row[13] # インデックスが1つずれる
name_parts = leader_name.split(' ', 1)
lastname = name_parts[0]
firstname = name_parts[1] if len(name_parts) > 1 else ""
# 半角数字を全角数字に変換する関数
def to_fullwidth(s):
"""半角数字を全角数字に変換する"""
result = ""
for char in s:
if char.isdigit():
# 半角数字を全角数字に変換
result += chr(ord(char) + 0xFEE0)
else:
result += char
return result
# 日付フォーマットを変換する関数
def format_date(date_str):
"""YYYY/MM/DD形式をYYYY-MM-DD形式に変換する"""
if not date_str:
return None
try:
# スラッシュやピリオドなどの区切り文字を処理
parts = None
if '/' in date_str:
parts = date_str.split('/')
elif '.' in date_str:
parts = date_str.split('.')
elif '-' in date_str:
# 既にハイフン形式の場合はそのまま返す
return date_str
if parts and len(parts) == 3:
year, month, day = parts
# 必要に応じて年を4桁に修正'91' → '1991'
if len(year) == 2:
if int(year) > 50: # 50より大きい場合は1900年代と仮定
year = f"19{year}"
else:
year = f"20{year}"
# 月と日が1桁の場合は2桁に揃える
month = month.zfill(2)
day = day.zfill(2)
return f"{year}-{month}-{day}"
return date_str # 変換できない場合は元の文字列を返す
except Exception as e:
self.stdout.write(self.style.ERROR(f'日付変換エラー: {date_str} - {str(e)}'))
return None
# 代表者の生年月日をフォーマット変換
formatted_leader_birth_date = format_date(leader_birth_date)
# 参加時間を全角に変換
fullwidth_participation_time = to_fullwidth(participation_time)
# 代表者の性別を設定Femaleならtrue、それ以外ならfalse
is_female = leader_gender.strip().lower() == "female"
# 1. CustomUserを検索または作成
new_user = False
password_to_save = ""
try:
leader = CustomUser.objects.get(email=leader_email)
# 既存ユーザー
password_to_save = "(既存)"
# 既存ユーザーの性別情報を更新
if leader.female != is_female:
leader.female = is_female
leader.save()
self.stdout.write(f'既存ユーザーを代表者として使用します: {leader_email} (性別: {leader_gender})')
except CustomUser.DoesNotExist:
# 新規ユーザーの場合
# leader_nameを空白で分離
leader = CustomUser.objects.create_user(
email=leader_email,
password=password,
firstname=firstname,
lastname=lastname,
date_of_birth=formatted_leader_birth_date,
group=event.event_name,
female=is_female, # 性別を設定
is_active=True
)
password_to_save = password # 新規ユーザーの場合は実際のパスワード
self.stdout.write(f'代表者を新規作成しました: {leader_email} (パスワード: {password}, 性別: {leader_gender})')
# 処理した代表者情報をリストに追加
self.processed_entries.append({
'leader_name': leader_name,
'team_name': team_name,
'email': leader_email,
'password': password_to_save
})
# CSVの参加部門から対応するカテゴリーを検索
# division + "-" + participation_time + "時間" の形式で検索
category_name_with_time = f"{division}-{fullwidth_participation_time}時間"
try:
category = NewCategory.objects.get(category_name=category_name_with_time)
except NewCategory.DoesNotExist:
# カテゴリーが見つからない場合のエラーハンドリング
self.stdout.write(self.style.ERROR(f'カテゴリーが見つかりません: {category_name_with_time}'))
raise CommandError(f'カテゴリー "{category_name_with_time}" が存在しません。先にカテゴリーを作成してください。')
# 2. チームの作成とメンバーの登録
team = Team.objects.create(
team_name=team_name,
owner=leader,
category=category # eventではなくcategoryを使用
)
# メンバーの登録(代表者を含む)
Member.objects.create(
team=team,
user=leader,
firstname=firstname,
lastname=lastname,
date_of_birth=formatted_leader_birth_date,
female=is_female, # 性別を設定
is_temporary=False # 代表者は一時的なメンバーではない
)
# 追加メンバーの登録CSVの14列目以降に存在する場合
for i in range(1, min(member_count, 5) + 1): # 最大5人まで
# 各メンバーは3つのフィールド名前、生年月日、性別を持つ
member_name_idx = 14 + (i-1) * 3
member_birth_idx = member_name_idx + 1
member_gender_idx = member_name_idx + 2 # 性別のインデックス
if len(row) > member_name_idx and row[member_name_idx]:
member_name = row[member_name_idx]
member_birth = row[member_birth_idx] if len(row) > member_birth_idx else None
# メンバーの生年月日もフォーマット変換
formatted_member_birth = format_date(member_birth) if member_birth else None
member_gender = row[member_gender_idx] if len(row) > member_gender_idx else "Male"
member_is_female = member_gender.strip().lower() == "female"
# 名前を分割
name_parts = member_name.split(' ', 1)
lastname = name_parts[0]
firstname = name_parts[1] if len(name_parts) > 1 else ""
# メンバー用のユーザー作成(メールアドレスは一時的なもの)
temp_email = f"{team_name.replace(' ', '_')}_{i}@example.com"
# 既存のメンバーチェック
try:
member_user = CustomUser.objects.filter(email=temp_email).first()
if not member_user:
raise CustomUser.DoesNotExist()
# 既存ユーザーの性別情報を更新
if member_user.female != member_is_female:
member_user.female = member_is_female
member_user.save()
except CustomUser.DoesNotExist:
import secrets
member_user = CustomUser.objects.create_user(
email=temp_email,
password=secrets.token_urlsafe(12), # メンバーにはランダムパスワード
firstname=firstname,
lastname=lastname,
date_of_birth=formatted_member_birth,
female=member_is_female, # 性別を設定
is_active=False # メンバーは直接ログインしないのでFalse
)
Member.objects.create(
team=team,
user=member_user,
firstname=firstname,
lastname=lastname,
date_of_birth=formatted_member_birth,
female=member_is_female, # 性別を設定
is_temporary=True # 追加メンバーは一時的なメンバーとして設定
)
self.stdout.write(f' => メンバーを追加しました: {member_name} (性別: {member_gender})')
# 3. エントリーの作成
# イベントの実施日をエントリーに割り当て
# イベントの開始日時から日付部分のみを取得
entry_date = event.start_datetime.date() if event.start_datetime else None # イベントの実施日
entry = Entry.objects.create(
team=team,
event=event,
date=entry_date, # イベントの実施日をエントリーに割り当て
zekken_number=zekken_number,
zekken_label=zekken_label,
category=category,
owner=leader,
is_trial=is_trial # お試しフラグを設定
)
# エントリー登録完了のログ出力
self.stdout.write(f'チーム "{team_name}" をイベント "{event.event_name}" に登録しました (ゼッケン: {zekken_label}, お試し: {is_trial})')
except Exception as e:
self.stdout.write(self.style.ERROR(f'エラーが発生しました: {str(e)}'))
# エラーが発生してもスキップして続行するため、例外を再スローしない
except Exception as e:
self.stdout.write(self.style.ERROR(f'エラーが発生しました: {str(e)}'))
raise
def export_processed_entries(self, event_code):
"""処理した代表者情報をCSVファイルに出力"""
if not self.processed_entries:
self.stdout.write('処理したエントリーがありません')
return
output_file = f"{event_code}_leaders_{timezone.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(output_file, 'w', encoding='utf-8', newline='') as csvfile:
fieldnames = ['代表者名', 'チーム名', 'メールアドレス', 'パスワード']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for entry in self.processed_entries:
writer.writerow({
'代表者名': entry['leader_name'],
'チーム名': entry['team_name'],
'メールアドレス': entry['email'],
'パスワード': entry['password']
})
self.stdout.write(self.style.SUCCESS(f'代表者情報をCSVファイルに出力しました: {output_file}'))
def process_entry_old(self, row, event):
"""CSVの1行からエントリー情報を処理"""
self.stdout.write(self.style.SUCCESS(f'イベント "{event.event_name}", row="{row}"'))
try:
# 新しいCSVフォーマットに対応したインデックス
participation_time = row[0]
division = row[1]
is_trial = row[2].strip() == 'お試し' # 「お試し」フラグ
division_number = row[3]
team_name = row[4]
leader_name = row[5]
leader_kana = row[6]
password = row[7] # 新しいフィールド:パスワード
member_count = int(row[8])
zekken_label = row[9] # ゼッケンラベル
zekken_number = row[10] # ナンバー
leader_email = row[11]
leader_birth_date = row[12]
# 1. CustomUserを検索または作成
try:
leader = CustomUser.objects.get(email=leader_email)
self.stdout.write(f'既存ユーザーを代表者として使用します: {leader_email}')
except CustomUser.DoesNotExist:
# 新規ユーザーの場合はランダムパスワードを生成
import secrets
# leader_nameを空白で分離
name_parts = leader_name.split(' ', 1)
lastname = name_parts[0]
firstname = name_parts[1] if len(name_parts) > 1 else ""
leader = CustomUser.objects.create_user(
email=leader_email,
password=password,
firstname=firstname, # 名前の後半部分
lastname=lastname, # 名前の前半部分
birth_date=leader_birth_date,
is_active=True
)
self.stdout.write(f'代表者を新規作成しました: {leader_email} (パスワード: {password})')
# CSVの参加部門から対応するカテゴリーを検索
try:
category = NewCategory.objects.get(category_name=division)
self.stdout.write(f'カテゴリーを見つけました: {category.category_name}')
except NewCategory.DoesNotExist:
# カテゴリーが見つからない場合のエラーハンドリング
self.stdout.write(self.style.ERROR(f'カテゴリーが見つかりません: {division}'))
raise CommandError(f'カテゴリー "{division}" が存在しません。先にカテゴリーを作成してください。')
# 2. チームの作成とメンバーの登録
team = Team.objects.create(
team_name=team_name,
owner=leader,
category=category
)
Member.objects.create(
team=team,
user=leader,
is_leader=True,
firstname=leader.firstname,
lastname=leader.lastname,
date_of_birth=leader.date_of_birth,
is_temporary=False # 代表者は一時的なメンバーではない
)
# 追加メンバーの登録CSVの13列目以降に存在する場合
for i in range(1, min(member_count, 5) + 1): # 最大5人まで
member_name_idx = 13 + (i-1) * 2
member_birth_idx = member_name_idx + 1
if len(row) > member_name_idx and row[member_name_idx]:
member_name = row[member_name_idx]
member_birth = row[member_birth_idx] if len(row) > member_birth_idx else None
# メンバー用のユーザー作成(メールアドレスは一時的なもの)
temp_email = f"{team_name.replace(' ', '_')}_{i}@example.com"
# 既存のメンバーチェック
try:
member_user = CustomUser.objects.filter(name=member_name).first()
if not member_user:
raise CustomUser.DoesNotExist()
except CustomUser.DoesNotExist:
import secrets
member_user = CustomUser.objects.create_user(
email=temp_email,
password=secrets.token_urlsafe(12), # メンバーにはランダムパスワード
name=member_name,
birth_date=member_birth,
is_active=False # メンバーは直接ログインしないのでFalse
)
# 名前を分割(姓と名の分離)
name_parts = member_name.split(' ', 1)
firstname = name_parts[1] if len(name_parts) > 1 else ""
lastname = name_parts[0]
Member.objects.create(
team=team,
user=member_user,
is_leader=False,
firstname=firstname,
lastname=lastname,
date_of_birth=member_birth,
is_temporary=True # 追加メンバーは一時的なメンバーとして設定
)
# 3. エントリーの作成
entry = Entry.objects.create(
team=team,
event=event,
zekken_number=zekken_number,
zekken_label=zekken_label, # 新しいフィールドに設定
class_type=division,
leader=leader,
)
# スタート記録の追加
#GpsLog.record_start(entry)
self.stdout.write(f'チーム "{team_name}" を登録しました (お試し: {is_trial})')
except Exception as e:
self.stdout.write(self.style.ERROR(f'エラーが発生しました: {str(e)}'))
raise
def transfer_to_gifuroge(self, event):
"""rogdbからgifurogeデータベースへデータを転送"""
self.stdout.write('gifurogeデータベースへのデータ転送を開始します')
with connections['gifuroge'].cursor() as cursor:
try:
# 1. Event data transfer from NewEvent2 to event_table
self.stdout.write('イベントデータを転送中...')
# Extract fields from the event object
event_code = event.event_name
event_name = event.event_description or event.event_name
start_datetime = event.start_datetime
# Format start_datetime to get only the date part
event_date = start_datetime.date() if start_datetime else None
cursor.execute("""
INSERT INTO event_table
(event_code, event_name, start_time, event_day)
VALUES (%s, %s, %s, %s)
ON CONFLICT (event_code) DO UPDATE
SET event_name = %s, start_time = %s, event_day = %s
""", [
event_code, event_name, start_datetime, event_date,
event_name, start_datetime, event_date
])
self.stdout.write(f'イベント "{event_code}" を転送しました')
# 4. Locationテーブルからcheckpoint_tableへの転送
self.stdout.write('checkpointデータを転送中...')
locations = Location.objects.filter(group=event.event_name)
# Print the number of location records
location_count = locations.count()
self.stdout.write(f'checkpointデータ: {location_count}件を転送中...')
for location in locations:
# Display the cp_number, event_code, and colabo_company_memo
# self.stdout.write(f' CP: {location.cp}, Event: {event.event_name}, Memo: {"" or "(empty)"}')
cursor.execute("""
INSERT INTO checkpoint_table
(cp_number, event_code, cp_name, latitude, longitude, photo_point, buy_point, sample_photo, colabo_company_memo)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (cp_number, event_code,colabo_company_memo) DO UPDATE
SET cp_name = %s, latitude = %s, longitude = %s, photo_point = %s, buy_point = %s, sample_photo = %s, colabo_company_memo = %s
""", [
location.cp, event.event_name, location.location_name,
location.latitude, location.longitude, location.checkin_point,
location.buy_point, location.photos, '',
location.location_name, location.latitude, location.longitude,
location.checkin_point, location.buy_point, location.photos, ''
])
# If cp=-1, insert another record with cp=-2
if location.cp == -1:
cursor.execute("""
INSERT INTO checkpoint_table
(cp_number, event_code, cp_name, latitude, longitude, photo_point, buy_point, sample_photo, colabo_company_memo)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (cp_number, event_code,colabo_company_memo) DO UPDATE
SET cp_name = %s, latitude = %s, longitude = %s, photo_point = %s, buy_point = %s, sample_photo = %s, colabo_company_memo = %s
""", [
-2, event.event_name, location.location_name,
location.latitude, location.longitude, location.checkin_point,
location.buy_point, location.photos, '',
location.location_name, location.latitude, location.longitude,
location.checkin_point, location.buy_point, location.photos, ''
])
# 5. user_tableへの転送をスキップ
self.stdout.write('ユーザーデータの転送をスキップします')
# 6. Teamテーブルからteam_tableへの転送修正版
entries = Entry.objects.filter(event__event_name=event.event_name)
# Print the number of team entries
entry_count = entries.count()
self.stdout.write(f'チームデータ: {entry_count}件を転送中...')
for entry in entries:
team = entry.team
# 「お試し」かどうかを判定
is_trial = False
if hasattr(entry, 'zekken_label') and entry.zekken_label and 'お試し' in entry.zekken_label:
is_trial = True
# パスワード処理
leader = team.owner
user_password = ''
# リーダーが新規登録のユーザーかどうかを確認
if hasattr(leader, '_password') and leader._password:
user_password = leader._password
else:
# 既存のユーザーの場合はパスワードを空にする
user_password = '(existing)'
cursor.execute("""
INSERT INTO team_table
(zekken_number, event_code, team_name, class_name, password, trial)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (zekken_number, event_code) DO UPDATE
SET team_name = %s, class_name = %s, password = %s, trial = %s
""", [
entry.zekken_label, event.event_name, team.team_name,
team.category.category_name, user_password, is_trial,
team.team_name, team.category.category_name,
user_password, is_trial
])
self.stdout.write(self.style.SUCCESS('gifurogeデータベースへの転送が完了しました'))
except Exception as e:
self.stdout.write(self.style.ERROR(f'転送中にエラーが発生しました: {str(e)}'))
raise
def transfer_to_gifuroge_old(self, event):
"""rogdbからgifurogeデータベースへデータを転送"""
self.stdout.write('gifurogeデータベースへのデータ転送を開始します')
with connections['gifuroge'].cursor() as cursor:
try:
# 4. Locationテーブルからcheckpoint_tableへの転送
self.stdout.write('checkpointデータを転送中...')
locations = Location.objects.filter(event=event)
for location in locations:
cursor.execute("""
INSERT INTO checkpoint_table
(checkpoint_id, checkpoint_name, point_value, latitude, longitude, event_code)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (checkpoint_id, event_code) DO UPDATE
SET checkpoint_name = %s, point_value = %s, latitude = %s, longitude = %s
""", [
location.id, location.name, location.point_value,
location.latitude, location.longitude, event.event_name,
location.name, location.point_value,
location.latitude, location.longitude
])
# 5. CustomUserテーブルからuser_tableへの転送
self.stdout.write('ユーザーデータを転送中...')
entries = Entry.objects.filter(event=event)
users = CustomUser.objects.filter(entry__event=event).distinct()
for user in users:
cursor.execute("""
INSERT INTO user_table
(user_id, name, email, event_code)
VALUES (%s, %s, %s, %s)
ON CONFLICT (user_id, event_code) DO UPDATE
SET name = %s, email = %s
""", [
user.id, user.name, user.email, event.event_name,
user.name, user.email
])
# 6. Teamテーブルからteam_tableへの転送trialフィールドを追加
self.stdout.write('チームデータを転送中...')
teams = Team.objects.filter(entry__event=event).distinct()
for team in teams:
entry = Entry.objects.get(team=team, event=event)
# CSVで「お試し」フラグがあったかどうかを確認
# ここでは仮にTeamモデルから判断できないので別途Entry.zekken_labelとの比較などで判断
is_trial = False
try:
# お試しフラグの判定ロジックを実装
# 実際のデータ構造に基づいて修正が必要
entries_with_trial = Entry.objects.filter(
team=team, event=event
).first()
if entries_with_trial:
# ここでお試しフラグを設定する実装が必要
# 例えば特定のゼッケンラベルパターンでお試し判定など
pass
except:
is_trial = False
cursor.execute("""
INSERT INTO team_table
(team_id, team_name, class_type, zekken_number, leader_id, event_code, trial)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (team_id, event_code) DO UPDATE
SET team_name = %s, class_type = %s, zekken_number = %s, leader_id = %s, trial = %s
""", [
team.id, team.team_name, entry.class_type, entry.zekken_number,
team.leader.id, event.event_name, is_trial,
team.team_name, entry.class_type, entry.zekken_number, team.leader.id, is_trial
])
self.stdout.write(self.style.SUCCESS('gifurogeデータベースへの転送が完了しました'))
except Exception as e:
self.stdout.write(self.style.ERROR(f'転送中にエラーが発生しました: {str(e)}'))
raise