Files
rogaining_srv/Ruby-Django移行仕様書.md
2025-08-20 19:15:19 +09:00

41 KiB

Ruby Server → Docker/Django 移行仕様書

1. 概要

本文書は、現在Ruby Sinatra Serverで実装されているロゲイニング大会管理システムの機能を、Docker/Django環境に移行するための詳細仕様書です。

現在のシステム構成

  • Ruby Sinatra Server (MobServer_gifuroge.rb)
  • PostgreSQL with PostGIS
  • LINE Bot機能
  • AWS S3連携
  • Excel/PDF生成機能

移行先システム構成

  • Django REST Framework (Python)
  • Docker Compose 環境
  • PostgreSQL with PostGIS (継続)
  • Celery (非同期タスク処理)
  • Redis (キュー管理)

2. 移行方針

2.1 段階的移行アプローチ

Phase 1: 基盤移行 (4週間)

  • Docker環境の構築
  • Django基盤の整備
  • データベースモデルの移行

Phase 2: API移行 (6週間)

  • Ruby APIのDjango API化
  • 外部連携機能の移行
  • 認証・認可システムの構築

Phase 3: 機能拡張 (4週間)

  • 非同期処理の実装
  • 監視・ログ機能の強化
  • テストの実装

2.2 移行戦略

  • 並行運用: 移行期間中はRuby/Django両方を動作
  • データ同期: リアルタイムでデータを同期
  • 段階的切り替え: 機能単位で順次切り替え

3. Docker環境設計

3.1 Docker Compose構成

# docker-compose.yml
version: "3.9"

services:
  # PostgreSQL Database
  postgres-db:
    image: kartoza/postgis:14.0
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
    environment:
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASS=${POSTGRES_PASS}
      - POSTGRES_DBNAME=${POSTGRES_DBNAME}
      - POSTGRES_MAX_CONNECTIONS=600
    restart: always
    networks:
      - rogaining-network

  # Redis for Celery
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: always
    networks:
      - rogaining-network

  # Django Application
  django-app:
    build:
      context: .
      dockerfile: Dockerfile.django
    command: gunicorn config.wsgi:application --bind 0.0.0.0:8000 --workers 4
    volumes:
      - .:/app
      - static_volume:/app/static
      - media_volume:/app/media
      - uploads_volume:/app/uploads
    env_file:
      - .env
    restart: always
    depends_on:
      - postgres-db
      - redis
    networks:
      - rogaining-network

  # Celery Worker
  celery-worker:
    build:
      context: .
      dockerfile: Dockerfile.django
    command: celery -A config worker -l info
    volumes:
      - .:/app
      - media_volume:/app/media
      - uploads_volume:/app/uploads
    env_file:
      - .env
    restart: always
    depends_on:
      - postgres-db
      - redis
    networks:
      - rogaining-network

  # Celery Beat (スケジューラー)
  celery-beat:
    build:
      context: .
      dockerfile: Dockerfile.django
    command: celery -A config beat -l info
    volumes:
      - .:/app
    env_file:
      - .env
    restart: always
    depends_on:
      - postgres-db
      - redis
    networks:
      - rogaining-network

  # Nginx Reverse Proxy
  nginx:
    image: nginx:1.21
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - static_volume:/app/static
      - media_volume:/app/media
      - uploads_volume:/app/uploads
    ports:
      - "80:80"
      - "443:443"
    depends_on:
      - django-app
    restart: always
    networks:
      - rogaining-network

  # Flower (Celery監視)
  flower:
    build:
      context: .
      dockerfile: Dockerfile.django
    command: celery -A config flower
    ports:
      - "5555:5555"
    env_file:
      - .env
    depends_on:
      - redis
    networks:
      - rogaining-network

volumes:
  postgres_data:
  redis_data:
  static_volume:
  media_volume:
  uploads_volume:

networks:
  rogaining-network:
    driver: bridge

3.2 Dockerfile設計

# Dockerfile.django
FROM python:3.9-slim

