import os import json import time from pathlib import Path from typing import List, Optional import boto3 import cups import yaml from dotenv import load_dotenv from loguru import logger class Config: def __init__(self): load_dotenv() with open('config.yaml', 'r') as f: self.config = yaml.safe_load(f) # 設定値を上書き self.config['s3']['bucket'] = os.getenv('S3_BUCKET_NAME') self.config['s3']['prefix'] = f"{os.getenv('LOCATION')}/scoreboard/" self.config['printer']['name'] = os.getenv('PRINTER_NAME') self.config['app']['file_log'] = self.config['app']['file_log'].replace('${LOCATION}', os.getenv('LOCATION', '')) # デバッグログ追加 logger.debug(f"設定値: {self.config}") logger.debug(f"環境変数: S3_BUCKET_NAME={os.getenv('S3_BUCKET_NAME')}, LOCATION={os.getenv('LOCATION')}, PRINTER_NAME={os.getenv('PRINTER_NAME')}") class S3Manager: def __init__(self, config: Config): self.config = config self.s3_client = boto3.client('s3') def list_files(self) -> List[str]: """S3バケットのファイル一覧を取得""" try: files = [] bucket = self.config.config['s3']['bucket'] prefix = self.config.config['s3']['prefix'] logger.info(f"S3バケット監視: {bucket}/{prefix}") paginator = self.s3_client.get_paginator('list_objects_v2') for page in paginator.paginate(Bucket=bucket, Prefix=prefix): if 'Contents' in page: new_files = [obj['Key'] for obj in page['Contents']] files.extend(new_files) if new_files: logger.info(f"検出ファイル: {new_files}") return files except Exception as e: logger.error(f"S3ファイル一覧の取得に失敗: {e}") return [] def get_new_files_old(self, current_files: List[str]) -> List[str]: """新規ファイルを特定""" new_files = list(set(current_files) - set(self.processed_files)) if new_files: logger.info(f"新規印刷対象ファイル: {new_files}") return new_files def list_files_old(self) -> List[str]: """S3バケットのファイル一覧を取得""" try: files = [] paginator = self.s3_client.get_paginator('list_objects_v2') for page in paginator.paginate( Bucket=self.config.config['s3']['bucket'], Prefix=self.config.config['s3']['prefix'] ): if 'Contents' in page: files.extend([obj['Key'] for obj in page['Contents']]) return files except Exception as e: logger.error(f"S3ファイル一覧の取得に失敗: {e}") return [] def download_file(self, key: str, local_path: str) -> bool: """S3からファイルをダウンロード""" try: self.s3_client.download_file( self.config.config['s3']['bucket'], key, local_path ) return True except Exception as e: logger.error(f"ファイルのダウンロードに失敗 {key}: {e}") return False class PrinterManager: def __init__(self, config: Config): self.config = config self.job_status = {} # ジョブID:ステータス を保持 try: self.conn = cups.Connection() logger.info(f"プリンター接続成功: {config.config['printer']['name']}") printers = self.conn.getPrinters() logger.info(f"利用可能なプリンター: {printers}") except Exception as e: logger.error(f"プリンター接続失敗: {e}") def print_file_old(self, filepath: str) -> bool: """ファイルを印刷""" printer_config = self.config.config['printer'] max_attempts = printer_config['retry']['max_attempts'] delay = printer_config['retry']['delay_seconds'] for attempt in range(max_attempts): try: job_id = self.conn.printFile( 'ibigawa', #printer_config['name'], filepath, "Rogaining Score", printer_config['options'] ) logger.info(f"印刷ジョブを送信: {filepath} (Job ID: {job_id})") return True except Exception as e: logger.error(f"印刷に失敗 (試行 {attempt + 1}/{max_attempts}): {e}") if attempt < max_attempts - 1: time.sleep(delay) else: return False def print_file(self, filepath: str) -> bool: try: if not os.path.exists(filepath): logger.error(f"ファイルが存在しません: {filepath}") return False logger.debug(f"印刷前確認: ファイルサイズ={os.path.getsize(filepath)}, パス={filepath}") job_id = self.conn.printFile('ibigawa', filepath, "Rogaining Score", self.config.config['printer']['options']) self.job_status[job_id] = 'pending' logger.info(f"印刷ジョブ送信: {filepath} (Job ID: {job_id})") return True except Exception as e: logger.error(f"印刷失敗: {e}") return False def check_job_status(self): completed_jobs = [] for job_id in self.job_status: try: job_info = self.conn.getJobAttributes(job_id) status = job_info.get('job-state', 0) if status == 9: # 完了 logger.info(f"印刷完了 (Job ID: {job_id})") completed_jobs.append(job_id) elif status == 8: # 中止 logger.error(f"印刷中止 (Job ID: {job_id})") completed_jobs.append(job_id) except Exception as e: logger.error(f"ステータス取得失敗 (Job ID: {job_id}): {e}") for job_id in completed_jobs: del self.job_status[job_id] class FileManager: def __init__(self, config: Config): self.config = config self.file_log_path = Path(self.config.config['app']['save_path']) / self.config.config['app']['file_log'] # self.processed_files = self._load_processed_files() self.processed_files = [] # 初期化時は空に self._load_processed_files() # 状態をリセット def _load_processed_files(self) -> List[str]: """処理済みファイルの一覧を読み込み""" try: if self.file_log_path.exists(): with open(self.file_log_path, 'r') as f: return json.load(f) return [] except Exception as e: logger.error(f"処理済みファイル一覧の読み込みに失敗: {e}") return [] def save_processed_files(self): """処理済みファイルの一覧を保存""" try: self.file_log_path.parent.mkdir(parents=True, exist_ok=True) with open(self.file_log_path, 'w') as f: json.dump(self.processed_files, f) except Exception as e: logger.error(f"処理済みファイル一覧の保存に失敗: {e}") def get_new_files(self, current_files: List[str]) -> List[str]: #"""新規ファイルを特定""" #return list(set(current_files) - set(self.processed_files)) """新規ファイルを特定""" new_files = list(set(current_files) - set(self.processed_files)) logger.info(f"新規ファイル検出: processed={self.processed_files}, current={current_files}, new={new_files}") return new_files def setup_logging(config: Config): """ロギングの設定""" log_path = Path(config.config['app']['log_path']) log_path.mkdir(parents=True, exist_ok=True) logger.add( log_path / "app.log", rotation="1 day", retention="30 days", level="INFO" ) logger.add( log_path / config.config['app']['error_log'], level="ERROR" ) def main(): config = Config() setup_logging(config) s3_manager = S3Manager(config) printer_manager = PrinterManager(config) file_manager = FileManager(config) logger.info("スコアボード監視システムを開始") while True: try: printer_manager.check_job_status() # 印刷ステータス確認 current_files = s3_manager.list_files() new_files = file_manager.get_new_files(current_files) if not printer_manager.job_status: # 印刷中のジョブがない場合のみ新規印刷 for file_key in new_files: # main関数内 if file_key.endswith('.pdf'): local_path = Path(config.config['app']['save_path']) / 'temp.pdf' local_path.parent.mkdir(parents=True, exist_ok=True) # ディレクトリ作成 if s3_manager.download_file(file_key, str(local_path)): logger.debug(f"ダウンロード完了: {local_path}, サイズ={os.path.getsize(str(local_path))}") if printer_manager.print_file(str(local_path)): logger.debug("印刷ジョブ登録完了") file_manager.processed_files.append(file_key) file_manager.save_processed_files() break else: logger.error(f"ファイルのダウンロードに失敗: {file_key}") else: # PDFでないファイルは処理済みとしてマーク file_manager.processed_files.append(file_key) file_manager.save_processed_files() time.sleep(10) # ポーリング間隔 except Exception as e: logger.error(f"予期せぬエラーが発生: {e}") time.sleep(30) # エラー時は長めの待機 def main_old(): config = Config() setup_logging(config) s3_manager = S3Manager(config) printer_manager = PrinterManager(config) file_manager = FileManager(config) logger.info("スコアボード監視システムを開始") while True: try: # S3のファイル一覧を取得 current_files = s3_manager.list_files() # 新規ファイルを特定 new_files = file_manager.get_new_files(current_files) for file_key in new_files: if not file_key.endswith('.pdf'): file_manager.processed_files.append(file_key) continue # 保存先パスの設定 local_path = Path(config.config['app']['save_path']) / 'temp.pdf' local_path.parent.mkdir(parents=True, exist_ok=True) # ファイルのダウンロードと印刷 if s3_manager.download_file(file_key, str(local_path)): if printer_manager.print_file(str(local_path)): file_manager.processed_files.append(file_key) file_manager.save_processed_files() time.sleep(10) # ポーリング間隔 except Exception as e: logger.error(f"予期せぬエラーが発生: {e}") time.sleep(30) # エラー時は長めの待機 if __name__ == "__main__": main()