Files
rogaining_srv/rog/management/commands/import_event_data.py

645 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import os
import logging
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, connections
from django.utils import timezone
from django.conf import settings
from django.contrib.auth import get_user_model
from rog.models import Member, Team, NewEvent2, Entry, Location2025,NewCategory #, GpsLog
CustomUser = get_user_model()
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'CSVファイルからイベント参加者情報をインポートし、rogdbとgifurogeデータベースに登録します。'
def add_arguments(self, parser):
parser.add_argument('csv_file', type=str, help='インポートするCSVファイルのパス')
parser.add_argument('event_code', type=str, help='登録するイベントコード')
def handle(self, *args, **options):
csv_file = options['csv_file']
event_code = options['event_code']
# 処理結果を保存するリストを初期化
self.processed_entries = []
if not os.path.exists(csv_file):
raise CommandError(f'ファイルが見つかりません: {csv_file}')
try:
event = NewEvent2.objects.get(event_name=event_code)
except NewEvent2.DoesNotExist:
raise CommandError(f'イベントが見つかりません: {event_code}')
self.stdout.write(self.style.SUCCESS(f'イベント "{event.event_name}" のデータをインポートします'))
# CSVファイルを読み込み、rogdbデータベースに登録
with open(csv_file, 'r', encoding='utf-8') as file:
reader = csv.reader(file)
next(reader) # ヘッダー行をスキップ
with transaction.atomic():
for i, row in enumerate(reader, 1):
try:
self.process_entry(row, event)
except Exception as e:
self.stdout.write(self.style.ERROR(f'{i} のデータ処理中にエラーが発生しました: {str(e)}'))
self.stdout.write(self.style.WARNING(f'この行はスキップして続行します'))
# gifurogeデータベースへの転送
self.transfer_to_gifuroge(event)
# 結果をCSVファイルに出力
self.export_processed_entries(event_code)
self.stdout.write(self.style.SUCCESS('データのインポートが完了しました'))
def process_entry(self, row, event):
"""CSVの1行からエントリー情報を処理"""
try:
# 新しいCSVフォーマットに対応したインデックス
participation_time = row[0]
division = row[1]
is_trial = row[2].strip() == 'お試し' # 「お試し」フラグ
division_number = row[3]
team_name = row[4]
leader_name = row[5]
leader_kana = row[6]
leader_gender = row[7] # 新しく追加された性別フィールド
password = row[8] # インデックスが1つずれる
member_count = int(row[9]) # インデックスが1つずれる
zekken_label = row[10] # インデックスが1つずれる
zekken_number = row[11] # インデックスが1つずれる
leader_email = row[12] # インデックスが1つずれる
leader_birth_date = row[13] # インデックスが1つずれる
name_parts = leader_name.split(' ', 1)
lastname = name_parts[0]
firstname = name_parts[1] if len(name_parts) > 1 else ""
# 半角数字を全角数字に変換する関数
def to_fullwidth(s):
"""半角数字を全角数字に変換する"""
result = ""
for char in s:
if char.isdigit():
# 半角数字を全角数字に変換
result += chr(ord(char) + 0xFEE0)
else:
result += char
return result
# 日付フォーマットを変換する関数
def format_date(date_str):
"""YYYY/MM/DD形式をYYYY-MM-DD形式に変換する"""
if not date_str:
return None
try:
# スラッシュやピリオドなどの区切り文字を処理
parts = None
if '/' in date_str:
parts = date_str.split('/')
elif '.' in date_str:
parts = date_str.split('.')
elif '-' in date_str:
# 既にハイフン形式の場合はそのまま返す
return date_str
if parts and len(parts) == 3:
year, month, day = parts
# 必要に応じて年を4桁に修正'91' → '1991'
if len(year) == 2:
if int(year) > 50: # 50より大きい場合は1900年代と仮定
year = f"19{year}"
else:
year = f"20{year}"
# 月と日が1桁の場合は2桁に揃える
month = month.zfill(2)
day = day.zfill(2)
return f"{year}-{month}-{day}"
return date_str # 変換できない場合は元の文字列を返す
except Exception as e:
self.stdout.write(self.style.ERROR(f'日付変換エラー: {date_str} - {str(e)}'))
return None
# 代表者の生年月日をフォーマット変換
formatted_leader_birth_date = format_date(leader_birth_date)
# 参加時間を全角に変換
fullwidth_participation_time = to_fullwidth(participation_time)
# 代表者の性別を設定Femaleならtrue、それ以外ならfalse
is_female = leader_gender.strip().lower() == "female"
# 1. CustomUserを検索または作成
new_user = False
password_to_save = ""
try:
leader = CustomUser.objects.get(email=leader_email)
# 既存ユーザー
password_to_save = "(既存)"
# 既存ユーザーの性別情報を更新
if leader.female != is_female:
leader.female = is_female
leader.save()
self.stdout.write(f'既存ユーザーを代表者として使用します: {leader_email} (性別: {leader_gender})')
except CustomUser.DoesNotExist:
# 新規ユーザーの場合
# leader_nameを空白で分離
leader = CustomUser.objects.create_user(
email=leader_email,
password=password,
firstname=firstname,
lastname=lastname,
date_of_birth=formatted_leader_birth_date,
group=event.event_name,
female=is_female, # 性別を設定
is_active=True
)
password_to_save = password # 新規ユーザーの場合は実際のパスワード
self.stdout.write(f'代表者を新規作成しました: {leader_email} (パスワード: {password}, 性別: {leader_gender})')
# 処理した代表者情報をリストに追加
self.processed_entries.append({
'leader_name': leader_name,
'team_name': team_name,
'email': leader_email,
'password': password_to_save
})
# CSVの参加部門から対応するカテゴリーを検索
# division + "-" + participation_time + "時間" の形式で検索
category_name_with_time = f"{division}-{fullwidth_participation_time}時間"
try:
category = NewCategory.objects.get(category_name=category_name_with_time)
except NewCategory.DoesNotExist:
# カテゴリーが見つからない場合のエラーハンドリング
self.stdout.write(self.style.ERROR(f'カテゴリーが見つかりません: {category_name_with_time}'))
raise CommandError(f'カテゴリー "{category_name_with_time}" が存在しません。先にカテゴリーを作成してください。')
# 2. チームの作成とメンバーの登録
team = Team.objects.create(
team_name=team_name,
owner=leader,
category=category # eventではなくcategoryを使用
)
# メンバーの登録(代表者を含む)
Member.objects.create(
team=team,
user=leader,
firstname=firstname,
lastname=lastname,
date_of_birth=formatted_leader_birth_date,
female=is_female, # 性別を設定
is_temporary=False # 代表者は一時的なメンバーではない
)
# 追加メンバーの登録CSVの14列目以降に存在する場合
for i in range(1, min(member_count, 5) + 1): # 最大5人まで
# 各メンバーは3つのフィールド名前、生年月日、性別を持つ
member_name_idx = 14 + (i-1) * 3
member_birth_idx = member_name_idx + 1
member_gender_idx = member_name_idx + 2 # 性別のインデックス
if len(row) > member_name_idx and row[member_name_idx]:
member_name = row[member_name_idx]
member_birth = row[member_birth_idx] if len(row) > member_birth_idx else None
# メンバーの生年月日もフォーマット変換
formatted_member_birth = format_date(member_birth) if member_birth else None
member_gender = row[member_gender_idx] if len(row) > member_gender_idx else "Male"
member_is_female = member_gender.strip().lower() == "female"
# 名前を分割
name_parts = member_name.split(' ', 1)
lastname = name_parts[0]
firstname = name_parts[1] if len(name_parts) > 1 else ""
# メンバー用のユーザー作成(メールアドレスは一時的なもの)
temp_email = f"{team_name.replace(' ', '_')}_{i}@example.com"
# 既存のメンバーチェック
try:
member_user = CustomUser.objects.filter(email=temp_email).first()
if not member_user:
raise CustomUser.DoesNotExist()
# 既存ユーザーの性別情報を更新
if member_user.female != member_is_female:
member_user.female = member_is_female
member_user.save()
except CustomUser.DoesNotExist:
import secrets
member_user = CustomUser.objects.create_user(
email=temp_email,
password=secrets.token_urlsafe(12), # メンバーにはランダムパスワード
firstname=firstname,
lastname=lastname,
date_of_birth=formatted_member_birth,
female=member_is_female, # 性別を設定
is_active=False # メンバーは直接ログインしないのでFalse
)
Member.objects.create(
team=team,
user=member_user,
firstname=firstname,
lastname=lastname,
date_of_birth=formatted_member_birth,
female=member_is_female, # 性別を設定
is_temporary=True # 追加メンバーは一時的なメンバーとして設定
)
self.stdout.write(f' => メンバーを追加しました: {member_name} (性別: {member_gender})')
# 3. エントリーの作成
# イベントの実施日をエントリーに割り当て
# イベントの開始日時から日付部分のみを取得
entry_date = event.start_datetime.date() if event.start_datetime else None # イベントの実施日
entry = Entry.objects.create(
team=team,
event=event,
date=entry_date, # イベントの実施日をエントリーに割り当て
zekken_number=zekken_number,
zekken_label=zekken_label,
category=category,
owner=leader,
is_trial=is_trial # お試しフラグを設定
)
# エントリー登録完了のログ出力
self.stdout.write(f'チーム "{team_name}" をイベント "{event.event_name}" に登録しました (ゼッケン: {zekken_label}, お試し: {is_trial})')
except Exception as e:
self.stdout.write(self.style.ERROR(f'エラーが発生しました: {str(e)}'))
# エラーが発生してもスキップして続行するため、例外を再スローしない
except Exception as e:
self.stdout.write(self.style.ERROR(f'エラーが発生しました: {str(e)}'))
raise
def export_processed_entries(self, event_code):
"""処理した代表者情報をCSVファイルに出力"""
if not self.processed_entries:
self.stdout.write('処理したエントリーがありません')
return
output_file = f"{event_code}_leaders_{timezone.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(output_file, 'w', encoding='utf-8', newline='') as csvfile:
fieldnames = ['代表者名', 'チーム名', 'メールアドレス', 'パスワード']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for entry in self.processed_entries:
writer.writerow({
'代表者名': entry['leader_name'],
'チーム名': entry['team_name'],
'メールアドレス': entry['email'],
'パスワード': entry['password']
})
self.stdout.write(self.style.SUCCESS(f'代表者情報をCSVファイルに出力しました: {output_file}'))
def process_entry_old(self, row, event):
"""CSVの1行からエントリー情報を処理"""
self.stdout.write(self.style.SUCCESS(f'イベント "{event.event_name}", row="{row}"'))
try:
# 新しいCSVフォーマットに対応したインデックス
participation_time = row[0]
division = row[1]
is_trial = row[2].strip() == 'お試し' # 「お試し」フラグ
division_number = row[3]
team_name = row[4]
leader_name = row[5]
leader_kana = row[6]
password = row[7] # 新しいフィールド:パスワード
member_count = int(row[8])
zekken_label = row[9] # ゼッケンラベル
zekken_number = row[10] # ナンバー
leader_email = row[11]
leader_birth_date = row[12]
# 1. CustomUserを検索または作成
try:
leader = CustomUser.objects.get(email=leader_email)
self.stdout.write(f'既存ユーザーを代表者として使用します: {leader_email}')
except CustomUser.DoesNotExist:
# 新規ユーザーの場合はランダムパスワードを生成
import secrets
# leader_nameを空白で分離
name_parts = leader_name.split(' ', 1)
lastname = name_parts[0]
firstname = name_parts[1] if len(name_parts) > 1 else ""
leader = CustomUser.objects.create_user(
email=leader_email,
password=password,
firstname=firstname, # 名前の後半部分
lastname=lastname, # 名前の前半部分
birth_date=leader_birth_date,
is_active=True
)
self.stdout.write(f'代表者を新規作成しました: {leader_email} (パスワード: {password})')
# CSVの参加部門から対応するカテゴリーを検索
try:
category = NewCategory.objects.get(category_name=division)
self.stdout.write(f'カテゴリーを見つけました: {category.category_name}')
except NewCategory.DoesNotExist:
# カテゴリーが見つからない場合のエラーハンドリング
self.stdout.write(self.style.ERROR(f'カテゴリーが見つかりません: {division}'))
raise CommandError(f'カテゴリー "{division}" が存在しません。先にカテゴリーを作成してください。')
# 2. チームの作成とメンバーの登録
team = Team.objects.create(
team_name=team_name,
owner=leader,
category=category
)
Member.objects.create(
team=team,
user=leader,
is_leader=True,
firstname=leader.firstname,
lastname=leader.lastname,
date_of_birth=leader.date_of_birth,
is_temporary=False # 代表者は一時的なメンバーではない
)
# 追加メンバーの登録CSVの13列目以降に存在する場合
for i in range(1, min(member_count, 5) + 1): # 最大5人まで
member_name_idx = 13 + (i-1) * 2
member_birth_idx = member_name_idx + 1
if len(row) > member_name_idx and row[member_name_idx]:
member_name = row[member_name_idx]
member_birth = row[member_birth_idx] if len(row) > member_birth_idx else None
# メンバー用のユーザー作成(メールアドレスは一時的なもの)
temp_email = f"{team_name.replace(' ', '_')}_{i}@example.com"
# 既存のメンバーチェック
try:
member_user = CustomUser.objects.filter(name=member_name).first()
if not member_user:
raise CustomUser.DoesNotExist()
except CustomUser.DoesNotExist:
import secrets
member_user = CustomUser.objects.create_user(
email=temp_email,
password=secrets.token_urlsafe(12), # メンバーにはランダムパスワード
name=member_name,
birth_date=member_birth,
is_active=False # メンバーは直接ログインしないのでFalse
)
# 名前を分割(姓と名の分離)
name_parts = member_name.split(' ', 1)
firstname = name_parts[1] if len(name_parts) > 1 else ""
lastname = name_parts[0]
Member.objects.create(
team=team,
user=member_user,
is_leader=False,
firstname=firstname,
lastname=lastname,
date_of_birth=member_birth,
is_temporary=True # 追加メンバーは一時的なメンバーとして設定
)
# 3. エントリーの作成
entry = Entry.objects.create(
team=team,
event=event,
zekken_number=zekken_number,
zekken_label=zekken_label, # 新しいフィールドに設定
class_type=division,
leader=leader,
)
# スタート記録の追加
#GpsLog.record_start(entry)
self.stdout.write(f'チーム "{team_name}" を登録しました (お試し: {is_trial})')
except Exception as e:
self.stdout.write(self.style.ERROR(f'エラーが発生しました: {str(e)}'))
raise
def transfer_to_gifuroge(self, event):
"""rogdbからgifurogeデータベースへデータを転送"""
self.stdout.write('gifurogeデータベースへのデータ転送を開始します')
with connections['gifuroge'].cursor() as cursor:
try:
# 1. Event data transfer from NewEvent2 to event_table
self.stdout.write('イベントデータを転送中...')
# Extract fields from the event object
event_code = event.event_name
event_name = event.event_description or event.event_name
start_datetime = event.start_datetime
# Format start_datetime to get only the date part
event_date = start_datetime.date() if start_datetime else None
cursor.execute("""
INSERT INTO event_table
(event_code, event_name, start_time, event_day)
VALUES (%s, %s, %s, %s)
ON CONFLICT (event_code) DO UPDATE
SET event_name = %s, start_time = %s, event_day = %s
""", [
event_code, event_name, start_datetime, event_date,
event_name, start_datetime, event_date
])
self.stdout.write(f'イベント "{event_code}" を転送しました')
# 4. Locationテーブルからcheckpoint_tableへの転送
self.stdout.write('checkpointデータを転送中...')
locations = Location2025.objects.filter(group=event.event_name)
# Print the number of location records
location_count = locations.count()
self.stdout.write(f'checkpointデータ: {location_count}件を転送中...')
for location in locations:
# Display the cp_number, event_code, and colabo_company_memo
# self.stdout.write(f' CP: {location.cp}, Event: {event.event_name}, Memo: {"" or "(empty)"}')
cursor.execute("""
INSERT INTO checkpoint_table
(cp_number, event_code, cp_name, latitude, longitude, photo_point, buy_point, sample_photo, colabo_company_memo)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (cp_number, event_code,colabo_company_memo) DO UPDATE
SET cp_name = %s, latitude = %s, longitude = %s, photo_point = %s, buy_point = %s, sample_photo = %s, colabo_company_memo = %s
""", [
location.cp, event.event_name, location.location_name,
location.latitude, location.longitude, location.checkin_point,
location.buy_point, location.photos, '',
location.location_name, location.latitude, location.longitude,
location.checkin_point, location.buy_point, location.photos, ''
])
# If cp=-1, insert another record with cp=-2
if location.cp == -1:
cursor.execute("""
INSERT INTO checkpoint_table
(cp_number, event_code, cp_name, latitude, longitude, photo_point, buy_point, sample_photo, colabo_company_memo)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (cp_number, event_code,colabo_company_memo) DO UPDATE
SET cp_name = %s, latitude = %s, longitude = %s, photo_point = %s, buy_point = %s, sample_photo = %s, colabo_company_memo = %s
""", [
-2, event.event_name, location.location_name,
location.latitude, location.longitude, location.checkin_point,
location.buy_point, location.photos, '',
location.location_name, location.latitude, location.longitude,
location.checkin_point, location.buy_point, location.photos, ''
])
# 5. user_tableへの転送をスキップ
self.stdout.write('ユーザーデータの転送をスキップします')
# 6. Teamテーブルからteam_tableへの転送修正版
entries = Entry.objects.filter(event__event_name=event.event_name)
# Print the number of team entries
entry_count = entries.count()
self.stdout.write(f'チームデータ: {entry_count}件を転送中...')
for entry in entries:
team = entry.team
# 「お試し」かどうかを判定
is_trial = False
if hasattr(entry, 'zekken_label') and entry.zekken_label and 'お試し' in entry.zekken_label:
is_trial = True
# パスワード処理
leader = team.owner
user_password = ''
# リーダーが新規登録のユーザーかどうかを確認
if hasattr(leader, '_password') and leader._password:
user_password = leader._password
else:
# 既存のユーザーの場合はパスワードを空にする
user_password = '(existing)'
cursor.execute("""
INSERT INTO team_table
(zekken_number, event_code, team_name, class_name, password, trial)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (zekken_number, event_code) DO UPDATE
SET team_name = %s, class_name = %s, password = %s, trial = %s
""", [
entry.zekken_label, event.event_name, team.team_name,
team.category.category_name, user_password, is_trial,
team.team_name, team.category.category_name,
user_password, is_trial
])
self.stdout.write(self.style.SUCCESS('gifurogeデータベースへの転送が完了しました'))
except Exception as e:
self.stdout.write(self.style.ERROR(f'転送中にエラーが発生しました: {str(e)}'))
raise
def transfer_to_gifuroge_old(self, event):
"""rogdbからgifurogeデータベースへデータを転送"""
self.stdout.write('gifurogeデータベースへのデータ転送を開始します')
with connections['gifuroge'].cursor() as cursor:
try:
# 4. Locationテーブルからcheckpoint_tableへの転送
self.stdout.write('checkpointデータを転送中...')
locations = Location2025.objects.filter(event_id=event.id)
for location in locations:
cursor.execute("""
INSERT INTO checkpoint_table
(checkpoint_id, checkpoint_name, point_value, latitude, longitude, event_code)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (checkpoint_id, event_code) DO UPDATE
SET checkpoint_name = %s, point_value = %s, latitude = %s, longitude = %s
""", [
location.id, location.name, location.point_value,
location.latitude, location.longitude, event.event_name,
location.name, location.point_value,
location.latitude, location.longitude
])
# 5. CustomUserテーブルからuser_tableへの転送
self.stdout.write('ユーザーデータを転送中...')
entries = Entry.objects.filter(event=event)
users = CustomUser.objects.filter(entry__event=event).distinct()
for user in users:
cursor.execute("""
INSERT INTO user_table
(user_id, name, email, event_code)
VALUES (%s, %s, %s, %s)
ON CONFLICT (user_id, event_code) DO UPDATE
SET name = %s, email = %s
""", [
user.id, user.name, user.email, event.event_name,
user.name, user.email
])
# 6. Teamテーブルからteam_tableへの転送trialフィールドを追加
self.stdout.write('チームデータを転送中...')
teams = Team.objects.filter(entry__event=event).distinct()
for team in teams:
entry = Entry.objects.get(team=team, event=event)
# CSVで「お試し」フラグがあったかどうかを確認
# ここでは仮にTeamモデルから判断できないので別途Entry.zekken_labelとの比較などで判断
is_trial = False
try:
# お試しフラグの判定ロジックを実装
# 実際のデータ構造に基づいて修正が必要
entries_with_trial = Entry.objects.filter(
team=team, event=event
).first()
if entries_with_trial:
# ここでお試しフラグを設定する実装が必要
# 例えば特定のゼッケンラベルパターンでお試し判定など
pass
except:
is_trial = False
cursor.execute("""
INSERT INTO team_table
(team_id, team_name, class_type, zekken_number, leader_id, event_code, trial)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (team_id, event_code) DO UPDATE
SET team_name = %s, class_type = %s, zekken_number = %s, leader_id = %s, trial = %s
""", [
team.id, team.team_name, entry.class_type, entry.zekken_number,
team.leader.id, event.event_name, is_trial,
team.team_name, entry.class_type, entry.zekken_number, team.leader.id, is_trial
])
self.stdout.write(self.style.SUCCESS('gifurogeデータベースへの転送が完了しました'))
except Exception as e:
self.stdout.write(self.style.ERROR(f'転送中にエラーが発生しました: {str(e)}'))
raise