# システムパッケージのインストール
RUN apt-get update && apt-get install -y \
    gdal-bin \
    libgdal-dev \
    libpq-dev \
    gcc \
    g++ \
    libmagickwand-dev \
    imagemagick \
    libreoffice \
    wkhtmltopdf \
    && rm -rf /var/lib/apt/lists/*

# 作業ディレクトリの設定
WORKDIR /app

# Python依存関係のインストール
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# アプリケーションコードのコピー
COPY . .

# 静的ファイルの収集
RUN python manage.py collectstatic --noinput

# アプリケーションポート
EXPOSE 8000

# エントリーポイント
CMD ["gunicorn", "config.wsgi:application", "--bind", "0.0.0.0:8000"]

4. Django アプリケーション設計

4.1 プロジェクト構造

rogaining_django/
├── config/
│   ├── __init__.py
│   ├── settings/
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── development.py
│   │   ├── production.py
│   │   └── testing.py
│   ├── urls.py
│   ├── wsgi.py
│   └── celery.py
├── apps/
│   ├── __init__.py
│   ├── rogaining/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── views.py
│   │   ├── serializers.py
│   │   ├── urls.py
│   │   └── tasks.py
│   ├── external_api/
│   │   ├── __init__.py
│   │   ├── views.py
│   │   ├── serializers.py
│   │   ├── services.py
│   │   └── tasks.py
│   ├── linebot/
│   │   ├── __init__.py
│   │   ├── views.py
│   │   ├── handlers.py
│   │   └── utils.py
│   ├── scoring/
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── services.py
│   │   └── tasks.py
│   └── common/
│       ├── __init__.py
│       ├── utils.py
│       ├── exceptions.py
│       └── middleware.py
├── requirements/
│   ├── base.txt
│   ├── development.txt
│   └── production.txt
├── docker/
│   ├── django/
│   │   └── Dockerfile
│   └── nginx/
│       ├── Dockerfile
│       └── nginx.conf
└── docker-compose.yml

4.2 設定ファイル (settings/base.py)

# config/settings/base.py
import os
import environ
from pathlib import Path

# Build paths
BASE_DIR = Path(__file__).resolve().parent.parent.parent

# Environment variables
env = environ.Env(DEBUG=(bool, False))
environ.Env.read_env(env_file=os.path.join(BASE_DIR, '.env'))

# Security
SECRET_KEY = env('SECRET_KEY')
DEBUG = env('DEBUG')
ALLOWED_HOSTS = env.list('ALLOWED_HOSTS', default=[])

# Application definition
DJANGO_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django.contrib.gis',
]

THIRD_PARTY_APPS = [
    'rest_framework',
    'rest_framework_gis',
    'corsheaders',
    'django_filters',
    'django_celery_beat',
    'django_celery_results',
]

LOCAL_APPS = [
    'apps.rogaining',
    'apps.external_api',
    'apps.linebot',
    'apps.scoring',
    'apps.common',
]

INSTALLED_APPS = DJANGO_APPS + THIRD_PARTY_APPS + LOCAL_APPS

MIDDLEWARE = [
    'corsheaders.middleware.CorsMiddleware',
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'apps.common.middleware.LoggingMiddleware',
]

ROOT_URLCONF = 'config.urls'

# Database
DATABASES = {
    'default': {
        'ENGINE': 'django.contrib.gis.db.backends.postgis',
        'NAME': env('POSTGRES_DBNAME'),
        'USER': env('POSTGRES_USER'),
        'PASSWORD': env('POSTGRES_PASS'),
        'HOST': env('PG_HOST'),
        'PORT': env('PG_PORT'),
        'OPTIONS': {
            'options': '-c default_transaction_isolation=read_committed'
        },
    }
}

# Celery Configuration
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://redis:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='redis://redis:6379/0')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Tokyo'
CELERY_ENABLE_UTC = True

# REST Framework
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.TokenAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
    'DEFAULT_FILTER_BACKENDS': [
        'django_filters.rest_framework.DjangoFilterBackend',
    ],
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
    'PAGE_SIZE': 100,
}

# AWS S3 Configuration
AWS_ACCESS_KEY_ID = env('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = env('AWS_SECRET_ACCESS_KEY')
AWS_STORAGE_BUCKET_NAME = env('AWS_STORAGE_BUCKET_NAME')
AWS_S3_REGION_NAME = env('AWS_S3_REGION_NAME', default='us-west-2')
AWS_S3_CUSTOM_DOMAIN = f'{AWS_STORAGE_BUCKET_NAME}.s3.amazonaws.com'

# LINE Bot Configuration
LINE_CHANNEL_ACCESS_TOKEN = env('LINE_CHANNEL_ACCESS_TOKEN')
LINE_CHANNEL_SECRET = env('LINE_CHANNEL_SECRET')

# File Upload Settings
MEDIA_URL = '/media/'
MEDIA_ROOT = os.path.join(BASE_DIR, 'media')

# Static Files
STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(BASE_DIR, 'static')

# Logging
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': '/app/logs/django.log',
            'formatter': 'verbose',
        },
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'django': {
            'handlers': ['file', 'console'],
            'level': 'INFO',
            'propagate': True,
        },
        'apps': {
            'handlers': ['file', 'console'],
            'level': 'INFO',
            'propagate': True,
        },
    },
}

4.3 Celery設定 (config/celery.py)

# config/celery.py
import os
from celery import Celery
from django.conf import settings

# Django設定モジュールの指定
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.production')

# Celeryアプリの作成
app = Celery('rogaining')

# Django設定の使用
app.config_from_object('django.conf:settings', namespace='CELERY')

# タスクの自動検出
app.autodiscover_tasks()

# 定期タスクの設定
app.conf.beat_schedule = {
    'generate-scoreboard-reports': {
        'task': 'apps.scoring.tasks.generate_scheduled_reports',
        'schedule': 300.0,  # 5分ごと
    },
    'cleanup-old-files': {
        'task': 'apps.common.tasks.cleanup_old_files',
        'schedule': 86400.0,  # 24時間ごと
    },
}

app.conf.timezone = 'Asia/Tokyo'

5. Ruby機能のDjango移行実装

5.1 チーム管理機能

Django Models (apps/rogaining/models.py)

# apps/rogaining/models.py
from django.contrib.gis.db import models
from django.contrib.auth.models import User
from django.utils import timezone

class Event(models.Model):
    event_code = models.CharField(max_length=50, unique=True)
    event_name = models.CharField(max_length=200)
    start_time = models.DateTimeField()
    end_time = models.DateTimeField()
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'event_table'

class Team(models.Model):
    zekken_number = models.CharField(max_length=50)
    event_code = models.ForeignKey(Event, on_delete=models.CASCADE, to_field='event_code')
    team_name = models.CharField(max_length=200)
    class_name = models.CharField(max_length=100)
    password = models.CharField(max_length=100)
    members_count = models.IntegerField(default=1)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'team_table'
        unique_together = ['zekken_number', 'event_code']

class Checkpoint(models.Model):
    cp_number = models.IntegerField()
    event_code = models.ForeignKey(Event, on_delete=models.CASCADE, to_field='event_code')
    cp_name = models.CharField(max_length=200)
    latitude = models.FloatField()
    longitude = models.FloatField()
    location = models.PointField(geography=True, null=True, blank=True)
    photo_point = models.IntegerField(default=0)
    buy_point = models.IntegerField(default=0)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'checkpoint_table'
        unique_together = ['cp_number', 'event_code']

class GpsInformation(models.Model):
    serial_number = models.AutoField(primary_key=True)
    zekken_number = models.CharField(max_length=50)
    event_code = models.ForeignKey(Event, on_delete=models.CASCADE, to_field='event_code')
    cp_number = models.IntegerField()
    image_address = models.URLField(max_length=500, null=True, blank=True)
    latitude = models.FloatField(null=True, blank=True)
    longitude = models.FloatField(null=True, blank=True)
    location = models.PointField(geography=True, null=True, blank=True)
    goal_time = models.TimeField(null=True, blank=True)
    late_point = models.IntegerField(default=0)
    buy_flag = models.BooleanField(default=False)
    minus_photo_flag = models.BooleanField(default=False)
    colabo_company_memo = models.CharField(max_length=100, null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'gps_information'

class UserTable(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE)
    userid = models.CharField(max_length=100, unique=True)  # LINE User ID
    user_name = models.CharField(max_length=200)
    zekken_number = models.CharField(max_length=50, null=True, blank=True)
    event_code = models.CharField(max_length=50, null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'user_table'

class ChatStatus(models.Model):
    userid = models.CharField(max_length=100, unique=True)
    status = models.CharField(max_length=50)
    memory = models.TextField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'chat_status'

class ReportList(models.Model):
    zekken_number = models.CharField(max_length=50)
    event_code = models.CharField(max_length=50)
    report_address = models.URLField(max_length=500)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'report_list'
        unique_together = ['zekken_number', 'event_code']

Django Views (apps/rogaining/views.py)

# apps/rogaining/views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.contrib.gis.geos import Point
from django.db import transaction
from .models import Team, Event, Checkpoint, GpsInformation
from .serializers import TeamSerializer, CheckpointSerializer
from .tasks import generate_scoreboard_task
import logging

logger = logging.getLogger(__name__)

class TeamViewSet(viewsets.ModelViewSet):
    queryset = Team.objects.all()
    serializer_class = TeamSerializer
    filterset_fields = ['event_code', 'zekken_number']

    @action(detail=False, methods=['post'])
    def register_team(self, request):
        """チーム登録 (Ruby: zekkenAuthorization相当)"""
        try:
            zekken_number = request.data.get('zekken_number')
            password = request.data.get('password')
            
            team = Team.objects.filter(
                zekken_number=zekken_number,
                password=password
            ).first()
            
            if not team:
                return Response({
                    'status': 'ERROR',
                    'message': 'パスワードが一致しません'
                }, status=status.HTTP_401_UNAUTHORIZED)
            
            return Response({
                'status': 'OK',
                'zekken_number': team.zekken_number,
                'team_name': team.team_name,
                'event_code': team.event_code.event_code
            })
            
        except Exception as e:
            logger.error(f"Team registration error: {e}")
            return Response({
                'status': 'ERROR',
                'message': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @action(detail=False, methods=['post'])
    def update_team_name(self, request):
        """チーム名更新"""
        try:
            zekken_number = request.data.get('zekken_number')
            event_code = request.data.get('event_code')
            new_team_name = request.data.get('new_team_name')
            
            team = Team.objects.filter(
                zekken_number=zekken_number,
                event_code__event_code=event_code
            ).first()
            
            if not team:
                return Response({
                    'status': 'ERROR',
                    'message': 'チームが見つかりません'
                }, status=status.HTTP_404_NOT_FOUND)
            
            old_name = team.team_name
            team.team_name = new_team_name
            team.save()
            
            return Response({
                'status': 'OK',
                'message': 'チーム名を更新しました',
                'old_name': old_name,
                'new_name': new_team_name
            })
            
        except Exception as e:
            logger.error(f"Team name update error: {e}")
            return Response({
                'status': 'ERROR',
                'message': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

class CheckpointViewSet(viewsets.ModelViewSet):
    queryset = Checkpoint.objects.all()
    serializer_class = CheckpointSerializer
    filterset_fields = ['event_code', 'cp_number']

    @action(detail=False, methods=['post'])
    def register_cp(self, request):
        """チェックポイント登録 (Ruby: inputCPnumber相当)"""
        try:
            with transaction.atomic():
                zekken_number = request.data.get('zekken_number')
                event_code = request.data.get('event_code')
                cp_number = request.data.get('cp_number')
                image_address = request.data.get('image_address')
                latitude = request.data.get('latitude')
                longitude = request.data.get('longitude')
                
                # 重複チェック
                today = timezone.now().date()
                existing = GpsInformation.objects.filter(
                    zekken_number=zekken_number,
                    event_code__event_code=event_code,
                    cp_number=cp_number,
                    created_at__date=today
                ).exists()
                
                if existing:
                    return Response({
                        'status': 'ERROR',
                        'message': '既に登録済みです'
                    }, status=status.HTTP_409_CONFLICT)
                
                # GPS情報の保存
                location = Point(longitude, latitude) if latitude and longitude else None
                
                gps_info = GpsInformation.objects.create(
                    zekken_number=zekken_number,
                    event_code_id=event_code,
                    cp_number=cp_number,
                    image_address=image_address,
                    latitude=latitude,
                    longitude=longitude,
                    location=location,
                    colabo_company_memo="FC岐阜"  # 期間限定
                )
                
                return Response({
                    'status': 'OK',
                    'serial_number': gps_info.serial_number,
                    'message': 'チェックポイントを登録しました'
                })
                
        except Exception as e:
            logger.error(f"Checkpoint registration error: {e}")
            return Response({
                'status': 'ERROR',
                'message': str(e)
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

5.2 スコアボード生成機能

Celery Tasks (apps/scoring/tasks.py)

# apps/scoring/tasks.py
from celery import shared_task
from django.core.files.storage import default_storage
from django.conf import settings
import os
import subprocess
import boto3
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

@shared_task
def generate_scoreboard_task(zekken_number, event_code, reprint=False):
    """スコアボード生成タスク (Ruby: makeScoreboard相当)"""
    try:
        # Excel生成
        excel_path = generate_excel_scoreboard(zekken_number, event_code)
        
        if not excel_path:
            return {'status': 'ERROR', 'message': 'データが見つかりません'}
        
        # PDF変換
        pdf_path = convert_excel_to_pdf(excel_path, reprint)
        
        # S3アップロード
        s3_url = upload_to_s3(pdf_path, event_code, zekken_number)
        
        # レポート情報の更新
        update_report_record(zekken_number, event_code, s3_url)
        
        # 一時ファイルの削除
        cleanup_temp_files([excel_path, pdf_path])
        
        return {
            'status': 'OK',
            'pdf_url': s3_url,
            'zekken_number': zekken_number
        }
        
    except Exception as e:
        logger.error(f"Scoreboard generation error: {e}")
        return {'status': 'ERROR', 'message': str(e)}

def generate_excel_scoreboard(zekken_number, event_code):
    """Excel形式のスコアボード生成"""
    from apps.rogaining.models import GpsInformation, Team
    from openpyxl import Workbook
    from openpyxl.drawing import Image as OpenpyxlImage
    
    try:
        # チーム情報の取得
        team = Team.objects.filter(
            zekken_number=zekken_number,
            event_code__event_code=event_code
        ).first()
        
        if not team:
            return None
        
        # GPS情報の取得
        gps_data = GpsInformation.objects.filter(
            zekken_number=zekken_number,
            event_code__event_code=event_code
        ).order_by('serial_number')
        
        if not gps_data:
            return None
        
        # Excelワークブックの作成
        wb = Workbook()
        ws = wb.active
        ws.title = "スコアボード"
        
        # ヘッダー情報
        ws['A1'] = f"チーム名: {team.team_name}"
        ws['A2'] = f"ゼッケン番号: {zekken_number}"
        ws['A3'] = f"クラス: {team.class_name}"
        
        # データ行のヘッダー
        headers = ['順番', 'CP番号', 'CP名', '得点', '写真']
        for col, header in enumerate(headers, 1):
            ws.cell(row=5, column=col, value=header)
        
        # データの挿入
        row = 6
        for gps_info in gps_data:
            if gps_info.cp_number > 0:  # ゴール以外
                checkpoint = get_checkpoint_info(gps_info.cp_number, event_code)
                
                ws.cell(row=row, column=1, value=gps_info.serial_number)
                ws.cell(row=row, column=2, value=gps_info.cp_number)
                ws.cell(row=row, column=3, value=checkpoint.get('cp_name', ''))
                ws.cell(row=row, column=4, value=checkpoint.get('photo_point', 0))
                
                # 画像の挿入
                if gps_info.image_address:
                    try:
                        image_path = download_image_from_s3(gps_info.image_address)
                        if image_path:
                            img = OpenpyxlImage(image_path)
                            img.width = 150
                            img.height = 105
                            ws.add_image(img, f'E{row}')
                    except Exception as e:
                        logger.warning(f"Image insertion failed: {e}")
                
                row += 1
        
        # ファイル保存
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{zekken_number}_scoreboard_{timestamp}.xlsx"
        filepath = os.path.join(settings.MEDIA_ROOT, 'temp', filename)
        
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        wb.save(filepath)
        
        return filepath
        
    except Exception as e:
        logger.error(f"Excel generation error: {e}")
        return None

def convert_excel_to_pdf(excel_path, reprint=False):
    """ExcelをPDFに変換"""
    try:
        pdf_path = excel_path.replace('.xlsx', '.pdf')
        
        # LibreOfficeを使用したPDF変換
        cmd = [
            'libreoffice',
            '--headless',
            '--convert-to', 'pdf',
            '--outdir', os.path.dirname(excel_path),
            excel_path
        ]
        
        subprocess.run(cmd, check=True, timeout=60)
        
        if os.path.exists(pdf_path):
            return pdf_path
        else:
            raise Exception("PDF conversion failed")
            
    except Exception as e:
        logger.error(f"PDF conversion error: {e}")
        raise

def upload_to_s3(file_path, event_code, zekken_number):
    """S3へのファイルアップロード"""
    try:
        s3_client = boto3.client(
            's3',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name=settings.AWS_S3_REGION_NAME
        )
        
        filename = os.path.basename(file_path)
        s3_key = f"{event_code}/scoreboard/{filename}"
        
        s3_client.upload_file(
            file_path,
            settings.AWS_STORAGE_BUCKET_NAME,
            s3_key
        )
        
        s3_url = f"https://{settings.AWS_STORAGE_BUCKET_NAME}.s3.amazonaws.com/{s3_key}"
        return s3_url
        
    except Exception as e:
        logger.error(f"S3 upload error: {e}")
        raise

@shared_task
def generate_scheduled_reports():
    """定期的なレポート生成"""
    from apps.rogaining.models import Team
    
    try:
        # アクティブなイベントのチーム一覧を取得
        teams = Team.objects.filter(
            event_code__is_active=True
        ).values_list('zekken_number', 'event_code__event_code')
        
        for zekken_number, event_code in teams:
            # 非同期でスコアボード生成
            generate_scoreboard_task.delay(zekken_number, event_code)
        
        return f"Scheduled {len(teams)} report generations"
        
    except Exception as e:
        logger.error(f"Scheduled report generation error: {e}")
        return f"Error: {str(e)}"

5.3 LINE Bot機能

LINE Bot Views (apps/linebot/views.py)

# apps/linebot/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.conf import settings
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import TextMessage, TextSendMessage
from .handlers import MessageHandler
import logging

logger = logging.getLogger(__name__)

class LineBotCallbackView(APIView):
    """LINE Bot コールバック (Ruby: app.post '/callback_gifuroge'相当)"""
    
    def __init__(self):
        self.line_bot_api = LineBotApi(settings.LINE_CHANNEL_ACCESS_TOKEN)
        self.handler = WebhookHandler(settings.LINE_CHANNEL_SECRET)
        self.message_handler = MessageHandler(self.line_bot_api)
    
    def post(self, request):
        try:
            signature = request.META.get('HTTP_X_LINE_SIGNATURE')
            body = request.body.decode('utf-8')
            
            # 署名検証
            self.handler.handle(body, signature)
            
            # イベント処理
            events = self.line_bot_api.parse_events_from(body)
            
            for event in events:
                if hasattr(event, 'message') and isinstance(event.message, TextMessage):
                    self.message_handler.handle_text_message(event)
            
            return Response({'status': 'OK'})
            
        except InvalidSignatureError:
            logger.error("Invalid signature")
            return Response({'error': 'Invalid signature'}, 
                          status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            logger.error(f"LINE Bot callback error: {e}")
            return Response({'error': str(e)}, 
                          status=status.HTTP_500_INTERNAL_SERVER_ERROR)

Message Handler (apps/linebot/handlers.py)

# apps/linebot/handlers.py
import re
import json
from datetime import datetime
from django.utils import timezone
from linebot.models import TextSendMessage
from apps.rogaining.models import UserTable, Team, ChatStatus
from apps.scoring.tasks import generate_scoreboard_task
import logging

logger = logging.getLogger(__name__)

class MessageHandler:
    def __init__(self, line_bot_api):
        self.line_bot_api = line_bot_api
    
    def handle_text_message(self, event):
        """テキストメッセージの処理 (Ruby: case event.type相当)"""
        try:
            user_id = event.source.user_id
            message_text = event.message.text
            reply_token = event.reply_token
            
            # ユーザー情報の取得
            user_info = self.get_or_create_user(user_id)
            
            # チャット状態の確認
            chat_status = self.get_chat_status(user_id)
            
            # メッセージの正規化 (Ruby: NKF.nkf相当)
            normalized_text = self.normalize_text(message_text)
            
            # コマンド処理
            response = self.process_command(user_info, chat_status, normalized_text)
            
            # 応答送信
            if response:
                self.line_bot_api.reply_message(
                    reply_token,
                    TextSendMessage(text=response)
                )
            
            # ログ記録
            self.log_chat(user_id, 'user', 'text', message_text)
            self.log_chat(user_id, 'bot', 'text', response)
            
        except Exception as e:
            logger.error(f"Message handling error: {e}")
            self.line_bot_api.reply_message(
                reply_token,
                TextSendMessage(text="システムエラーが発生しました。")
            )
    
    def process_command(self, user_info, chat_status, text):
        """コマンド処理 (Ruby: case status['status']相当)"""
        try:
            # ゼッケン番号登録
            if text.startswith('ゼッケン番号:'):
                return self.register_zekken_number(user_info, text)
            
            # チェックポイント登録
            elif text.startswith('CP番号:'):
                return self.register_checkpoint(user_info, text)
            
            # ゴール登録
            elif text.startswith('ゴール:'):
                return self.register_goal(user_info, text)
            
            # ログアウト
            elif text.upper() == 'ログアウト' or text.upper() == 'LOGOUT':
                return self.logout_user(user_info)
            
            # スコアボード再印刷
            elif text.upper() == 'REPRINT' or text == '再印刷':
                return self.reprint_scoreboard(user_info)
            
            # 状態に応じた処理
            elif chat_status and chat_status.status == 'zekkenAuthorization':
                return self.authorize_zekken(user_info, chat_status, text)
            
            elif chat_status and chat_status.status == 'cp_stanby':
                return self.confirm_checkpoint(user_info, chat_status, text)
            
            else:
                return self.default_response(user_info)
                
        except Exception as e:
            logger.error(f"Command processing error: {e}")
            return "コマンド処理中にエラーが発生しました。"
    
    def register_zekken_number(self, user_info, text):
        """ゼッケン番号登録 (Ruby: zekkenNumberRegister相当)"""
        try:
            zekken_number = text.replace('ゼッケン番号:', '').strip()
            
            # チーム存在確認
            team = Team.objects.filter(zekken_number=zekken_number).first()
            if not team:
                return "入力されたゼッケン番号は存在しません。"
            
            # 認証待ち状態に設定
            ChatStatus.objects.update_or_create(
                userid=user_info.userid,
                defaults={
                    'status': 'zekkenAuthorization',
                    'memory': zekken_number
                }
            )
            
            return f"{zekken_number}の代表登録を受け付けました。\n確認のためパスワードを入力し、送信して下さい。"
            
        except Exception as e:
            logger.error(f"Zekken registration error: {e}")
            return "ゼッケン番号登録中にエラーが発生しました。"
    
    def authorize_zekken(self, user_info, chat_status, password):
        """ゼッケン認証 (Ruby: zekkenAuthorization相当)"""
        try:
            zekken_number = chat_status.memory
            
            team = Team.objects.filter(
                zekken_number=zekken_number,
                password=password
            ).first()
            
            if not team:
                return "パスワードが一致しません。もう一度入力して下さい。"
            
            # ユーザー情報更新
            user_table = UserTable.objects.filter(userid=user_info.userid).first()
            if user_table:
                user_table.zekken_number = zekken_number
                user_table.event_code = team.event_code.event_code
                user_table.save()
            
            # チャット状態クリア
            chat_status.delete()
            
            return f"{user_info.user_name}さんをチーム:{team.team_name}の代表として登録しました。"
            
        except Exception as e:
            logger.error(f"Zekken authorization error: {e}")
            return "認証中にエラーが発生しました。"
    
    def reprint_scoreboard(self, user_info):
        """スコアボード再印刷 (Ruby: reprinter相当)"""
        try:
            user_table = UserTable.objects.filter(userid=user_info.userid).first()
            if not user_table or not user_table.zekken_number:
                return "チーム登録が必要です。"
            
            # 非同期でスコアボード生成
            generate_scoreboard_task.delay(
                user_table.zekken_number,
                user_table.event_code,
                reprint=True
            )
            
            return "新しいレポートの生成を開始しました。"
            
        except Exception as e:
            logger.error(f"Scoreboard reprint error: {e}")
            return "レポート生成中にエラーが発生しました。"
    
    def normalize_text(self, text):
        """テキスト正規化 (Ruby: NKF.nkf相当)"""
        import unicodedata
        
        # 全角→半角変換
        text = unicodedata.normalize('NFKC', text)
        
        # ひらがな→カタカナ変換
        text = ''.join([chr(ord(c) + 0x60) if 'ぁ' <= c <= 'ゖ' else c for c in text])
        
        # 大文字変換
        text = text.upper()
        
        return text
    
    def get_or_create_user(self, user_id):
        """ユーザー情報の取得・作成 (Ruby: getUserName相当)"""
        try:
            user_table = UserTable.objects.filter(userid=user_id).first()
            
            if not user_table:
                # LINE APIからユーザー名取得
                profile = self.line_bot_api.get_profile(user_id)
                
                user_table = UserTable.objects.create(
                    userid=user_id,
                    user_name=profile.display_name
                )
            
            return user_table
            
        except Exception as e:
            logger.error(f"User creation error: {e}")
            return None
    
    def get_chat_status(self, user_id):
        """チャット状態の取得 (Ruby: checkStatus相当)"""
        return ChatStatus.objects.filter(userid=user_id).first()
    
    def log_chat(self, user_id, talker, message_type, detail):
        """チャットログ記録 (Ruby: chatLogger相当)"""
        from apps.rogaining.models import ChatLog
        
        try:
            ChatLog.objects.create(
                userid=user_id,
                talker=talker,
                message_type=message_type,
                message_detail=detail
            )
        except Exception as e:
            logger.error(f"Chat logging error: {e}")

6. 移行スケジュール

Phase 1: 基盤構築 (週1-4)

Week 1: Docker環境構築

  • Docker Compose設定
  • PostgreSQL/PostGIS設定
  • Redis設定
  • Nginx設定

Week 2: Django基盤構築

  • Django プロジェクト作成
  • 設定ファイル整備
  • データベースモデル作成
  • マイグレーション実行

Week 3: Celery設定

  • Celery Worker設定
  • Celery Beat設定
  • タスクキュー設定
  • 監視機能設定

Week 4: AWS連携

  • S3連携設定
  • IAM設定
  • セキュリティ設定
  • バックアップ設定

Phase 2: 機能移行 (週5-10)

Week 5-6: コア機能移行

  • チーム管理API
  • チェックポイント管理API
  • ユーザー認証API
  • データベース操作関数

Week 7-8: LINE Bot機能移行

  • LINE Bot設定
  • メッセージハンドラー
  • コマンド処理
  • 状態管理

Week 9-10: スコアボード機能移行

  • Excel生成機能
  • PDF変換機能
  • S3アップロード機能
  • レポート管理機能

Phase 3: 最適化・テスト (週11-14)

Week 11-12: 性能最適化

  • データベースクエリ最適化
  • キャッシュ実装
  • 非同期処理最適化
  • 負荷テスト

Week 13-14: テスト・デプロイ

  • 単体テスト実装
  • 統合テスト実装
  • 本番環境構築
  • 移行テスト

7. リスク管理

7.1 技術的リスク

データ移行リスク

  • リスク: データ損失・破損
  • 対策: 段階的移行、バックアップ、検証テスト

性能リスク

  • リスク: レスポンス時間悪化
  • 対策: 負荷テスト、キャッシュ戦略、スケーリング

互換性リスク

  • リスク: 既存機能との非互換
  • 対策: 詳細な機能テスト、段階的切り替え

7.2 運用リスク

サービス停止リスク

  • リスク: 移行中のサービス停止
  • 対策: ブルーグリーンデプロイ、ロードバランサー活用

習熟度リスク

  • リスク: 新技術への習熟不足
  • 対策: 事前研修、ドキュメント整備、段階的移行

8. 成功指標

8.1 技術指標

  • 移行完了率 100%
  • レスポンス時間 < 2秒
  • 可用性 > 99.9%
  • エラー率 < 0.1%

8.2 業務指標

  • 機能完全性 100%
  • ユーザー満足度向上
  • 運用コスト削減
  • 開発効率向上

9. 今後の展開

9.1 追加機能

  • リアルタイム順位表示
  • 多言語対応
  • モバイルアプリ連携
  • AI機能統合

9.2 技術向上

  • マイクロサービス化
  • Kubernetes導入
  • CI/CD強化
  • 監視・ログ分析強化

この移行仕様書により、Ruby ServerからDocker/Django環境への体系的な移行が可能になります。段階的なアプローチにより、リスクを最小限に抑えながら確実な移行を実現できます。