# sumaexcel/excel.py
"""Template-driven Excel report generation backed by PostgreSQL queries.

A report is described by ``<docbase>/<document>.ini``.  Its ``basic`` section
names the template workbook, the output file and the list of sections; every
section becomes one worksheet that is filled group by group from database
records.
"""
import logging
import os
import re
import shutil
from copy import copy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import openpyxl
import pandas as pd
import psycopg2
import psycopg2.extras
from openpyxl.cell.cell import MergedCell
from openpyxl.formula.translate import Translator
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter, range_boundaries

from .config_handler import ConfigHandler  # loads the per-document ini file
from .styles import StyleManager
from .merge import MergeManager
from .image import ImageManager
from .conditional import ConditionalFormatManager
from .page import PageManager, PaperSizes

logger = logging.getLogger(__name__)

# Matches "[field_name]" placeholders inside template cell text.
_PLACEHOLDER_RE = re.compile(r"\[(.*?)\]")


class SumasenExcel:
    """Build an Excel report from a template workbook and database records.

    The constructor never raises: when initialisation fails the reason is
    logged and stored in ``init_error``, and ``make_report`` then returns a
    ``{"status": False, ...}`` result instead of producing a file.
    """

    # Maps the public ``format_type`` names to ConditionalFormatManager methods.
    _CONDITIONAL_FORMAT_METHODS = {
        "color_scale": "add_color_scale",
        "data_bar": "add_data_bar",
        "icon_set": "add_icon_set",
        "custom": "add_custom_rule",
    }

    def __init__(
        self,
        document: str,
        variables: Dict[str, Any],
        docbase: str = "./docbase",
    ) -> None:
        """Load the document configuration and open the template workbook.

        Args:
            document: Document name; the ini file is ``<docbase>/<document>.ini``.
            variables: Variables handed to ``ConfigHandler``.
            docbase: Base directory for documents (default: "./docbase").
        """
        self.debug = True
        self.workbook = None
        self.template_filepath = None
        self.output_filepath = None
        self.current_sheet = None
        self.dbname = None
        self.user = None
        self.password = None
        self.host = None
        self.port = None
        self._style_manager = None
        self._merge_manager = None
        self._image_manager = None
        self._conditional_manager = None
        self._page_manager = None

        # Give every attribute that later methods read a defined value, so a
        # failed initialisation cannot surface as an unrelated AttributeError.
        self.init_error: Optional[str] = None
        self.conf = None
        self.basic = None
        self.maxcol = 100
        self.section_list: List[str] = []
        self.template_workbook = None
        self.template_sheet = None
        self.worksheet = None
        self.current_worksheet = None
        self.conn = None
        self.docpath = docbase
        self.ini_file_path = f"{docbase}/{document}.ini"

        try:
            self._load_configuration(variables)
        except Exception as e:
            # Constructor boundary: callers have never had to handle
            # exceptions here, so record the failure instead of raising.
            self.init_error = str(e)
            logger.error("Error initializing Excel document: %s", e)

    def _load_configuration(self, variables: Dict[str, Any]) -> None:
        """Read the ini file and prepare the output and template workbooks.

        Raises:
            FileNotFoundError: If the document base, ini file or template
                workbook does not exist.
            ValueError: If a required ``basic`` setting is missing.
        """
        if not os.path.exists(self.docpath):
            raise FileNotFoundError(
                f"Document base directory not found: {self.docpath}"
            )
        if not os.path.exists(self.ini_file_path):
            raise FileNotFoundError(f"INI file not found: {self.ini_file_path}")

        self.conf = ConfigHandler(self.ini_file_path, variables)

        basic = self.conf.get_section("basic")
        if not basic:
            raise ValueError(
                f"Basic section not found in INI file: {self.ini_file_path}"
            )
        self.basic = basic

        template_file = basic.get("template_file")
        if not template_file:
            raise ValueError("template_file not found in basic section")
        doc_file = basic.get("doc_file")
        if not doc_file:
            raise ValueError("doc_file not found in basic section")
        sections = basic.get("sections")
        if not sections:
            raise ValueError("sections not found in basic section")

        maxcol = basic.get("maxcol")
        if maxcol:
            self.maxcol = int(maxcol)
        else:
            self.maxcol = 100
            logger.warning("maxcol not found in basic section. Defaulting to 100")

        # "a, b, c" -> ["a", "b", "c"]; empty entries are dropped.
        self.section_list = [
            name.strip() for name in sections.split(",") if name.strip()
        ]
        if not self.section_list:
            raise ValueError("sections not found in basic section")

        self.output_filepath = f"{self.docpath}/{doc_file}"
        self.template_filepath = f"{self.docpath}/{template_file}"
        if not os.path.exists(self.template_filepath):
            raise FileNotFoundError(
                f"Template file not found: {self.template_filepath}"
            )

        # Start from an empty workbook: remove the sheet openpyxl creates by
        # default so only the generated section sheets remain.
        self.workbook = openpyxl.Workbook()
        self.workbook.remove(self.workbook.active)

        self.template_workbook = openpyxl.load_workbook(self.template_filepath)
        self.template_sheet = self.template_workbook.active

    def make_report(self, variables: Dict[str, Any]) -> Dict[str, Any]:
        """Generate every configured section and save the workbook.

        Args:
            variables: Runtime variables, including the database connection
                settings read by ``proceed_section``.

        Returns:
            ``{"status": bool, "message": str}``; on success the dict also
            carries ``"filepath"`` with the saved workbook path.
        """
        logger.info("make_report step-1")
        if self.init_error:
            return {
                "status": False,
                "message": (
                    "Exception in make_report: Error generating report: "
                    f"{self.init_error}"
                ),
            }
        try:
            # One worksheet per section.
            for section in self.section_list:
                ret = self.proceed_section(section, variables)
                if not ret["status"]:
                    message = ret.get("message", "No message provided")
                    return {
                        "status": False,
                        "message": f"Fail generating section: {section}...{message}",
                    }

            self.workbook.save(self.output_filepath)
            return {
                "status": True,
                "message": f"Report generated successfully : {self.output_filepath}",
                "filepath": self.output_filepath,
            }
        except Exception as e:
            return {
                "status": False,
                "message": f"Exception in make_report: Error generating report: {str(e)}",
            }

    def proceed_section(self, section: str, variables: Dict[str, Any]) -> Dict[str, Any]:
        """Create the worksheet for one section and fill all of its groups.

        Args:
            section: Name of the ini section describing the worksheet.
            variables: Must provide ``db``, ``username``, ``password``,
                ``host`` and ``port`` for the PostgreSQL connection.

        Returns:
            ``{"status": bool, "message": str}``.
        """
        logger.info(f"make_report.proceed_section step-1:section={section}")
        try:
            section_config = self.conf.get_section(section)
            if not section_config:
                return {"status": False, "message": f"Error no section found: {section}"}

            template_sheet_name = section_config.get("template_sheet")
            if (
                not template_sheet_name
                or template_sheet_name not in self.template_workbook.sheetnames
            ):
                return {
                    "status": False,
                    "message": f"Error no template sheet found: {template_sheet_name}",
                }

            groups = section_config.get("groups")
            if not groups:
                return {
                    "status": False,
                    "message": f"Error no group definitions found: {section}",
                }
            group_list = [g.strip() for g in groups.split(",") if g.strip()]
            if not group_list:
                return {
                    "status": False,
                    "message": f"Error invalid group definitions found: {section}",
                }

            # Plain assignments: a trailing comma here would store 1-tuples,
            # which are always truthy and defeat the check below.
            self.dbname = variables.get("db")
            self.user = variables.get("username")
            self.password = variables.get("password")
            self.host = variables.get("host")
            self.port = variables.get("port")
            if not all((self.dbname, self.user, self.password, self.host, self.port)):
                return {
                    "status": False,
                    "message": "Error no database connection information",
                }

            # Copy from the sheet this section names, not whichever sheet
            # happened to be active when the template was saved.
            self.template_sheet = self.template_workbook[template_sheet_name]

            new_sheet = self.workbook.create_sheet(
                title=section_config.get("sheet_name", section)
            )
            # All three attribute names are kept pointing at the same sheet
            # because existing code refers to each of them.
            self.current_sheet = new_sheet
            self.worksheet = new_sheet
            self.current_worksheet = new_sheet

            self._style_manager = StyleManager()
            self._merge_manager = MergeManager(self.current_sheet)
            self._image_manager = ImageManager(self.current_sheet)
            self._conditional_manager = ConditionalFormatManager(self.current_sheet)
            self._page_manager = PageManager(self.current_sheet)

            # NOTE(review): the key is called "fit_to_width" but the value has
            # always been applied as the sheet zoom; kept as-is - confirm intent.
            fit_to_width = section_config.get("fit_to_width")
            if fit_to_width:
                new_sheet.sheet_view.zoomScaleNormal = int(float(fit_to_width))

            # Page orientation lives on page_setup; sheet_view has no such field.
            orientation = str(section_config.get("orientation") or "portrait")
            orientation = orientation.strip().lower()
            if orientation not in ("portrait", "landscape"):
                logger.warning(
                    "Unknown orientation %r in section %s. Defaulting to portrait",
                    orientation,
                    section,
                )
                orientation = "portrait"
            new_sheet.page_setup.orientation = orientation

            self.conn = psycopg2.connect(
                dbname=self.dbname,
                user=self.user,
                password=self.password,
                host=self.host,
                port=self.port,
            )
            try:
                for group in group_list:
                    ret = self.proceed_group(group, variables)
                    if not ret["status"]:
                        return ret
            finally:
                # One connection serves every group of the section and is
                # released exactly once, whether or not a group failed.
                self._close_connection()

            return {"status": True, "message": f"Success generating section: {section}"}
        except Exception as e:
            return {
                "status": False,
                "message": f"Exception in proceed_section : Error generating report: {str(e)}",
            }

    def proceed_group(self, group: str, variables: Dict[str, Any]) -> Dict[str, Any]:
        """Fill one group of the current worksheet.

        A group without a ``sort`` setting is filled from a single record;
        with ``sort`` the template rows are repeated once per record.

        Args:
            group: Name of the ini section describing the group.
            variables: Runtime variables, passed through to the record methods.

        Returns:
            ``{"status": bool, "message": str}``; the failing step's result is
            returned unchanged so its message reaches the caller.
        """
        logger.info(f"make_report.proceed_group step-1:group={group}")
        try:
            group_config = self.conf.get_section(group)
            if not group_config:
                return {"status": False, "message": f"Error no group section found: {group}"}

            group_range = group_config.get("group_range")
            table = group_config.get("table")
            where = group_config.get("where")
            if not where or not table or not group_range:
                return {
                    "status": False,
                    "message": f"Error invalid group parameters: {group_config}",
                }

            sort = group_config.get("sort")
            if sort:
                ret = self.proceed_all_record(table, where, sort, group_range, variables)
            else:
                ret = self.proceed_one_record(table, where, group_range, variables)
            if not ret["status"]:
                return ret
            return {"status": True, "message": f"Success generating group: {group}"}
        except Exception as e:
            logger.error("Error in proceed_group: %s", e)
            return {
                "status": False,
                "message": f"Exception in proceed_group : Error generating report: {str(e)}",
            }

    def proceed_one_record(
        self,
        table: str,
        where: str,
        group_range: str,
        variables: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Copy the group's template rows and fill them from a single record.

        Args:
            table: Table name.
            where: WHERE clause (without the keyword).
            group_range: Row range "start,end" of the group.
            variables: Runtime variables (not used by this method).

        Returns:
            ``{"status": bool, "message": str}``.
        """
        logger.info(
            f"make_report.proceed_one_record step-1:table={table},where={where},group_range={group_range}"
        )
        try:
            start_row, end_row = self._parse_row_range(group_range)
            self.copy_template_to_current(group_range, group_range)

            # NOTE(review): table/where are SQL fragments taken from the ini
            # file and cannot be bound as parameters; the configuration must
            # come from a trusted source.
            query = f"SELECT * FROM {table} WHERE {where} LIMIT 1"
            with self.conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor
            ) as cursor:
                cursor.execute(query)
                record = cursor.fetchone()

            if record:
                self._fill_placeholders(start_row, end_row, record)
            else:
                logger.warning("No record found in %s for group range %s", table, group_range)
            return {"status": True, "message": "Success generating group: "}
        except Exception as e:
            logger.error("Error in proceed_one_record: %s", e)
            return {
                "status": False,
                "message": f"Exception in proceed_one_record:Error generating report: {str(e)}",
            }

    def proceed_all_record(
        self,
        table: str,
        where: str,
        sort: str,
        group_range: str,
        variables: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Repeat the group's template rows once per record and fill them.

        The first record lands on the template rows themselves; each further
        record is written directly below the previous one.

        Args:
            table: Table name.
            where: WHERE clause (without the keyword).
            sort: ORDER BY clause (without the keyword).
            group_range: Row range "start,end" of the group.
            variables: Runtime variables (not used by this method).

        Returns:
            ``{"status": bool, "message": str}``.
        """
        logger.info(
            f"make_report.proceed_all_record step-1:table={table},where={where},group_range={group_range}"
        )
        try:
            start_row, end_row = self._parse_row_range(group_range)
            template_rows = end_row - start_row + 1

            # NOTE(review): same trusted-configuration caveat as in
            # proceed_one_record - these fragments are interpolated verbatim.
            query = f"SELECT * FROM {table} WHERE {where} ORDER BY {sort}"
            with self.conn.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor
            ) as cursor:
                cursor.execute(query)
                records = cursor.fetchall()

            # NOTE(review): groups located below a repeating group are not
            # shifted down, so they may overlap the repeated rows - confirm
            # how templates are expected to be laid out.
            current_row = start_row
            for record in records:
                last_row = current_row + template_rows - 1
                self.copy_template_to_current(
                    f"{start_row},{end_row}",
                    f"{current_row},{last_row}",
                )
                self._fill_placeholders(current_row, last_row, record)
                current_row += template_rows

            return {"status": True, "message": "Success processing all records"}
        except Exception as e:
            logger.error("Error in proceed_all_record: %s", e)
            return {
                "status": False,
                "message": f"Exception in proceed_all_record:Error processing records: {str(e)}",
            }

    @staticmethod
    def _parse_row_range(row_range: str) -> Tuple[int, int]:
        """Parse a "start,end" row range into 1-based inclusive row numbers.

        Anything after a ``:`` is ignored, as the original parser did.

        Raises:
            ValueError: If the text is not two positive integers in order.
        """
        head = str(row_range).split(":")[0]
        parts = [part.strip() for part in head.split(",")]
        try:
            start_row, end_row = (int(part) for part in parts)
        except ValueError:
            raise ValueError(
                f"Invalid row range {row_range!r}: expected 'start,end'"
            ) from None
        if start_row < 1 or end_row < start_row:
            raise ValueError(
                f"Invalid row range {row_range!r}: rows must satisfy 1 <= start <= end"
            )
        return start_row, end_row

    def _fill_placeholders(self, start_row: int, end_row: int, record: Dict[str, Any]) -> None:
        """Replace ``[field]`` placeholders in the given rows of the current sheet.

        Placeholders whose name is not a column of ``record`` are left
        untouched; NULL values are written as an empty string.
        """

        def render(match: "re.Match[str]") -> str:
            field_name = match.group(1)
            if field_name not in record:
                return match.group(0)
            value = record[field_name]
            return "" if value is None else str(value)

        for row in self.current_sheet.iter_rows(min_row=start_row, max_row=end_row):
            for cell in row:
                text = cell.value
                if isinstance(text, str) and "[" in text:
                    # Single pass: text inserted from the database is never
                    # re-scanned for further placeholders.
                    cell.value = _PLACEHOLDER_RE.sub(render, text)

    def _close_connection(self) -> None:
        """Close the database connection if one is open."""
        conn = getattr(self, "conn", None)
        if conn is None:
            return
        try:
            conn.close()
        except psycopg2.Error as e:
            logger.warning("Error closing database connection: %s", e)
        finally:
            self.conn = None

    def copy_template_to_current(self, range, target_range):  # noqa: A002
        """Copy rows from the template sheet onto the current sheet.

        Copies cell values, cell styles, row heights, column widths and merged
        ranges that lie entirely inside the source rows.  Formulas are
        translated when the target rows differ from the source rows.

        Args:
            range: Source rows in the template sheet, as "start,end".  The
                name shadows the builtin but is kept for compatibility.
            target_range: Destination rows in the current sheet, as
                "start,end"; must span as many rows as ``range``.

        Raises:
            RuntimeError: If the template or current sheet is not set.
            ValueError: If a range is malformed or the two sizes differ.
        """
        template = self.template_sheet
        target = self.current_sheet
        if template is None or target is None:
            raise RuntimeError(
                "Template sheet and current sheet must be set before copying"
            )

        src_start, src_end = self._parse_row_range(range)
        dst_start, dst_end = self._parse_row_range(target_range)
        if src_end - src_start != dst_end - dst_start:
            raise ValueError(
                f"Source rows {range!r} and target rows {target_range!r} differ in size"
            )
        row_offset = dst_start - src_start

        # Presumably maxcol bounds the template width - TODO confirm.
        max_col = max(1, min(int(self.maxcol), template.max_column))

        source_rows = template.iter_rows(
            min_row=src_start, max_row=src_end, min_col=1, max_col=max_col
        )
        for row_idx, src_row in enumerate(source_rows, start=src_start):
            if row_idx in template.row_dimensions:
                height = template.row_dimensions[row_idx].height
                if height is not None:
                    target.row_dimensions[row_idx + row_offset].height = height

            for src_cell in src_row:
                # Non-anchor cells of a merged range carry no value of their
                # own and are read-only; the merge itself is recreated below.
                if isinstance(src_cell, MergedCell):
                    continue
                dst_cell = target.cell(
                    row=src_cell.row + row_offset, column=src_cell.column
                )
                if isinstance(dst_cell, MergedCell):
                    continue

                value = src_cell.value
                if row_offset and src_cell.data_type == "f" and isinstance(value, str):
                    # Shift relative references the way a spreadsheet copy does.
                    value = Translator(
                        value, origin=src_cell.coordinate
                    ).translate_formula(dst_cell.coordinate)
                dst_cell.value = value

                if src_cell.has_style:
                    # Style objects belong to the template workbook, so they
                    # are copied rather than shared across workbooks.
                    dst_cell.font = copy(src_cell.font)
                    dst_cell.fill = copy(src_cell.fill)
                    dst_cell.border = copy(src_cell.border)
                    dst_cell.alignment = copy(src_cell.alignment)
                    dst_cell.protection = copy(src_cell.protection)
                    dst_cell.number_format = src_cell.number_format

        for letter, dimension in template.column_dimensions.items():
            if dimension.width is not None:
                target.column_dimensions[letter].width = dimension.width

        # Merge last: once merged, the covered cells can no longer be written.
        for merged in list(template.merged_cells.ranges):
            inside_rows = merged.min_row >= src_start and merged.max_row <= src_end
            if inside_rows and merged.min_col <= max_col:
                target.merge_cells(
                    start_row=merged.min_row + row_offset,
                    start_column=merged.min_col,
                    end_row=merged.max_row + row_offset,
                    end_column=merged.max_col,
                )

    # Style operations
    def apply_style(
        self,
        cell_range: str,
        font: Optional[Dict[str, Any]] = None,
        fill: Optional[Dict[str, Any]] = None,
        border: Optional[Dict[str, Any]] = None,
        alignment: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Apply styles to every cell of a range on the current sheet.

        Args:
            cell_range: A1-style reference; a single cell ("B2"), a block
                ("A1:C3") or whole columns ("A:C") are all accepted.
            font: Keyword arguments for ``StyleManager.create_font``.
            fill: Keyword arguments for ``StyleManager.create_fill``.
            border: Keyword arguments for ``StyleManager.create_border``.
            alignment: Keyword arguments for ``StyleManager.create_alignment``.

        Raises:
            RuntimeError: If no worksheet has been created yet.
        """
        if self.current_sheet is None:
            raise RuntimeError("No current worksheet to style")

        # Build each style object once instead of once per cell.
        styles = self._style_manager
        font_obj = styles.create_font(**font) if font else None
        fill_obj = styles.create_fill(**fill) if fill else None
        border_obj = styles.create_border(**border) if border else None
        alignment_obj = styles.create_alignment(**alignment) if alignment else None

        # Indexing the sheet returns a bare Cell for "B2" and a flat tuple for
        # "A:A"; iter_rows always yields rows of cells.
        min_col, min_row, max_col, max_row = range_boundaries(cell_range)
        rows = self.current_sheet.iter_rows(
            min_row=min_row, max_row=max_row, min_col=min_col, max_col=max_col
        )
        for row in rows:
            for cell in row:
                if font_obj is not None:
                    cell.font = font_obj
                if fill_obj is not None:
                    cell.fill = fill_obj
                if border_obj is not None:
                    cell.border = border_obj
                if alignment_obj is not None:
                    cell.alignment = alignment_obj

    # Merge operations
    def merge_range(
        self,
        start_row: int,
        start_col: int,
        end_row: int,
        end_col: int,
    ) -> None:
        """Merge the given cell rectangle via the merge manager."""
        self._merge_manager.merge_cells(start_row, start_col, end_row, end_col)

    # Image operations
    def add_image(
        self,
        image_path: Union[str, Path],
        position: Tuple[int, int],
        size: Optional[Tuple[int, int]] = None,
    ) -> None:
        """Add an image to the worksheet via the image manager."""
        self._image_manager.add_image(image_path, position, size)

    # Conditional formatting
    def add_conditional_format(
        self,
        cell_range: str,
        format_type: str,
        **kwargs,
    ) -> None:
        """Add conditional formatting to a cell range.

        Args:
            cell_range: Range the rule applies to.
            format_type: One of "color_scale", "data_bar", "icon_set" or
                "custom"; any other value is ignored with a logged warning.
            **kwargs: Passed through to the conditional format manager.
        """
        method_name = self._CONDITIONAL_FORMAT_METHODS.get(format_type)
        if method_name is None:
            logger.warning("Unknown conditional format type ignored: %r", format_type)
            return
        getattr(self._conditional_manager, method_name)(cell_range, **kwargs)

    # Page setup
    def setup_page(
        self,
        orientation: str = 'portrait',
        paper_size: int = PaperSizes.A4,
        margins: Optional[Dict[str, float]] = None,
        header_footer: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Configure page setup, and optionally margins and header/footer.

        Args:
            orientation: Page orientation passed to the page manager.
            paper_size: Paper size constant (default: ``PaperSizes.A4``).
            margins: Keyword arguments for ``PageManager.set_margins``.
            header_footer: Keyword arguments for
                ``PageManager.set_header_footer``.
        """
        self._page_manager.set_page_setup(
            orientation=orientation,
            paper_size=paper_size,
        )
        if margins:
            self._page_manager.set_margins(**margins)
        if header_footer:
            self._page_manager.set_header_footer(**header_footer)

    def cleanup(self) -> None:
        """Release temporary image files and any open database connection."""
        image_manager = getattr(self, "_image_manager", None)
        if image_manager:
            image_manager.cleanup()
        self._close_connection()

    def __del__(self):
        """Destructor: best-effort cleanup."""
        try:
            self.cleanup()
        except Exception:
            # Modules may already be torn down at interpreter exit; there is
            # nothing useful left to do with an error here.
            pass