add photo exif

This commit is contained in:
2025-09-06 01:10:28 +09:00
parent 775f77a440
commit f50d1e1c79

View File

@ -18,12 +18,219 @@ from rest_framework.response import Response
from rest_framework import status
from rest_framework.parsers import MultiPartParser, FormParser
from knox.auth import TokenAuthentication
from PIL import Image
from PIL.ExifTags import TAGS, GPSTAGS
import math
from ..models import NewEvent2, Entry, GpsLog, Location2025
from ..models import NewEvent2, Entry, GpsLog, Location2025, GpsCheckin
# ログ設定
logger = logging.getLogger(__name__)
def extract_exif_data(image_file):
"""
画像ファイルからEXIF情報を抽出する
Returns:
dict: {
'latitude': float,
'longitude': float,
'datetime': datetime,
'has_gps': bool
}
"""
try:
# PIL Imageオブジェクトを作成
image = Image.open(image_file)
# EXIF情報を取得
exif_data = image._getexif()
if not exif_data:
logger.warning(f"No EXIF data found in image: {image_file.name}")
return {'has_gps': False, 'latitude': None, 'longitude': None, 'datetime': None}
# GPS情報とDatetime情報を抽出
gps_info = {}
datetime_original = None
for tag_id, value in exif_data.items():
tag = TAGS.get(tag_id, tag_id)
if tag == "GPSInfo":
# GPS情報の詳細を展開
for gps_tag_id, gps_value in value.items():
gps_tag = GPSTAGS.get(gps_tag_id, gps_tag_id)
gps_info[gps_tag] = gps_value
elif tag == "DateTime" or tag == "DateTimeOriginal":
try:
datetime_original = datetime.strptime(str(value), '%Y:%m:%d %H:%M:%S')
except ValueError:
logger.warning(f"Failed to parse datetime: {value}")
# GPS座標の変換
latitude = None
longitude = None
if 'GPSLatitude' in gps_info and 'GPSLongitude' in gps_info:
lat_deg, lat_min, lat_sec = gps_info['GPSLatitude']
lon_deg, lon_min, lon_sec = gps_info['GPSLongitude']
# 度分秒を小数度に変換
latitude = float(lat_deg) + float(lat_min)/60 + float(lat_sec)/3600
longitude = float(lon_deg) + float(lon_min)/60 + float(lon_sec)/3600
# 南緯・西経の場合は負の値にする
if gps_info.get('GPSLatitudeRef') == 'S':
latitude = -latitude
if gps_info.get('GPSLongitudeRef') == 'W':
longitude = -longitude
logger.info(f"EXIF extracted from {image_file.name}: lat={latitude}, lon={longitude}, datetime={datetime_original}")
return {
'has_gps': latitude is not None and longitude is not None,
'latitude': latitude,
'longitude': longitude,
'datetime': datetime_original
}
except Exception as e:
logger.error(f"Error extracting EXIF from {image_file.name}: {str(e)}")
return {'has_gps': False, 'latitude': None, 'longitude': None, 'datetime': None}
def find_nearest_checkpoint(latitude, longitude, event_code, max_distance_m=100):
"""
指定された座標から最も近いチェックポイントを検索する
Args:
latitude: 緯度
longitude: 経度
event_code: イベントコード
max_distance_m: 最大距離(メートル)
Returns:
Location2025オブジェクトまたはNone
"""
try:
# 該当イベントのチェックポイントを取得
checkpoints = Location2025.objects.filter(event__event_name=event_code)
if not checkpoints.exists():
logger.warning(f"No checkpoints found for event: {event_code}")
return None
# 最も近いチェックポイントを検索
nearest_cp = None
min_distance = float('inf')
for cp in checkpoints:
if cp.latitude and cp.longitude:
# ハーバーサイン距離の計算
distance = calculate_distance(latitude, longitude, cp.latitude, cp.longitude)
if distance < min_distance and distance <= max_distance_m:
min_distance = distance
nearest_cp = cp
if nearest_cp:
logger.info(f"Found nearest checkpoint: {nearest_cp.location} (CP{nearest_cp.cp_number}) at distance {min_distance:.1f}m")
else:
logger.info(f"No checkpoint found within {max_distance_m}m of lat={latitude}, lon={longitude}")
return nearest_cp
except Exception as e:
logger.error(f"Error finding nearest checkpoint: {str(e)}")
return None
def calculate_distance(lat1, lon1, lat2, lon2):
"""
ハーバーサイン公式を使用して2点間の距離を計算メートル単位
"""
R = 6371000 # 地球の半径(メートル)
lat1_rad = math.radians(lat1)
lat2_rad = math.radians(lat2)
delta_lat = math.radians(lat2 - lat1)
delta_lon = math.radians(lon2 - lon1)
a = (math.sin(delta_lat/2) * math.sin(delta_lat/2) +
math.cos(lat1_rad) * math.cos(lat2_rad) *
math.sin(delta_lon/2) * math.sin(delta_lon/2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = R * c
return distance
def create_checkin_from_photo(entry, checkpoint, photo_datetime, zekken_number, event_code, photo_filename, request_id):
"""
写真情報からチェックインデータを作成する
Args:
entry: Entryオブジェクト
checkpoint: Location2025オブジェクト
photo_datetime: 撮影日時
zekken_number: ゼッケン番号
event_code: イベントコード
photo_filename: 写真ファイル名
request_id: リクエストID
Returns:
GpsCheckinオブジェクトまたはNone
"""
try:
# 既存のチェックインをチェック(重複防止)
existing_checkin = GpsCheckin.objects.filter(
zekken=str(zekken_number),
event_code=event_code,
cp_number=checkpoint.cp_number
).first()
if existing_checkin:
logger.info(f"[BULK_UPLOAD] 📍 Existing checkin found - ID: {request_id}, CP: {checkpoint.cp}, existing_id: {existing_checkin.id}")
return existing_checkin
# 新規チェックインの作成
# 撮影時刻をJSTに変換
if photo_datetime:
# 撮影時刻をUTCとして扱い、JSTに変換
create_at = timezone.make_aware(photo_datetime, timezone.utc)
else:
create_at = timezone.now()
# シリアル番号を決定(既存のチェックイン数+1
existing_count = GpsCheckin.objects.filter(
zekken=str(zekken_number),
event_code=event_code
).count()
serial_number = existing_count + 1
# チェックインデータを作成
checkin = GpsCheckin.objects.create(
zekken=str(zekken_number),
event_code=event_code,
cp_number=checkpoint.cp_number,
serial_number=serial_number,
create_at=create_at,
update_at=timezone.now(),
validate_location=True, # 写真から作成されたものは自動承認
buy_flag=False,
points=checkpoint.checkin_point if checkpoint.checkin_point else 0,
create_user=f"bulk_upload_{request_id}", # 一括アップロードで作成されたことを示す
update_user=f"bulk_upload_{request_id}",
photo_source=photo_filename # 元画像ファイル名を記録(カスタムフィールドがあれば)
)
logger.info(f"[BULK_UPLOAD] ✅ Created checkin - ID: {request_id}, checkin_id: {checkin.id}, CP: {checkpoint.cp_number}, time: {create_at}, points: {checkin.points}")
return checkin
except Exception as e:
logger.error(f"[BULK_UPLOAD] ❌ Error creating checkin - ID: {request_id}, CP: {checkpoint.cp_number if checkpoint else 'None'}, error: {str(e)}")
return None
@api_view(['POST', 'GET'])
@permission_classes([IsAuthenticated])
@parser_classes([MultiPartParser, FormParser])
@ -54,7 +261,13 @@ def bulk_upload_checkin_photos(request):
"endpoint": "/api/bulk_upload_checkin_photos/",
"method": "POST",
"required_params": ["event_code", "zekken_number", "photos"],
"optional_params": ["auto_process"],
"optional_params": ["auto_process", "skip_team_validation"],
"features": [
"写真からEXIF情報を抽出",
"GPS座標から最寄りチェックポイントを自動検索",
"撮影時刻を使用してチェックインデータを自動作成",
"チーム検証機能"
],
"user": request.user.email if request.user.is_authenticated else 'Not authenticated'
}, status=status.HTTP_200_OK)
@ -223,16 +436,82 @@ def bulk_upload_checkin_photos(request):
"file_url": f"{settings.MEDIA_URL}{saved_path}"
})
# EXIF情報の抽出(今後実装予定)
# EXIF情報の抽出とチェックイン作成
if auto_process:
# TODO: EXIF情報の抽出とGPS座標取得
# TODO: 撮影時刻の取得
# TODO: 近接チェックポイントの検索
# TODO: 自動チェックイン処理
logger.info(f"[BULK_UPLOAD] 🔍 Starting EXIF processing for {uploaded_file.name}")
# ファイルポインタを先頭に戻す
uploaded_file.seek(0)
# EXIF情報の抽出
exif_data = extract_exif_data(uploaded_file)
if exif_data['has_gps']:
logger.info(f"[BULK_UPLOAD] 📍 GPS data found - lat: {exif_data['latitude']}, lon: {exif_data['longitude']}, datetime: {exif_data['datetime']}")
# 最も近いチェックポイントを検索
nearest_checkpoint = find_nearest_checkpoint(
exif_data['latitude'],
exif_data['longitude'],
event_code
)
if nearest_checkpoint:
# チェックインデータを作成
checkin = create_checkin_from_photo(
entry,
nearest_checkpoint,
exif_data['datetime'],
zekken_number,
event_code,
uploaded_file.name,
request_id
)
if checkin:
file_result.update({
"auto_process_status": "pending",
"auto_process_message": "EXIF解析機能は今後実装予定です"
"auto_process_status": "success",
"auto_process_message": f"チェックイン作成完了",
"checkin_info": {
"checkpoint_name": nearest_checkpoint.location,
"cp_number": nearest_checkpoint.cp_number,
"points": checkin.points,
"checkin_time": checkin.create_at.strftime("%Y-%m-%d %H:%M:%S"),
"checkin_id": checkin.id
},
"gps_info": {
"latitude": exif_data['latitude'],
"longitude": exif_data['longitude'],
"photo_datetime": exif_data['datetime'].strftime("%Y-%m-%d %H:%M:%S") if exif_data['datetime'] else None
}
})
else:
file_result.update({
"auto_process_status": "failed",
"auto_process_message": "チェックイン作成に失敗しました",
"checkpoint_info": {
"checkpoint_name": nearest_checkpoint.location,
"cp_number": nearest_checkpoint.cp_number
}
})
else:
file_result.update({
"auto_process_status": "no_checkpoint",
"auto_process_message": "近くにチェックポイントが見つかりませんでした",
"gps_info": {
"latitude": exif_data['latitude'],
"longitude": exif_data['longitude'],
"photo_datetime": exif_data['datetime'].strftime("%Y-%m-%d %H:%M:%S") if exif_data['datetime'] else None
}
})
else:
file_result.update({
"auto_process_status": "no_gps",
"auto_process_message": "GPS情報が見つかりませんでした",
"exif_info": {
"has_datetime": exif_data['datetime'] is not None,
"photo_datetime": exif_data['datetime'].strftime("%Y-%m-%d %H:%M:%S") if exif_data['datetime'] else None
}
})
successful_uploads += 1
@ -254,7 +533,7 @@ def bulk_upload_checkin_photos(request):
# 成功レスポンス
return Response({
"status": "OK",
"message": "写真の一括アップロードが完了しました",
"message": "写真の一括アップロードとチェックイン処理が完了しました",
"upload_summary": {
"total_files": len(uploaded_files),
"successful_uploads": successful_uploads,
@ -268,12 +547,12 @@ def bulk_upload_checkin_photos(request):
},
"processed_files": processed_files,
"auto_process_enabled": auto_process,
"next_steps": [
"アップロードされた写真のEXIF情報解析",
"GPS座標からチェックポイント自動判定",
"通過履歴の自動生成と校正",
"ユーザーによる確認と承認"
]
"processing_summary": {
"gps_found": len([f for f in processed_files if f.get('auto_process_status') == 'success']),
"checkins_created": len([f for f in processed_files if f.get('checkin_info')]),
"no_gps": len([f for f in processed_files if f.get('auto_process_status') == 'no_gps']),
"no_checkpoint": len([f for f in processed_files if f.get('auto_process_status') == 'no_checkpoint'])
}
})
except Exception as e: