# sumaexcel/excel.py import openpyxl from openpyxl.styles import Font, PatternFill, Alignment, Border, Side from openpyxl.utils import get_column_letter import pandas as pd from typing import Optional, Dict, List, Union, Any import os import shutil from datetime import datetime from typing import Optional, Dict, Any, Union, Tuple from pathlib import Path # psycopg2のインポートを追加 import psycopg2 import psycopg2.extras from copy import copy from openpyxl.utils import range_boundaries from .config_handler import ConfigHandler # ini file のロード #from .styles import StyleManager #from .merge import MergeManager #from .image import ImageManager #from .conditional import ConditionalFormatManager from .page import PageManager, PaperSizes import logging logging.basicConfig( level=logging.INFO, # INFOレベル以上のログを表示 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) class SumasenExcel: """Enhanced Excel handling class with extended functionality""" def __init__(self, document: str, variables: Dict[str, Any], docbase: str = "./docbase") -> Dict[str, str]: """Initialize Excel document with basic settings Args: document: Document name variables: Variables for document docbase: Base directory for documents (default: "./docbase") Returns: Dict with status (true/false) and optional error message """ self.debug = True self.workbook = None self.template_filepath = None self.output_filepath = None self.current_sheet = None self.dbname = None self.user = None self.password = None self.host = None self.port = None self._style_manager = None self._merge_manager = None self._image_manager = None self._conditional_manager = None self._page_manager = None try: # document base を設定 self.docpath = docbase if not os.path.exists(docbase): logging.error(f"Document base directory not found: {docbase}") # ini fileをロード self.ini_file_path = f"{docbase}/{document}.ini" self.conf = ConfigHandler(self.ini_file_path, variables) if not os.path.exists(self.ini_file_path): logging.error(f"INI file not found: {self.ini_file_path}") # basic section をロード basic = self.conf.get_section("basic") if not basic: logging.error(f"Basic section not found in INI file: {self.ini_file_path}") self.basic = basic # basicセクションから必要なパラメータを取得 template_file = basic.get("template_file") if not template_file: logging.error("template_file not found in basic section") doc_file = basic.get("doc_file") if not doc_file: logging.error("doc_file not found in basic section") self.maxcol = basic.get("maxcol") if not self.maxcol: self.maxcol = 100 # デフォルト値を設定 logging.warning("maxcol not found in basic section. Defaulting to 100") else: self.maxcol = int(self.maxcol) sections = basic.get("sections") if not sections: logging.error("sections not found in basic section") # セクションをリストに変換 self.section_list = [s.strip() for s in sections.split(",")] if not self.section_list: logging.error("sections not found in basic section") # 出力ファイルパスを設定 self.output_filepath = f"{self.docpath}/{doc_file}" # 新規のExcelワークブックを作成 self.workbook = openpyxl.Workbook() # デフォルトで作成されるシートを削除 self.workbook.remove(self.workbook.active) # テンプレートワークブックをロード self.template_filepath = f"{self.docpath}/{template_file}" if not os.path.exists(self.template_filepath): logging.error(f"Template file not found: {self.template_filepath}") self.template_workbook = openpyxl.load_workbook(self.template_filepath) self.template_sheet = self.template_workbook.active except Exception as e: logging.error(f"Error initializing Excel document: {str(e)}") def make_report(self,variables: Dict[str, Any]): """レポートを生成する""" logging.info("make_report step-1") try: ret_status = True # セクションごとに処理を実行(ワークシート単位) for section in self.section_list: ret = self.proceed_section(section,variables) if ret["status"] == False: message = ret.get("message", "No message provided") return {"status": False, "message": f"Fail generating section: {section}...{message}"} # 生成したワークブックを保存 self.workbook.save(self.output_filepath) return {"status": True, "message": f"Report generated successfully : {self.output_filepath}", "filepath":self.output_filepath} except Exception as e: return {"status": False, "message": f"Exception in make_report: Error generating report: {str(e)}"} def proceed_section(self, section: str, variables: Dict[str, Any]): print(f"make_report.proceed_section step-1:section={section}") try: # セクションの設定を取得 section_config = self.conf.get_section(section) # セクションが存在しない場合はスキップ if not section_config: return {"status": False, "message": f"Error no section found: {section}"} # テンプレートシートをコピー template_sheet_name = section_config.get("template_sheet") if not template_sheet_name or template_sheet_name not in self.template_workbook.sheetnames: return {"status": False, "message": f"Error no template sheet found: {template_sheet_name}"} # シートの名前を設定 new_sheet = self.workbook.create_sheet(title=section_config.get("sheet_name", section)) self.worksheet = new_sheet self.dbname=variables.get('db') self.user=variables.get('username') self.password=variables.get('password') self.host=variables.get('host','postgres') self.port=variables.get('port','5432') if not self.dbname or not self.user or not self.password or not self.host or not self.port: return {"status": False, "message": f"Error no database connection information"} print(f"db={self.dbname},user={self.user},pass={self.password},host={self.host},port={self.port}") # PostgreSQLに接続 self.conn = psycopg2.connect( dbname=self.dbname, user=self.user, password=self.password, host=self.host, port=self.port ) #self._style_manager = StyleManager() #self._merge_manager = MergeManager(self.current_sheet) #self._image_manager = ImageManager(self.current_sheet) #self._conditional_manager = ConditionalFormatManager(self.current_sheet) #self._page_manager = PageManager(self.current_sheet) # シートの幅を設定 fit_to_width = section_config.get("fit_to_width") if fit_to_width: new_sheet.sheet_view.zoomScaleNormal = float(fit_to_width) # シートの向きを設定 orientation = section_config.get("orientation") new_sheet.sheet_view.orientation = orientation if orientation else "portrait" self.current_worksheet = new_sheet # グループ定義を取得 groups = section_config.get("groups") if not groups: return {"status": False, "message": f"Error no group definitions found: {section}"} # グループをリストに変換 group_list = [g.strip() for g in groups.split(",")] if not group_list: return {"status": False, "message": f"Error invalid group definitions found: {section}"} # 各グループの設定を取得 for group in group_list: section_group = f"{section}.{group}" ret = self.proceed_group(section_group,variables) if ret["status"] == False: return ret return {"status": True, "message": f"Success generating section: {section}"} except Exception as e: return {"status": False, "message": f"Exception in proceed_section : Error generating report: {str(e)}"} def proceed_group(self,group:str,variables: Dict[str, Any]): logging.info(f"make_report.proceed_group step-1:section={group}") try: group_config = self.conf.get_section(group) if not group_config: return {"status": False, "message": f"Error no group section found: {group}"} # グループの処理パラメータを取得 group_range = group_config.get("group_range") table = group_config.get("table_name") where = group_config.get("where") if not where or not table or not group_range: return {"status": False, "message": f"Error invalid group parameters: {group_config}"} sort = group_config.get("sort") if not sort: ret = self.proceed_one_record(table,where,group_range,variables) if ret.get("status") == True: return {"status": True, "message": f"Success generating group: {group}"} else: ret = self.proceed_all_records(table,where,sort,group_range,variables) if ret.get("status") == True: return {"status": True, "message": f"Success generating group: {group}"} except Exception as e: logging.error(f"Error in proceed_group: {str(e)}") return {"status": False, "message": f"Exception in proceed_group : Error generating report: {str(e)}"} def proceed_one_record(self,table:str,where:str,group_range:str,variables: Dict[str, Any]): """1レコードのデータを取得してシートの値を置き換える Args: table: テーブル名 where: WHERE句 group_range: 処理対象範囲 variables: DB接続情報を含む変数辞書 """ try: print(f"make_report.proceed_one_record step-1:table={table},where={where},group_range={group_range}") # まずself.template_sheetの指定範囲のセルをself.current_sheetにコピーする。 self.copy_template_to_current(group_range,group_range) print(f"step-1") cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) # SQLクエリを実行 query = f"SELECT * FROM {table} WHERE {where} LIMIT 1" cursor.execute(query) record = cursor.fetchone() print(f"query={query}") print(f"record={record}") if record: # group_rangeの範囲内のセルを走査 for row in self.current_worksheet: for cell in row: if cell.value and isinstance(cell.value, str): # [field_name]形式の文字列を検索 import re matches = re.findall(r'\[(.*?)\]', cell.value) # マッチした場合、フィールド値で置換 if matches: new_value = cell.value for field_name in matches: if field_name in record: new_value = new_value.replace( f'[{field_name}]', str(record[field_name]) ) cell.value = new_value cursor.close() return {"status": True, "message": f"Success generating group: "} except Exception as e: logging.error(f"Error in proceed_one_record: {str(e)}") return {"status": False, "message": f"Exception in proceed_one_record:Error generating report: {str(e)}"} def get_column_letter(self, cell_reference): """ セル参照(例:'A12')から列文字を取得 """ if isinstance(cell_reference, str): # アルファベット部分を抽出 column = ''.join(c for c in cell_reference if c.isalpha()) if column: return column return 'A' # デフォルト値 def get_row_number(self,cell_reference): """ セル参照(例:'A12')から行番号を取得 """ if isinstance(cell_reference, str): # 数字部分のみを抽出 digits = ''.join(c for c in cell_reference if c.isdigit()) if digits: return int(digits) return int(cell_reference) def proceed_all_records(self, table: str, where: str, sort: str, group_range: str, variables: Dict[str, Any]): """複数レコードを取得してシートの値を置き換える Args: table: テーブル名 where: WHERE句 sort: ORDER BY句 group_range: 処理対象範囲 variables: DB接続情報を含む変数辞書 """ print(f"make_report.proceed_all_record step-1:table={table},where={where},group_range={group_range}") try: # グループ範囲の行数を取得(セル参照対応) if not group_range or ':' not in group_range: raise ValueError(f"Invalid group_range format: {group_range}") # グループ範囲の行数を取得 range_parts = group_range.split(':') logging.info(f"Processing range_parts: {range_parts}") # デバッグ用ログ start_row = self.get_row_number(range_parts[0].strip()) start_col = self.get_column_letter(range_parts[0].strip()) end_row = self.get_row_number(range_parts[1].strip()) end_col = self.get_column_letter(range_parts[1].strip()) if start_row > end_row: raise ValueError(f"Invalid row range: start_row ({start_row}) > end_row ({end_row})") template_rows = end_row - start_row + 1 cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) # SQLクエリを実行 query = f"SELECT * FROM {table} WHERE {where} ORDER BY {sort}" cursor.execute(query) records = cursor.fetchall() print(f"query={query}, records={len(records)}") current_row = start_row for record in records: # テンプレート範囲をコピー self.copy_template_to_current(f"{start_col}{start_row}:{end_col}{end_row}", f"{start_col}{current_row}:{end_col}{current_row + template_rows - 1}" ) # コピーした範囲内のセルを走査して値を置換 for row in range(current_row, current_row + template_rows): for cell in self.current_worksheet[row]: if cell.value and isinstance(cell.value, str): # [field_name]形式の文字列を検索 import re matches = re.findall(r'\[(.*?)\]', cell.value) # マッチした場合、フィールド値で置換 if matches: new_value = cell.value for field_name in matches: if field_name in record: new_value = new_value.replace( f'[{field_name}]', str(record[field_name]) ) cell.value = new_value current_row += template_rows cursor.close() self.conn.close() return {"status": True, "message": "Success processing all records"} except Exception as e: logging.error(f"Error in proceed_all_record: {str(e)}") return {"status": False, "message": f"Exception in proceed_all_record:Error processing records: {str(e)}"} def copy_template_to_current(self, orig_range, target_range): try: print(f"orig_rage={orig_range},target_range={target_range}") # 範囲をパースする orig_min_col, orig_min_row, orig_max_col, orig_max_row = range_boundaries(orig_range) target_min_col, target_min_row, target_max_col, target_max_row = range_boundaries(target_range) print(f"min_col, min_row, max_col, max_row = {orig_min_col}, {orig_min_row}, {orig_max_col}, {orig_max_row}") print(f"min_col, min_row, max_col, max_row = {target_min_col}, {target_min_row}, {target_max_col}, {target_max_row}") # Get template sheet name from ini file section_config = self.conf.get_section(self.section_list[0]) # 現在の処理中のセクション template_sheet_name = section_config.get("template_sheet") if not template_sheet_name: raise ValueError("Template sheet name not found in configuration") # Create new sheet with template name if it doesn't exist if template_sheet_name not in self.workbook.sheetnames: self.current_sheet = self.workbook.create_sheet(template_sheet_name) else: self.current_sheet = self.workbook[template_sheet_name] # Remove default sheet if it exists if 'Sheet' in self.workbook.sheetnames: del self.workbook['Sheet'] # Copy column widths for col in range(orig_min_col, orig_max_col + 1): col_letter = get_column_letter(col) if col_letter in self.template_sheet.column_dimensions: self.current_sheet.column_dimensions[col_letter].width = \ self.template_sheet.column_dimensions[col_letter].width # Copy row heights for row in range(orig_min_row, orig_max_row + 1): target_row = row - orig_min_row + target_min_row if row in self.template_sheet.row_dimensions: source_height = self.template_sheet.row_dimensions[row].height if source_height is not None: if target_row not in self.current_sheet.row_dimensions: self.current_sheet.row_dimensions[target_row] = openpyxl.worksheet.dimensions.RowDimension(target_row) self.current_sheet.row_dimensions[target_row].height = source_height # Copy merged cells for merged_range in self.template_sheet.merged_cells: min_col, min_row, max_col, max_row = range_boundaries(str(merged_range)) # Check if merge range intersects with our copy range if (min_col >= orig_min_col and max_col <= orig_max_col and min_row >= orig_min_row and max_row <= orig_max_row): # Calculate target merge range target_merge_min_row = target_min_row + (min_row - orig_min_row) target_merge_max_row = target_min_row + (max_row - orig_min_row) target_merge_range = f"{get_column_letter(min_col)}{target_merge_min_row}:" \ f"{get_column_letter(max_col)}{target_merge_max_row}" self.current_sheet.merge_cells(target_merge_range) # Copy cell contents and styles row_offset = target_min_row - orig_min_row for row in range(orig_min_row, orig_max_row + 1): for col in range(orig_min_col, orig_max_col + 1): source_cell = self.template_sheet.cell(row=row, column=col) target_cell = self.current_sheet.cell(row=row+row_offset, column=col) # Copy value target_cell.value = source_cell.value # Copy styles if source_cell.has_style: target_cell.font = copy(source_cell.font) target_cell.border = copy(source_cell.border) target_cell.fill = copy(source_cell.fill) target_cell.number_format = source_cell.number_format target_cell.protection = copy(source_cell.protection) target_cell.alignment = copy(source_cell.alignment) # Copy page setup target_page_setup = self.current_sheet.page_setup source_page_setup = self.template_sheet.page_setup # Copy supported page setup attributes copyable_attrs = [ 'paperSize', 'orientation', 'fitToHeight', 'fitToWidth', 'scale' ] for attr in copyable_attrs: try: if hasattr(source_page_setup, attr): setattr(target_page_setup, attr, getattr(source_page_setup, attr)) except Exception as e: logging.warning(f"Could not copy page setup attribute {attr}: {str(e)}") # Copy margins target_margins = self.current_sheet.page_margins source_margins = self.template_sheet.page_margins margin_attrs = ['left', 'right', 'top', 'bottom', 'header', 'footer'] for attr in margin_attrs: try: if hasattr(source_margins, attr): setattr(target_margins, attr, getattr(source_margins, attr)) except Exception as e: logging.warning(f"Could not copy margin attribute {attr}: {str(e)}") # Copy print options target_print = self.current_sheet.print_options source_print = self.template_sheet.print_options print_attrs = [ 'horizontalCentered', 'verticalCentered', 'gridLines', 'gridLinesSet' ] for attr in print_attrs: try: if hasattr(source_print, attr): setattr(target_print, attr, getattr(source_print, attr)) except Exception as e: logging.warning(f"Could not copy print option {attr}: {str(e)}") return {"status": True, "message": "Successfully copied template range"} except Exception as e: logging.error(f"Error in copy_template_to_current: {str(e)}") return {"status": False, "message": f"Exception in copy_template_to_current: {str(e)}"} # Style operations def apply_style( self, cell_range: str, font: Dict[str, Any] = None, fill: Dict[str, Any] = None, border: Dict[str, Any] = None, alignment: Dict[str, Any] = None ) -> None: """Apply styles to cell range""" for row in self.current_sheet[cell_range]: for cell in row: if font: cell.font = self._style_manager.create_font(**font) if fill: cell.fill = self._style_manager.create_fill(**fill) if border: cell.border = self._style_manager.create_border(**border) if alignment: cell.alignment = self._style_manager.create_alignment(**alignment) # Merge operations def merge_range( self, start_row: int, start_col: int, end_row: int, end_col: int ) -> None: """Merge cell range""" self._merge_manager.merge_cells(start_row, start_col, end_row, end_col) # Image operations def add_image( self, image_path: Union[str, Path], position: Tuple[int, int], size: Optional[Tuple[int, int]] = None ) -> None: """Add image to worksheet""" self._image_manager.add_image(image_path, position, size) # Conditional formatting def add_conditional_format( self, cell_range: str, format_type: str, **kwargs ) -> None: """Add conditional formatting""" if format_type == 'color_scale': self._conditional_manager.add_color_scale(cell_range, **kwargs) elif format_type == 'data_bar': self._conditional_manager.add_data_bar(cell_range, **kwargs) elif format_type == 'icon_set': self._conditional_manager.add_icon_set(cell_range, **kwargs) elif format_type == 'custom': self._conditional_manager.add_custom_rule(cell_range, **kwargs) # Page setup def setup_page( self, orientation: str = 'portrait', paper_size: int = PaperSizes.A4, margins: Dict[str, float] = None, header_footer: Dict[str, Any] = None ) -> None: """Configure page setup""" self._page_manager.set_page_setup( orientation=orientation, paper_size=paper_size ) if margins: self._page_manager.set_margins(**margins) if header_footer: self._page_manager.set_header_footer(**header_footer) def cleanup(self) -> None: """Cleanup temporary files""" if self._image_manager: self._image_manager.cleanup() def __del__(self): """Destructor""" self.cleanup()