Files
rogaining_srv/rog/management/commands/migrate_mobserver_data.py

355 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
from django.core.management.base import BaseCommand
from django.db import connections, transaction, connection
from django.db.models import Q
from django.contrib.gis.geos import Point
from django.utils import timezone
from rog.models import Team, NewEvent2, Checkpoint, GpsCheckin, GpsLog, Entry
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_values
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'MobServerデータベースからDjangoモデルにデータを移行します'
def add_arguments(self, parser):
parser.add_argument(
'--dry-run',
action='store_true',
dest='dry_run',
help='実際の移行を行わず、処理内容のみを表示します',
)
parser.add_argument(
'--batch-size',
type=int,
default=100,
help='バッチサイズ(デフォルト: 100'
)
def handle(self, *args, **options):
dry_run = options['dry_run']
batch_size = options['batch_size']
if dry_run:
self.stdout.write(self.style.WARNING('ドライランモードで実行中...'))
# MobServerデータベース接続を取得
mobserver_conn = connections['mobserver']
try:
with transaction.atomic():
self.migrate_events(mobserver_conn, dry_run, batch_size)
self.migrate_teams(mobserver_conn, dry_run, batch_size)
self.migrate_checkpoints(mobserver_conn, dry_run, batch_size)
self.migrate_gps_logs(mobserver_conn, dry_run, batch_size)
if dry_run:
raise transaction.TransactionManagementError("ドライランのためロールバックします")
except transaction.TransactionManagementError:
if dry_run:
self.stdout.write(self.style.SUCCESS('ドライランが完了しました(変更は保存されていません)'))
else:
raise
except Exception as e:
logger.error(f'データ移行エラー: {e}')
self.stdout.write(self.style.ERROR(f'エラーが発生しました: {e}'))
raise
def migrate_events(self, conn, dry_run, batch_size):
"""イベント情報を移行"""
self.stdout.write('イベント情報を移行中...')
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM event_table")
rows = cursor.fetchall()
events_migrated = 0
batch_data = []
for row in rows:
(event_code, event_name, start_time, event_day) = row
# start_timeのデータクリーニング
cleaned_start_time = start_time
if start_time and isinstance(start_time, str):
# セミコロンをコロンに置換
cleaned_start_time = start_time.replace(';', ':')
# タイムゾーン情報を含む場合は時間部分のみ抽出
if '+' in cleaned_start_time or 'T' in cleaned_start_time:
try:
from datetime import datetime
dt = datetime.fromisoformat(cleaned_start_time.replace('Z', '+00:00'))
cleaned_start_time = dt.strftime('%H:%M:%S')
except:
cleaned_start_time = None
if not dry_run:
batch_data.append(NewEvent2(
event_code=event_code,
event_name=event_name,
event_day=event_day,
start_time=cleaned_start_time,
))
events_migrated += 1
# バッチ処理
if len(batch_data) >= batch_size:
if not dry_run:
NewEvent2.objects.bulk_create(batch_data, ignore_conflicts=True)
batch_data = []
# 残りのデータを処理
if batch_data and not dry_run:
NewEvent2.objects.bulk_create(batch_data, ignore_conflicts=True)
self.stdout.write(f'{events_migrated}件のイベントを移行しました')
def migrate_teams(self, conn, dry_run, batch_size):
"""チーム情報を移行"""
self.stdout.write('チーム情報を移行中...')
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM team_table")
rows = cursor.fetchall()
teams_migrated = 0
batch_data = []
for row in rows:
(zekken_number, event_code, team_name, class_name, password, trial) = row
# 対応するイベントを取得
try:
event = NewEvent2.objects.get(event_code=event_code)
except NewEvent2.DoesNotExist:
self.stdout.write(self.style.WARNING(f' 警告: イベント {event_code} が見つかりません。スキップします。'))
continue
if not dry_run:
batch_data.append(Team(
zekken_number=zekken_number,
team_name=team_name,
event=event,
class_name=class_name,
password=password,
trial=trial,
))
teams_migrated += 1
# バッチ処理
if len(batch_data) >= batch_size:
if not dry_run:
Team.objects.bulk_create(batch_data, ignore_conflicts=True)
batch_data = []
# 残りのデータを処理
if batch_data and not dry_run:
Team.objects.bulk_create(batch_data, ignore_conflicts=True)
self.stdout.write(f'{teams_migrated}件のチームを移行しました')
def migrate_checkpoints(self, conn, dry_run, batch_size):
"""チェックポイント情報を移行"""
self.stdout.write('チェックポイント情報を移行中...')
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM checkpoint_table")
rows = cursor.fetchall()
checkpoints_migrated = 0
batch_data = []
for row in rows:
(cp_number, event_code, cp_name, latitude, longitude,
photo_point, buy_point, sample_photo, colabo_company_memo) = row
# 対応するイベントを取得
try:
event = NewEvent2.objects.get(event_code=event_code)
except NewEvent2.DoesNotExist:
continue
# 位置情報の処理
location = None
if latitude is not None and longitude is not None:
try:
location = Point(longitude, latitude) # Pointは(longitude, latitude)の順序
except (ValueError, AttributeError):
pass
if not dry_run:
batch_data.append(Checkpoint(
cp_number=cp_number,
event=event,
cp_name=cp_name,
location=location,
photo_point=photo_point or 0,
buy_point=buy_point or 0,
sample_photo=sample_photo,
colabo_company_memo=colabo_company_memo,
))
checkpoints_migrated += 1
# バッチ処理
if len(batch_data) >= batch_size:
if not dry_run:
Checkpoint.objects.bulk_create(batch_data, ignore_conflicts=True)
batch_data = []
# 残りのデータを処理
if batch_data and not dry_run:
Checkpoint.objects.bulk_create(batch_data, ignore_conflicts=True)
self.stdout.write(f'{checkpoints_migrated}件のチェックポイントを移行しました')
def migrate_gps_logs(self, conn, dry_run, batch_size):
"""GPS位置情報を移行"""
print('GPS位置情報を移行中...')
# チームとイベントのマッピングを作成
team_to_event_map = {}
for team in Team.objects.select_related('event'):
if team.event: # eventがNoneでないことを確認
team_to_event_map[team.zekken_number] = team.event.id
# チェックポイントのマッピングを作成
checkpoint_id_map = {}
for checkpoint in Checkpoint.objects.select_related('event'):
if checkpoint.event: # eventがNoneでないことを確認
key = (checkpoint.event.event_code, checkpoint.cp_number)
checkpoint_id_map[key] = checkpoint.id
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM gps_information")
rows = cursor.fetchall()
logs_migrated = 0
batch_data = []
for row in rows:
(serial_number, zekken_number, event_code, cp_number,
image_address, goal_time, late_point, create_at, create_user,
update_at, update_user, buy_flag, minus_photo_flag, colabo_company_memo) = row
# 対応するチームを取得
try:
event = NewEvent2.objects.get(event_code=event_code)
team = Team.objects.get(zekken_number=zekken_number, event=event)
# teamが存在し、eventも存在することを確認
if not team or not team.event:
continue
except (NewEvent2.DoesNotExist, Team.DoesNotExist):
continue
# 対応するチェックポイントを取得(存在する場合)
checkpoint = None
if cp_number is not None and cp_number != -1:
try:
checkpoint = Checkpoint.objects.get(cp_number=cp_number, event=event)
except Checkpoint.DoesNotExist:
pass
# checkin_timeの設定必須フィールド
checkin_time = timezone.now() # デフォルト値
if goal_time:
try:
# goal_timeはHH:MM形式と仮定
from datetime import datetime, time
parsed_time = datetime.strptime(goal_time, '%H:%M').time()
if create_at:
checkin_time = timezone.make_aware(datetime.combine(create_at.date(), parsed_time))
else:
checkin_time = timezone.make_aware(datetime.combine(datetime.now().date(), parsed_time))
except:
checkin_time = timezone.make_aware(create_at) if create_at else timezone.now()
elif create_at:
checkin_time = timezone.make_aware(create_at) if timezone.is_naive(create_at) else create_at
if not dry_run:
# GpsCheckinテーブル用のデータ
batch_data.append({
'event_code': event_code,
'zekken': zekken_number,
'serial_number': serial_number,
'cp_number': cp_number or 0,
'lat': None, # 実際のMobServerデータベースから取得
'lng': None, # 実際のMobServerデータベースから取得
'checkin_time': checkin_time,
'record_time': timezone.make_aware(create_at) if create_at and timezone.is_naive(create_at) else (create_at or timezone.now()),
'location': "", # PostGISポイントは後で設定
'mobserver_id': serial_number,
'event_id': team_to_event_map.get(zekken_number),
'team_id': team.id,
'checkpoint_id': checkpoint.id if checkpoint else None
})
logs_migrated += 1
# バッチ処理
if len(batch_data) >= batch_size:
if not dry_run:
self.bulk_insert_gps_logs(batch_data)
batch_data = []
# 残りのデータを処理
if batch_data and not dry_run:
self.bulk_insert_gps_logs(batch_data)
print(f'{logs_migrated}件のGPS位置情報を移行しました')
def bulk_insert_gps_logs(self, batch_data):
"""
GpsCheckinテーブルに直接SQLを使って挿入
"""
if not batch_data:
return
with connection.cursor() as cursor:
# DjangoのGpsCheckinテーブルに挿入
insert_sql = """
INSERT INTO rog_gpscheckin (
event_code, zekken, serial_number, cp_number, lat, lng,
checkin_time, record_time, location, mobserver_id,
event_id, team_id, checkpoint_id
) VALUES %s
ON CONFLICT DO NOTHING
"""
# locationフィールドを除外してバリューを準備
clean_values = []
for data in batch_data:
# lat/lngがある場合はPostGISポイントを作成、ない場合はNULL
if data['lat'] is not None and data['lng'] is not None:
location_point = f"ST_GeomFromText('POINT({data['lng']} {data['lat']})', 4326)"
else:
location_point = None
clean_values.append((
data['event_code'],
data['zekken'],
data['serial_number'],
data['cp_number'],
data['lat'],
data['lng'],
data['checkin_time'],
data['record_time'],
location_point,
data['mobserver_id'],
data['event_id'],
data['team_id'],
data['checkpoint_id']
))
try:
execute_values(cursor, insert_sql, clean_values, template=None, page_size=100)
except Exception as e:
logger.error(f"データ移行エラー: {e}")
raise