basic debugging step 1

This commit is contained in:
2024-11-05 07:46:21 +09:00
parent d6464c1369
commit 0c2dfec7dd
8 changed files with 499 additions and 194 deletions

View File

@ -10,183 +10,360 @@ import shutil
from datetime import datetime
from typing import Optional, Dict, Any, Union, Tuple
from pathlib import Path
# psycopg2のインポートを追加
import psycopg2
import psycopg2.extras
from .config_handler import ConfigHandler # ini file のロード
from .styles import StyleManager
from .merge import MergeManager
from .image import ImageManager
from .conditional import ConditionalFormatManager
from .page import PageManager, PaperSizes
import logging
class SumasenExcel:
"""Enhanced Excel handling class with extended functionality"""
def __init__(self, debug: bool = False):
self.debug = debug
def __init__(self, document: str, variables: Dict[str, Any],
docbase: str = "./docbase") -> Dict[str, str]:
"""Initialize Excel document with basic settings
Args:
document: Document name
variables: Variables for document
docbase: Base directory for documents (default: "./docbase")
Returns:
Dict with status (true/false) and optional error message
"""
self.debug = True
self.workbook = None
self.template_filepath = None
self.output_filepath = None
self.current_sheet = None
self.dbname = None
self.user = None
self.password = None
self.host = None
self.port = None
self._style_manager = None
self._merge_manager = None
self._image_manager = None
self._conditional_manager = None
self._page_manager = None
def init(self, username: str, project_id: str, document: str,
lang: str = "jp", docbase: str = "./docbase") -> Dict[str, str]:
"""Initialize Excel document with basic settings
Args:
username: User name for file operations
project_id: Project identifier
document: Document name
lang: Language code (default: "jp")
docbase: Base directory for documents (default: "./docbase")
Returns:
Dict with status ("ACK"/"NCK") and optional error message
"""
try:
self.username = username
self.project_id = project_id
self.language = lang
# Setup directory structure
# document base を設定
self.docpath = docbase
self.docpath2 = f"{docbase}/{project_id}"
if not os.path.exists(docbase):
logging.error(f"Document base directory not found: {docbase}")
# ini fileをロード
self.ini_file_path = f"{docbase}/{document}.ini"
self.conf = ConfigHandler(self.ini_file_path, variables)
if not os.path.exists(self.ini_file_path):
logging.error(f"INI file not found: {self.ini_file_path}")
# Create directories if they don't exist
for path in [docbase, self.docpath2]:
if not os.path.exists(path):
os.makedirs(path, mode=0o755)
# basic section をロード
basic = self.conf.get_section("basic")
if not basic:
logging.error(f"Basic section not found in INI file: {self.ini_file_path}")
# Load template
inifile = f"{document}.ini"
self.inifilepath = f"{self.docpath2}/{inifile}"
self.basic = basic
if not os.path.exists(self.inifilepath):
return {"status": "NCK", "message": f"INI file not found: {self.inifilepath}"}
# basicセクションから必要なパラメータを取得
template_file = basic.get("template_file")
if not template_file:
logging.error("template_file not found in basic section")
# Load template workbook
template_file = self._get_ini_param("basic", f"templatefile_{lang}")
self.template_filepath = f"{self.docpath2}/{template_file}"
doc_file = basic.get("doc_file")
if not doc_file:
logging.error("doc_file not found in basic section")
self.maxcol = basic.get("maxcol")
if not self.maxcol:
self.maxcol = 100 # デフォルト値を設定
logging.warning("maxcol not found in basic section. Defaulting to 100")
else:
self.maxcol = int(self.maxcol)
sections = basic.get("sections")
if not sections:
logging.error("sections not found in basic section")
# セクションをリストに変換
self.section_list = [s.strip() for s in sections.split(",")]
if not self.section_list:
logging.error("sections not found in basic section")
# 出力ファイルパスを設定
self.output_filepath = f"{self.docpath}/{doc_file}"
# 新規のExcelワークブックを作成
self.workbook = openpyxl.Workbook()
# デフォルトで作成されるシートを削除
self.workbook.remove(self.workbook.active)
# テンプレートワークブックをロード
self.template_filepath = f"{self.docpath}/{template_file}"
if not os.path.exists(self.template_filepath):
# Copy from default if not exists
default_template = f"{self.docpath}/{template_file}"
shutil.copy2(default_template, self.template_filepath)
logging.error(f"Template file not found: {self.template_filepath}")
self.workbook = openpyxl.load_workbook(self.template_filepath)
return {"status": "ACK"}
self.template_workbook = openpyxl.load_workbook(self.template_filepath)
self.template_sheet = self.template_workbook.active
except Exception as e:
return {"status": "NCK", "message": str(e)}
logging.error(f"Error initializing Excel document: {str(e)}")
def make_report(self,variables: Dict[str, Any]):
"""レポートを生成する"""
logging.info("make_report step-1")
try:
ret_status = True
# セクションごとに処理を実行(ワークシート単位)
for section in self.section_list:
ret = self.proceed_section(section,variables)
if ret["status"] == False:
message = ret.get("message", "No message provided")
return {"status": False, "message": f"Fail generating section: {section}...{message}"}
def _get_ini_param(self, section: str, param: str) -> Optional[str]:
"""Get parameter from INI file
# 生成したワークブックを保存
self.workbook.save(self.output_filepath)
return {"status": True, "message": f"Report generated successfully : {self.output_filepath}", "filepath":self.output_filepath}
Args:
section: INI file section
param: Parameter name
except Exception as e:
return {"status": False, "message": f"Exception in make_report: Error generating report: {str(e)}"}
def proceed_section(self, section: str, variables: Dict[str, Any]):
logging.info(f"make_report.proceed_section step-1:section={section}")
Returns:
Parameter value or None if not found
"""
try:
# Use configparser to handle INI files
import configparser
config = configparser.ConfigParser()
config.read(self.inifilepath)
return config[section][param]
except:
return None
# セクションの設定を取得
section_config = self.conf.get_section(section)
# セクションが存在しない場合はスキップ
if not section_config:
return {"status": False, "message": f"Error no section found: {section}"}
def make_report(self, db, data_rec: Dict[str, Any],
out_filename: Optional[str] = None,
screen_index: int = 0) -> None:
"""Generate Excel report from template
# テンプレートシートをコピー
template_sheet_name = section_config.get("template_sheet")
if not template_sheet_name or template_sheet_name not in self.template_workbook.sheetnames:
return {"status": False, "message": f"Error no template sheet found: {template_sheet_name}"}
# シートの名前を設定
new_sheet = self.workbook.create_sheet(title=section_config.get("sheet_name", section))
self.worksheet = new_sheet
self.dbname=variables.get('db'),
self.user=variables.get('username'),
self.password=variables.get('password'),
self.host=variables.get('host'),
self.port=variables.get('port')
if not self.dbname or not self.user or not self.password or not self.host or not self.port:
return {"status": False, "message": f"Error no database connection information"}
# PostgreSQLに接続
self.conn = psycopg2.connect(
self.dbname,
self.user,
self.password,
self.host,
self.port
)
self._style_manager = StyleManager()
self._merge_manager = MergeManager(self.current_sheet)
self._image_manager = ImageManager(self.current_sheet)
self._conditional_manager = ConditionalFormatManager(self.current_sheet)
self._page_manager = PageManager(self.current_sheet)
# シートの幅を設定
fit_to_width = section_config.get("fit_to_width")
if fit_to_width:
new_sheet.sheet_view.zoomScaleNormal = float(fit_to_width)
# シートの向きを設定
orientation = section_config.get("orientation")
new_sheet.sheet_view.orientation = orientation if orientation else "portrait"
self.current_worksheet = new_sheet
# グループ定義を取得
groups = section_config.get("groups")
if not groups:
return {"status": False, "message": f"Error no group definitions found: {section}"}
# グループをリストに変換
group_list = [g.strip() for g in groups.split(",")]
if not group_list:
return {"status": False, "message": f"Error invalid group definitions found: {section}"}
# 各グループの設定を取得
for group in group_list:
ret = self.proceed_group(group,variables)
if ret["status"] == False:
return ret
return {"status": True, "message": f"Success generating section: {section}"}
except Exception as e:
return {"status": False, "message": f"Exception in proceed_section : Error generating report: {str(e)}"}
def proceed_group(self,group:str,variables: Dict[str, Any]):
logging.info(f"make_report.proceed_group step-1:section={group}")
try:
group_config = self.conf.get_section(group)
if not group_config:
return {"status": False, "message": f"Error no group section found: {group}"}
# グループの処理パラメータを取得
group_range = group_config.get("group_range")
table = group_config.get("table")
where = group_config.get("where")
if not where or not table or not group_range:
return {"status": False, "message": f"Error invalid group parameters: {group_config}"}
sort = group_config.get("sort")
if not sort:
ret = self.proceed_one_record(table,where,group_range,variables)
if ret.status == True:
return {"status": True, "message": f"Success generating group: {group}"}
else:
ret = self.proceed_all_records(table,where,sort,group_range,variables)
if ret.status == True:
return {"status": True, "message": f"Success generating group: {group}"}
except Exception as e:
logging.error(f"Error in proceed_group: {str(e)}")
return {"status": False, "message": f"Exception in proceed_group : Error generating report: {str(e)}"}
def proceed_one_record(self,table:str,where:str,group_range:str,variables: Dict[str, Any]):
"""1レコードのデータを取得してシートの値を置き換える
Args:
db: Database connection
data_rec: Data records to populate report
out_filename: Optional output filename
screen_index: Screen index for multi-screen reports
"""
# Get output filename
if out_filename:
outfile = f"{out_filename}_{self._get_ini_param('basic', 'doc_file')}"
else:
outfile = self._get_ini_param('basic', 'doc_file')
table: テーブル名
where: WHERE句
group_range: 処理対象範囲
variables: DB接続情報を含む変数辞書
"""
logging.info(f"make_report.proceed_one_record step-1:table={table},where={where},group_range={group_range}")
self.output_filepath = f"{self.docpath2}/{outfile}"
try:
# まずself.template_sheetの指定範囲のセルをself.current_sheetにコピーする。
self.copy_template_to_current(self.range,self.range)
# Process sections
sections = self._get_ini_param('basic', 'sections')
if not sections:
return
for section in sections.split(','):
self._process_section(section, db, data_rec, screen_index)
# Save workbook
self.workbook.save(self.output_filepath)
def _process_section(self, section: str, db, data_rec: Dict[str, Any],
screen_index: int) -> None:
"""Process individual section of report
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
# SQLクエリを実行
query = f"SELECT * FROM {table} WHERE {where} LIMIT 1"
cursor.execute(query)
record = cursor.fetchone()
if record:
# group_rangeの範囲内のセルを走査
for row in self.current_worksheet:
for cell in row:
if cell.value and isinstance(cell.value, str):
# [field_name]形式の文字列を検索
import re
matches = re.findall(r'\[(.*?)\]', cell.value)
# マッチした場合、フィールド値で置換
if matches:
new_value = cell.value
for field_name in matches:
if field_name in record:
new_value = new_value.replace(
f'[{field_name}]',
str(record[field_name])
)
cell.value = new_value
cursor.close()
self.conn.close()
return {"status": True, "message": f"Success generating group: "}
except Exception as e:
logging.error(f"Error in proceed_one_record: {str(e)}")
return {"status": False, "message": f"Exception in proceed_one_record:Error generating report: {str(e)}"}
def proceed_all_record(self, table: str, where: str, sort: str, group_range: str, variables: Dict[str, Any]):
"""複数レコードを取得してシートの値を置き換える
Args:
section: Section name
db: Database connection
data_rec: Data records
screen_index: Screen index
"""
# Get template sheet
sheet_orig = self._get_ini_param(section, 'sheet')
sheet_name = self._get_ini_param(section, f"sheetname_{self.language}")
table: テーブル名
where: WHERE句
sort: ORDER BY句
group_range: 処理対象範囲
variables: DB接続情報を含む変数辞書
"""
logging.info(f"make_report.proceed_all_record step-1:table={table},where={where},group_range={group_range}")
if not sheet_orig or not sheet_name:
return
try:
# グループ範囲の行数を取得
start_row, end_row = map(int, group_range.split(':')[0].split(','))
template_rows = end_row - start_row + 1
# Copy template sheet
template_sheet = self.workbook[sheet_orig]
new_sheet = self.workbook.copy_worksheet(template_sheet)
new_sheet.title = sheet_name
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
# SQLクエリを実行
query = f"SELECT * FROM {table} WHERE {where} ORDER BY {sort}"
cursor.execute(query)
records = cursor.fetchall()
current_row = start_row
for record in records:
# テンプレート範囲をコピー
self.copy_template_to_current(
f"{start_row},{end_row}",
f"{current_row},{current_row + template_rows - 1}"
)
# コピーした範囲内のセルを走査して値を置換
for row in range(current_row, current_row + template_rows):
for cell in self.current_worksheet[row]:
if cell.value and isinstance(cell.value, str):
# [field_name]形式の文字列を検索
import re
matches = re.findall(r'\[(.*?)\]', cell.value)
# マッチした場合、フィールド値で置換
if matches:
new_value = cell.value
for field_name in matches:
if field_name in record:
new_value = new_value.replace(
f'[{field_name}]',
str(record[field_name])
)
cell.value = new_value
current_row += template_rows
cursor.close()
self.conn.close()
return {"status": True, "message": "Success processing all records"}
except Exception as e:
logging.error(f"Error in proceed_all_record: {str(e)}")
return {"status": False, "message": f"Exception in proceed_all_record:Error processing records: {str(e)}"}
# Process groups
groups = self._get_ini_param(section, 'groups')
if groups:
for group in groups.split(','):
self._process_group(new_sheet, section, group, db, data_rec, screen_index)
def _process_group(self, sheet, section: str, group: str,
db, data_rec: Dict[str, Any], screen_index: int) -> None:
"""Process group within section
Args:
sheet: Worksheet to process
section: Section name
group: Group name
db: Database connection
data_rec: Data records
screen_index: Screen index
"""
pass # Implementation details will follow
def copy_template_to_current(self,range,target_range):
# テンプレートシートから現在のシートにデータをコピー
source_range = self.template_sheet.range(range) # コピーする範囲を指定
target_range = self.current_sheet.range(target_range) # 貼り付け先の範囲を指定
# 値、数式、フォーマットをコピー
source_range.copy(target_range)
def init_sheet(self, sheet_name: str) -> None:
"""Initialize worksheet and managers"""
self.current_sheet = self.workbook[sheet_name]
self._style_manager = StyleManager()
self._merge_manager = MergeManager(self.current_sheet)
self._image_manager = ImageManager(self.current_sheet)
self._conditional_manager = ConditionalFormatManager(self.current_sheet)
self._page_manager = PageManager(self.current_sheet)
# Style operations
def apply_style(
self,