1095 lines
47 KiB
Python
1095 lines
47 KiB
Python
# sumaexcel/excel.py
|
||
|
||
import openpyxl
|
||
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
|
||
from openpyxl.utils import get_column_letter
|
||
import pandas as pd
|
||
from typing import Optional, Dict, List, Union, Any
|
||
import os
|
||
import shutil
|
||
from datetime import datetime
|
||
from typing import Optional, Dict, Any, Union, Tuple
|
||
from pathlib import Path
|
||
# psycopg2のインポートを追加
|
||
import psycopg2
|
||
import psycopg2.extras
|
||
from copy import copy
|
||
from openpyxl.utils import range_boundaries
|
||
|
||
from .config_handler import ConfigHandler # ini file のロード
|
||
|
||
#from .styles import StyleManager
|
||
#from .merge import MergeManager
|
||
#from .image import ImageManager
|
||
#from .conditional import ConditionalFormatManager
|
||
from .page import PageManager, PaperSizes
|
||
|
||
from datetime import datetime
|
||
import pytz
|
||
from urllib.parse import urlparse
|
||
import requests
|
||
from io import BytesIO
|
||
from PIL import Image
|
||
import re
|
||
from django.http import HttpResponse
|
||
import uuid
|
||
import unicodedata
|
||
import tempfile
|
||
import os
|
||
import logging
|
||
from openpyxl.utils import get_column_letter
|
||
from openpyxl.drawing.image import Image as XLImage
|
||
from openpyxl.drawing.spreadsheet_drawing import OneCellAnchor, AnchorMarker
|
||
from openpyxl.utils.units import pixels_to_EMU
|
||
from PIL import Image
|
||
|
||
import logging
|
||
|
||
# Third-party loggers that are too chatty below WARNING.
for _noisy_logger_name in (
    'PIL',
    'openpyxl',
    'PIL.PngImagePlugin',
    'PIL.TiffImagePlugin',
):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)

# Root logger configuration: INFO and above, with timestamp, logger name
# and level in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
|
||
|
||
class SumasenExcel:
|
||
"""Enhanced Excel handling class with extended functionality"""
|
||
|
||
def __init__(self, document: str, variables: Dict[str, Any],
             docbase: str = "./docbase") -> None:
    """Load the report definition and prepare an empty output workbook.

    Reads ``{docbase}/{document}.ini`` through ``ConfigHandler`` and takes
    ``output_path``, ``template_file``, ``doc_file``, ``maxcol`` and
    ``sections`` from its ``basic`` section. On success the instance holds
    a new, empty workbook (``self.workbook``) and the loaded template
    (``self.template_workbook`` / ``self.template_sheet``).

    The constructor never raises. The first problem found stops the
    initialisation, is logged, and is recorded in ``self.init_error``;
    attributes not reached by then keep their ``None`` / empty defaults.

    Args:
        document: Document name; ``{document}.ini`` is looked up in docbase.
        variables: Variables passed to ``ConfigHandler`` with the INI path.
        docbase: Base directory for documents (default: "./docbase").
    """
    self.debug = True

    self.workbook = None
    self.template_filepath = None
    self.output_filepath = None
    self.current_sheet = None

    self.dbname = None
    self.user = None
    self.password = None
    self.host = None
    self.port = None

    self._style_manager = None
    self._merge_manager = None
    self._image_manager = None
    self._conditional_manager = None
    self._page_manager = None

    # Defaults for everything assigned below, so that a failed
    # initialisation still leaves a fully-formed object behind.
    self.docpath = docbase
    self.ini_file_path = f"{docbase}/{document}.ini"
    self.conf = None
    self.basic = None
    self.output_path = None
    self.maxcol = None
    self.section_list = []
    self.template_workbook = None
    self.template_sheet = None
    self.conn = None
    self.init_error = None

    try:
        logging.info("step-1")

        # The document base and the INI file must exist before anything
        # tries to read them.
        if not os.path.exists(docbase):
            raise FileNotFoundError(f"Document base directory not found: {docbase}")
        if not os.path.exists(self.ini_file_path):
            raise FileNotFoundError(f"INI file not found: {self.ini_file_path}")

        self.conf = ConfigHandler(self.ini_file_path, variables)

        # Load the [basic] section.
        basic = self.conf.get_section("basic")
        if not basic:
            raise ValueError(f"Basic section not found in INI file: {self.ini_file_path}")
        self.basic = basic

        self.output_path = basic.get("output_path")
        if not self.output_path:
            raise ValueError("output_path not found in basic section")
        os.makedirs(self.output_path, exist_ok=True)

        logging.info("step-2")

        # Mandatory parameters of the [basic] section.
        template_file = basic.get("template_file")
        if not template_file:
            raise ValueError("template_file not found in basic section")

        doc_file = basic.get("doc_file")
        if not doc_file:
            # Without this the report would be written to a file named "None".
            raise ValueError("doc_file not found in basic section")

        maxcol = basic.get("maxcol")
        if maxcol:
            self.maxcol = int(maxcol)
        else:
            self.maxcol = 100  # default value
            logging.warning("maxcol not found in basic section. Defaulting to 100")

        sections = basic.get("sections")
        if not sections:
            raise ValueError("sections not found in basic section")

        logging.info("step-3")

        # Comma-separated section names; blank entries (e.g. from a
        # trailing comma) are dropped.
        self.section_list = [name.strip() for name in sections.split(",") if name.strip()]
        if not self.section_list:
            raise ValueError("sections not found in basic section")

        # Output file path.
        self.output_filepath = f"{self.output_path}/{doc_file}"

        # Fresh workbook without the sheet openpyxl creates by default.
        self.workbook = openpyxl.Workbook()
        self.workbook.remove(self.workbook.active)

        logging.info("step-4")

        # Template workbook.
        self.template_filepath = f"{self.docpath}/{template_file}"
        if not os.path.exists(self.template_filepath):
            raise FileNotFoundError(f"Template file not found: {self.template_filepath}")

        logging.info("step-5")

        self.template_workbook = openpyxl.load_workbook(self.template_filepath)
        self.template_sheet = self.template_workbook.active

    except Exception as e:
        self.init_error = str(e)
        logging.error(f"Error initializing Excel document: {str(e)}")
|
||
|
||
def make_report(self, variables: Dict[str, Any]):
    """Generate the report: build every section, then save the workbook.

    Each name in ``self.section_list`` is handed to ``proceed_section``
    (one worksheet per section). The first section that reports failure
    aborts the run; otherwise the workbook is saved to
    ``self.output_filepath``.

    Args:
        variables: Passed through unchanged to ``proceed_section``.

    Returns:
        dict with ``status`` and ``message``; on success also ``filepath``.
        Exceptions are not propagated, they are reported in ``message``.
    """
    logging.info("make_report step-1")
    try:
        for section_name in self.section_list:
            outcome = self.proceed_section(section_name, variables)
            if outcome["status"] == False:  # noqa: E712 - exact comparison kept on purpose
                detail = outcome.get("message", "No message provided")
                return {
                    "status": False,
                    "message": f"Fail generating section: {section_name}...{detail}",
                }

        # All sections were generated: write the workbook to disk.
        self.workbook.save(self.output_filepath)
    except Exception as exc:
        return {
            "status": False,
            "message": f"Exception in make_report: Error generating report: {str(exc)}",
        }

    return {
        "status": True,
        "message": f"Report generated successfully : {self.output_filepath}",
        "filepath": self.output_filepath,
    }
|
||
|
||
def proceed_section(self, section: str, variables: Dict[str, Any]):
    """Build one worksheet from the INI section named *section*.

    Steps: look up the section config, select or create the output sheet,
    open a PostgreSQL connection from *variables*, apply the view settings
    and column widths, then run ``proceed_group`` for every entry of the
    section's comma-separated ``groups`` value.

    The connection opened here is stored in ``self.conn`` for the record
    handlers and is always closed again before this method returns.

    Args:
        section: Name of the INI section to process.
        variables: Must provide ``db``, ``username`` and ``password``;
            ``host`` defaults to ``'postgres'`` and ``port`` to ``'5432'``.

    Returns:
        dict with ``status`` (bool) and ``message``. Exceptions are not
        propagated, they are reported in ``message``.
    """
    print(f"make_report.proceed_section step-1:section={section}")

    conn = None
    try:
        # Section configuration; a missing section is an error.
        section_config = self.conf.get_section(section)
        if not section_config:
            return {"status": False, "message": f"Error no section found: {section}"}

        # The configured template sheet must exist in the template workbook.
        template_sheet_name = section_config.get("template_sheet")
        if not template_sheet_name or template_sheet_name not in self.template_workbook.sheetnames:
            return {"status": False, "message": f"Error no template sheet found: {template_sheet_name}"}

        # Output sheet: reuse it when it already exists, else create it.
        new_sheet_name = section_config.get("sheet_name", section)
        if new_sheet_name not in self.workbook.sheetnames:
            self.current_sheet = self.workbook.create_sheet(new_sheet_name)
        else:
            self.current_sheet = self.workbook[new_sheet_name]

        # Remove the default sheet if it exists.
        if 'Sheet' in self.workbook.sheetnames:
            del self.workbook['Sheet']

        self.dbname = variables.get('db')
        self.user = variables.get('username')
        self.password = variables.get('password')
        self.host = variables.get('host', 'postgres')
        self.port = variables.get('port', '5432')
        if not all((self.dbname, self.user, self.password, self.host, self.port)):
            return {"status": False, "message": f"Error no database connection information"}

        # The password itself is never written to the output.
        print(f"db={self.dbname},user={self.user},pass=********,host={self.host},port={self.port}")

        # Connect to PostgreSQL.
        conn = psycopg2.connect(
            dbname=self.dbname,
            user=self.user,
            password=self.password,
            host=self.host,
            port=self.port
        )
        self.conn = conn

        # Zoom factor of the sheet view.
        fit_to_width = section_config.get("fit_to_width")
        if fit_to_width:
            self.current_sheet.sheet_view.zoomScaleNormal = float(fit_to_width)

        # NOTE(review): this stores the orientation on the sheet *view*;
        # page orientation normally lives on page_setup - confirm intent.
        orientation = section_config.get("orientation")
        self.current_sheet.sheet_view.orientation = orientation if orientation else "portrait"

        max_col = self.basic.get("maxcol", 20)
        if not max_col:
            return {"status": False, "message": f"Error no maxcol found: basic"}
        self.set_column_width_from_config(self.current_sheet)

        # Group definitions of this section.
        groups = section_config.get("groups")
        if not groups:
            return {"status": False, "message": f"Error no group definitions found: {section}"}

        # Comma-separated group names; blank entries are dropped.
        group_list = [g.strip() for g in groups.split(",") if g.strip()]
        if not group_list:
            return {"status": False, "message": f"Error invalid group definitions found: {section}"}

        # Process each group; its INI section is named "<section>.<group>".
        for group in group_list:
            section_group = f"{section}.{group}"
            ret = self.proceed_group(section_group, variables)
            if ret is None:
                # Defensive: a handler that returns nothing counts as failed.
                return {"status": False, "message": f"Error generating group: {section_group}"}
            if ret["status"] == False:  # noqa: E712
                return ret

        return {"status": True, "message": f"Success generating section: {section}"}

    except Exception as e:
        return {"status": False, "message": f"Exception in proceed_section : Error generating report: {str(e)}"}

    finally:
        # Release the connection opened above on every exit path.
        if conn is not None:
            try:
                conn.close()
            except Exception as close_error:
                logging.warning(f"Could not close database connection: {close_error}")
|
||
|
||
def proceed_group(self, group: str, variables: Dict[str, Any]):
    """Fill one group of the current sheet from the database.

    Reads ``group_range``, ``table_name``, ``where`` and the optional
    ``sort`` from the INI section *group*. With ``sort`` every matching
    record is rendered (``proceed_all_records``); without it only a single
    record is rendered (``proceed_one_record``).

    Args:
        group: INI section name of the group.
        variables: Passed through unchanged to the record handlers.

    Returns:
        dict with ``status`` (bool) and ``message``. A failure of the
        record handler is returned as a failure carrying its message
        (previously this path returned ``None``).
    """
    logging.info(f"make_report.proceed_group step-1:section={group}")

    try:
        group_config = self.conf.get_section(group)
        if not group_config:
            return {"status": False, "message": f"Error no group section found: {group}"}

        # Processing parameters of the group.
        group_range = group_config.get("group_range")
        table = group_config.get("table_name")
        where = group_config.get("where")
        if not where or not table or not group_range:
            return {"status": False, "message": f"Error invalid group parameters: {group_config}"}

        sort = group_config.get("sort")
        if sort:
            ret = self.proceed_all_records(table, where, sort, group_range, variables)
        else:
            ret = self.proceed_one_record(table, where, group_range, variables)

        if ret and ret.get("status") == True:  # noqa: E712
            return {"status": True, "message": f"Success generating group: {group}"}

        # Hand the handler's failure back to the caller instead of falling
        # off the end of the function.
        detail = ret.get("message", "No message provided") if ret else "No result returned"
        return {"status": False, "message": f"Fail generating group: {group}...{detail}"}

    except Exception as e:
        logging.error(f"Error in proceed_group: {str(e)}")
        return {"status": False, "message": f"Exception in proceed_group : Error generating report: {str(e)}"}
|
||
|
||
def pixels_to_EMU(pixels):
|
||
"""
|
||
Convert pixels to EMU (English Metric Units)
|
||
EMU = pixels * 9525
|
||
"""
|
||
return int(pixels * 9525)
|
||
|
||
def format_cell_value(self, field_value, cell):
    """Convert a database value into the text to put into a cell.

    Rules, checked in this order:

    * ``None`` -> ``""``
    * ``bool`` -> ``"OK"`` / ``"-"``
    * ``datetime`` -> time of day ``HH:MM:SS`` in Asia/Tokyo
    * ``str`` referring to an image (``https://`` URL, ``checkin/`` path or
      a bare image file name): if the file exists under ``/app/media`` it
      is scaled to the cell, embedded into the cell's worksheet and ``""``
      is returned; if it does not exist ``""`` is returned; if embedding
      fails the original string is returned
    * any other ``str`` -> unchanged
    * anything else -> ``str(field_value)``

    Args:
        field_value: Value fetched from the database.
        cell: Target worksheet cell; its ``parent``, ``column`` and ``row``
            are used to size and anchor an embedded image.

    Returns:
        str: The text for the cell.
    """
    if field_value is None:
        return ""

    # bool is handled explicitly so it never reaches the str() fallback.
    if isinstance(field_value, bool):
        return "OK" if field_value else "-"

    if isinstance(field_value, datetime):
        jst = pytz.timezone('Asia/Tokyo')
        # NOTE(review): the original comment says "UTC -> JST"; astimezone()
        # on a naive datetime assumes system local time - confirm that the
        # values arriving here are timezone-aware (or that the host is UTC).
        jst_time = field_value.astimezone(jst)
        return jst_time.strftime('%H:%M:%S')

    if not isinstance(field_value, str):
        return str(field_value)

    def get_image_path(value):
        """Return the expected file path for an image reference, else None."""
        if value.startswith('https://'):
            # URL: only the file name part is used.
            filename = os.path.basename(urlparse(value).path)
            return os.path.join("/app/media/compressed", filename)
        if value.startswith('checkin/'):
            return os.path.join("/app/media", value)
        if re.search(r'\.(jpg|jpeg|png|gif|bmp|mpo)$', value, re.I):
            # Bare file name with an image extension.
            return os.path.join("/app/media/compressed", value)
        return None

    try:
        image_path = get_image_path(field_value)
        if not image_path:
            # Ordinary text.
            return field_value
        if not os.path.exists(image_path):
            logging.warning(f"画像ファイルが存在しません: {image_path}")
            return ""  # leave the cell blank

        try:
            import io
            from openpyxl.drawing.xdr import XDRPositiveSize2D

            with Image.open(image_path) as img:
                # Flatten transparency onto white; anything that is not
                # RGB or greyscale is converted to RGB (output is JPEG).
                if img.mode in ('RGBA', 'LA'):
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    background.paste(img, mask=img.split()[-1])
                    img = background
                elif img.mode not in ('RGB', 'L'):
                    img = img.convert('RGB')

                worksheet = cell.parent

                logging.info('step-1')
                try:
                    # Column width: character units -> pixels.
                    column_letter = get_column_letter(cell.column)
                    column_width = worksheet.column_dimensions[column_letter].width
                    cell_width = int((column_width or 8.43) * 7.5)  # 8.43 = standard width
                except Exception:
                    cell_width = 100  # default

                logging.info('step-2')
                try:
                    # Row height: points -> pixels.
                    row_height = worksheet.row_dimensions[cell.row].height
                    cell_height = int((row_height or 15) * 1.33)  # 15 = standard height
                except Exception:
                    cell_height = 20  # default

                logging.info('step-3')
                # Margins in pixels.
                margin_horizontal = 4
                margin_vertical = 2

                # Usable cell area once the margins are taken off.
                effective_cell_width = cell_width - (margin_horizontal * 2)
                effective_cell_height = cell_height - (margin_vertical * 2)

                logging.info('step-4')
                # Lower bounds: 100 - (4 * 2) and 20 - (2 * 2).
                effective_cell_width = max(effective_cell_width, 92)
                effective_cell_height = max(effective_cell_height, 16)

                # Upper bounds: 800 - (4 * 2) and 600 - (2 * 2).
                max_width = 792
                max_height = 596
                effective_cell_width = min(effective_cell_width, max_width)
                effective_cell_height = min(effective_cell_height, max_height)

                logging.info('step-5')
                # Fit into the cell while keeping the aspect ratio.
                img_width, img_height = img.size
                img_aspect = img_width / img_height
                cell_aspect = effective_cell_width / effective_cell_height

                if img_aspect > cell_aspect:
                    width = effective_cell_width
                    height = int(width / img_aspect)
                else:
                    height = effective_cell_height
                    width = int(height * img_aspect)

                # Very wide or very tall images could truncate to 0 here,
                # which resize() rejects; keep at least one pixel.
                width = max(1, width)
                height = max(1, height)

                logging.info('step-6')
                img_resized = img.resize((width, height), Image.BICUBIC)

                # Encode the resized image into an in-memory JPEG.
                img_byte_arr = io.BytesIO()
                img_resized.save(img_byte_arr, format='JPEG', quality=85, optimize=True)
                img_byte_arr.seek(0)

                logging.info('step-7')
                excel_image = XLImage(img_byte_arr)

                # NOTE(review): presumably not used once an explicit anchor
                # object is assigned below - kept as in the original.
                excel_image.width = pixels_to_EMU(width)
                excel_image.height = pixels_to_EMU(height)

                logging.info('step-8')
                # Anchor at the cell's top-left corner, shifted by the margins.
                marker = AnchorMarker(
                    col=cell.column - 1,
                    colOff=pixels_to_EMU(margin_horizontal),
                    row=cell.row - 1,
                    rowOff=pixels_to_EMU(margin_vertical)
                )

                # Displayed image size in EMU.
                size = XDRPositiveSize2D(
                    cx=pixels_to_EMU(width),
                    cy=pixels_to_EMU(height)
                )

                anchor = OneCellAnchor(_from=marker, ext=size)
                excel_image.anchor = anchor

                logging.info('step-9')
                # The byte buffer is deliberately left open: it stays
                # referenced by the image object added to the sheet.
                worksheet.add_image(excel_image)

                logging.info('step-A')
                return ""

        except Exception as e:
            logging.error(f"画像の処理に失敗: {str(e)}, image_path={image_path}")
            import traceback
            logging.error(traceback.format_exc())
            return field_value

    except Exception as e:
        logging.error(f"形式変換の処理に失敗: {str(e)}")
        import traceback
        logging.error(traceback.format_exc())
        return field_value
|
||
|
||
def verify_image_insertion(self, worksheet, cell, image):
    """Helper that checks whether an image was attached to the worksheet.

    Only the most recently added image of ``worksheet._images`` is
    inspected; the *image* argument itself is not looked at.

    Args:
        worksheet: Worksheet expected to hold the image.
        cell: Cell the image belongs to; used for log messages only.
        image: Unused.

    Returns:
        bool: True when the worksheet has at least one image and the last
        one has an anchor; False otherwise, including on any error.
    """
    try:
        attached = worksheet._images

        # Is there any image on the sheet at all?
        if not attached:
            logging.warning(f"No images found in worksheet at cell {cell.coordinate}")
            return False

        # Does the newest image carry an anchor?
        if not attached[-1].anchor:
            logging.warning(f"Image anchor not set properly at cell {cell.coordinate}")
            return False

        logging.info(f"Image successfully inserted at cell {cell.coordinate}")
        return True

    except Exception as exc:
        logging.error(f"Error verifying image insertion: {str(exc)}")
        return False
|
||
|
||
|
||
def proceed_one_record(self, table: str, where: str, group_range: str, variables: Dict[str, Any]):
    """Fetch a single record and substitute its values into the sheet.

    The template range is first copied onto the same range of the current
    sheet. Then one row is read from *table* and every ``[field_name]``
    placeholder found in a string cell of the current sheet is replaced by
    the formatted value of that field. Note that the whole current sheet
    is scanned, not only *group_range*.

    Args:
        table: Table name.
        where: WHERE clause.
        group_range: Cell range to copy from the template (e.g. "A1:F5").
        variables: Not used by this method.

    Returns:
        dict with ``status`` (bool) and ``message``. Finding no record is
        still reported as success. Exceptions are not propagated.
    """
    cursor = None
    try:
        print(f"make_report.proceed_one_record step-1:table={table},where={where},group_range={group_range}")
        # Copy the template cells of the range onto the current sheet.
        self.copy_template_to_current(group_range, group_range)

        print(f"step-1")
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

        # SECURITY NOTE: table and where are interpolated verbatim into the
        # SQL text. They must only ever come from trusted configuration,
        # never from end-user input.
        query = f"SELECT * FROM {table} WHERE {where} LIMIT 1"
        cursor.execute(query)
        record = cursor.fetchone()

        print(f"query={query}")
        print(f"record={record}")
        if record:
            # Placeholders look like [field_name].
            placeholder = re.compile(r'\[(.*?)\]')
            for row in self.current_sheet:
                for cell in row:
                    if not (cell.value and isinstance(cell.value, str)):
                        continue
                    matches = placeholder.findall(cell.value)
                    if not matches:
                        continue

                    new_value = cell.value
                    for field_name in matches:
                        # Placeholders without a matching column stay as-is.
                        if field_name in record:
                            formatted_value = self.format_cell_value(record[field_name], cell)
                            new_value = new_value.replace(
                                f'[{field_name}]',
                                formatted_value
                            )
                    cell.value = new_value

        return {"status": True, "message": f"Success generating group: "}

    except Exception as e:
        logging.error(f"Error in proceed_one_record: {str(e)}")
        return {"status": False, "message": f"Exception in proceed_one_record:Error generating report: {str(e)}"}

    finally:
        # Close the cursor on every exit path, including errors.
        if cursor is not None:
            try:
                cursor.close()
            except Exception as close_error:
                logging.warning(f"Could not close cursor: {close_error}")
|
||
|
||
def get_column_letter(self, cell_reference):
    """Return the column letters of a cell reference such as 'A12'.

    All alphabetic characters of the string are kept in order; anything
    else is dropped. 'A' is returned when the argument is not a string or
    contains no letter.
    """
    if not isinstance(cell_reference, str):
        return 'A'  # default
    letters = ''.join(filter(str.isalpha, cell_reference))
    return letters or 'A'
|
||
|
||
def get_row_number(self, cell_reference):
    """Return the row number of a cell reference such as 'A12'.

    For a string, all digit characters are joined and converted to int.
    A string without digits, or a non-string argument, is passed to
    ``int()`` directly, which may raise ``ValueError`` / ``TypeError``.
    """
    if isinstance(cell_reference, str):
        numeric_part = ''.join(filter(str.isdigit, cell_reference))
        if numeric_part:
            return int(numeric_part)
    return int(cell_reference)
|
||
|
||
|
||
def proceed_all_records(self, table: str, where: str, sort: str, group_range: str, variables: Dict[str, Any]):
    """Fetch all matching records and render one template copy per record.

    *group_range* (e.g. "A5:F6") is the template block. For every record
    the block is copied onto the current sheet, starting at the block's
    own first row and moving down by the block height each time. Every
    ``[field_name]`` placeholder in the string cells of the copied rows is
    then replaced by the formatted field value.

    The database connection is left open: it is shared by all groups of a
    section, so closing it here made every later group fail.

    Args:
        table: Table name.
        where: WHERE clause.
        sort: ORDER BY clause.
        group_range: Template range in "<cell>:<cell>" form.
        variables: Not used by this method.

    Returns:
        dict with ``status`` (bool) and ``message``. Exceptions, including
        an invalid *group_range*, are reported in ``message``.
    """
    print(f"make_report.proceed_all_record step-1:table={table},where={where},group_range={group_range}")

    cursor = None
    try:
        if not group_range or ':' not in group_range:
            raise ValueError(f"Invalid group_range format: {group_range}")

        range_parts = group_range.split(':')
        logging.info(f"Processing range_parts: {range_parts}")  # debug log

        start_ref = range_parts[0].strip()
        end_ref = range_parts[1].strip()

        start_row = self.get_row_number(start_ref)
        start_col = self.get_column_letter(start_ref)
        end_row = self.get_row_number(end_ref)
        end_col = self.get_column_letter(end_ref)

        if start_row > end_row:
            raise ValueError(f"Invalid row range: start_row ({start_row}) > end_row ({end_row})")

        # Height of the template block in rows.
        template_rows = end_row - start_row + 1

        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

        # SECURITY NOTE: table, where and sort are interpolated verbatim
        # into the SQL text. They must only ever come from trusted
        # configuration, never from end-user input.
        query = f"SELECT * FROM {table} WHERE {where} ORDER BY {sort}"
        cursor.execute(query)
        records = cursor.fetchall()
        print(f"query={query}, records={len(records)}")

        # Placeholders look like [field_name].
        placeholder = re.compile(r'\[(.*?)\]')
        source_range = f"{start_col}{start_row}:{end_col}{end_row}"

        current_row = start_row
        for record in records:
            # Copy the template block to the rows of this record.
            self.copy_template_to_current(
                source_range,
                f"{start_col}{current_row}:{end_col}{current_row + template_rows - 1}"
            )

            # Substitute the placeholders in the rows just copied.
            for row in range(current_row, current_row + template_rows):
                for cell in self.current_sheet[row]:
                    if not (cell.value and isinstance(cell.value, str)):
                        continue
                    matches = placeholder.findall(cell.value)
                    if not matches:
                        continue

                    new_value = cell.value
                    for field_name in matches:
                        # Placeholders without a matching column stay as-is.
                        if field_name in record:
                            formatted_value = self.format_cell_value(record[field_name], cell)
                            new_value = new_value.replace(
                                f'[{field_name}]',
                                formatted_value
                            )
                    cell.value = new_value

            current_row += template_rows

        return {"status": True, "message": "Success processing all records"}

    except Exception as e:
        logging.error(f"Error in proceed_all_record: {str(e)}")
        return {"status": False, "message": f"Exception in proceed_all_record:Error processing records: {str(e)}"}

    finally:
        # Close the cursor on every exit path; the connection stays open
        # for the remaining groups.
        if cursor is not None:
            try:
                cursor.close()
            except Exception as close_error:
                logging.warning(f"Could not close cursor: {close_error}")
|
||
|
||
|
||
def set_column_width_from_config(self, worksheet):
    """Apply the column widths configured in the INI file to a worksheet.

    The ``column_width`` entry of the ``basic`` section is a
    comma-separated list of numbers; the n-th number becomes the width of
    the n-th column.

    Parameters:
        worksheet: Worksheet whose column widths are set.

    Returns:
        dict: ``status`` and ``message``; on success also ``details`` with
        ``num_columns`` and ``widths``. Errors are reported in the dict,
        not raised.
    """
    try:
        raw_widths = self.conf.get_section('basic').get('column_width')
        if not raw_widths:
            raise ValueError("column_width setting not found in [basic] section")

        # "10, 12.5, 8" -> [10.0, 12.5, 8.0]
        widths = [float(token.strip()) for token in raw_widths.split(',')]

        # Columns are numbered from 1.
        position = 0
        for width in widths:
            position += 1
            col_letter = get_column_letter(position)
            worksheet.column_dimensions[col_letter].width = width
            logging.info(f"Set column {col_letter} width to {width}")

        details = {"num_columns": len(widths), "widths": widths}
        return {
            "status": True,
            "message": "Column widths set successfully from config",
            "details": details,
        }

    except ValueError as ve:
        logging.error(f"Invalid column width value in config: {str(ve)}")
        return {"status": False, "message": f"Invalid column width configuration: {str(ve)}"}

    except Exception as e:
        logging.error(f"Error setting column widths: {str(e)}")
        return {"status": False, "message": f"Error setting column widths: {str(e)}"}
|
||
|
||
def verify_column_widths(self, worksheet, expected_widths=None):
    """Check that the worksheet's column widths match the expected ones.

    Parameters:
        worksheet: Worksheet to check.
        expected_widths: Expected widths, the n-th entry belonging to the
            n-th column. When omitted they are read from ``column_width``
            in the ``basic`` section, through ``self.conf`` like the rest
            of the class (the former ``self.config`` attribute is never
            set by ``__init__``).

    Returns:
        list: One dict per column with ``column``, ``expected_width``,
        ``actual_width`` and ``matches``. A column without a width counts
        as a mismatch.

    Raises:
        ValueError: *expected_widths* is omitted and no ``column_width``
            is configured, or a configured width is not a number.
    """
    if expected_widths is None:
        basic = self.conf.get_section('basic') or {}
        width_str = basic.get('column_width')
        if not width_str:
            raise ValueError("column_width setting not found in [basic] section")
        expected_widths = [float(w.strip()) for w in width_str.split(',')]

    verification_results = []
    for col_index, expected_width in enumerate(expected_widths, start=1):
        col_letter = get_column_letter(col_index)
        actual_width = worksheet.column_dimensions[col_letter].width

        # Widths are floats: compare with a small tolerance.
        matches = (
            actual_width is not None
            and abs(float(actual_width) - float(expected_width)) < 0.01
        )
        verification_results.append({
            'column': col_letter,
            'expected_width': expected_width,
            'actual_width': actual_width,
            'matches': matches
        })

        if not matches:
            logging.warning(
                f"Column {col_letter} width mismatch: "
                f"expected={expected_width}, actual={actual_width}"
            )

    return verification_results
|
||
|
||
def copy_template_range(self, orig_min_row, orig_min_col, orig_max_row, orig_max_col, target_min_row):
    """Copy a block of the template sheet, including its merged cells.

    Three passes are made:

    1. Every merged range of the template that lies completely inside the
       source block is re-created on the current sheet, shifted to the
       target rows.
    2. Values and styles (font, fill, number format, protection,
       alignment) are copied cell by cell. Columns are kept, rows are
       shifted so that ``orig_min_row`` lands on ``target_min_row``.
    3. Every cell of each re-created merged range receives a copy of the
       border of the template's top-left cell of that range.

    Returns:
        dict with ``status`` (bool) and ``message``; exceptions are logged
        and reported in the dict.
    """
    try:
        template = self.template_sheet

        # Pass 1: merged ranges -> (left, right, top, bottom, anchor cell).
        merged_blocks = []
        for merged in template.merged_cells:
            left, top, right, bottom = range_boundaries(str(merged))

            inside_block = (
                orig_min_col <= left and right <= orig_max_col
                and orig_min_row <= top and bottom <= orig_max_row
            )
            if not inside_block:
                continue

            # Same columns, rows moved to the target position.
            new_top = target_min_row + (top - orig_min_row)
            new_bottom = target_min_row + (bottom - orig_min_row)
            target_ref = (
                f"{get_column_letter(left)}{new_top}:"
                f"{get_column_letter(right)}{new_bottom}"
            )

            anchor_cell = template.cell(row=top, column=left)
            merged_blocks.append((left, right, new_top, new_bottom, anchor_cell))

            self.current_sheet.merge_cells(target_ref)

        # Pass 2: cell values and styles.
        shift = target_min_row - orig_min_row
        for src_row in range(orig_min_row, orig_max_row + 1):
            for column in range(orig_min_col, orig_max_col + 1):
                src = template.cell(row=src_row, column=column)
                dst = self.current_sheet.cell(row=src_row + shift, column=column)

                if src.value is not None:
                    dst.value = src.value

                if not src.has_style:
                    continue
                dst.font = copy(src.font)
                dst.fill = copy(src.fill)
                dst.number_format = src.number_format
                dst.protection = copy(src.protection)
                dst.alignment = copy(src.alignment)

        # Pass 3: borders of the merged ranges.
        for left, right, top, bottom, anchor_cell in merged_blocks:
            base_border = anchor_cell.border

            for row in range(top, bottom + 1):
                for column in range(left, right + 1):
                    dst = self.current_sheet.cell(row=row, column=column)

                    cell_border = copy(base_border)

                    # Cells on an edge of the range get that side set
                    # from the anchor cell's border.
                    if column == left:
                        cell_border.left = copy(base_border.left)
                    if column == right:
                        cell_border.right = copy(base_border.right)
                    if row == top:
                        cell_border.top = copy(base_border.top)
                    if row == bottom:
                        cell_border.bottom = copy(base_border.bottom)

                    dst.border = cell_border

        return {"status": True, "message": "Range copied successfully with merged cell borders"}

    except Exception as exc:
        logging.error(f"Error copying template range: {str(exc)}")
        return {"status": False, "message": f"Error copying template range: {str(exc)}"}
|
||
|
||
def verify_merged_cell_borders(self, worksheet, merge_range):
    """Collect the outer border sides of a merged range for inspection.

    Args:
        worksheet: Worksheet containing the range.
        merge_range: Range string such as "A1:C3".

    Returns:
        dict with the keys ``top``, ``bottom``, ``left`` and ``right``,
        each a list of the matching border side of every cell on that
        edge (cells visited row by row), or None if anything fails.
    """
    try:
        first_col, first_row, last_col, last_row = range_boundaries(merge_range)

        top_sides = []
        bottom_sides = []
        left_sides = []
        right_sides = []

        # Visit every cell of the range; only edge cells contribute.
        for row_no in range(first_row, last_row + 1):
            for col_no in range(first_col, last_col + 1):
                current = worksheet.cell(row=row_no, column=col_no)

                if col_no == first_col:  # left edge
                    left_sides.append(current.border.left)
                if col_no == last_col:  # right edge
                    right_sides.append(current.border.right)
                if row_no == first_row:  # top edge
                    top_sides.append(current.border.top)
                if row_no == last_row:  # bottom edge
                    bottom_sides.append(current.border.bottom)

        return {
            'top': top_sides,
            'bottom': bottom_sides,
            'left': left_sides,
            'right': right_sides,
        }

    except Exception as exc:
        logging.error(f"Error verifying merged cell borders: {str(exc)}")
        return None
|
||
|
||
|
||
|
||
def copy_template_to_current(self, orig_range, target_range):
|
||
try:
|
||
#print(f"copy_template_to_current : orig_range={orig_range},target_range={target_range}")
|
||
|
||
# 範囲をパースする
|
||
orig_min_col, orig_min_row, orig_max_col, orig_max_row = range_boundaries(orig_range)
|
||
target_min_col, target_min_row, target_max_col, target_max_row = range_boundaries(target_range)
|
||
#print(f"min_col, min_row, max_col, max_row = {orig_min_col}, {orig_min_row}, {orig_max_col}, {orig_max_row}")
|
||
|
||
# Get template sheet name from ini file
|
||
section_config = self.conf.get_section(self.section_list[0]) # 現在の処理中のセクション
|
||
template_sheet_name = section_config.get("template_sheet")
|
||
new_sheet_name = section_config.get("sheet_name",template_sheet_name)
|
||
|
||
# Copy row heights
|
||
for row in range(orig_min_row, orig_max_row + 1):
|
||
target_row = row - orig_min_row + target_min_row
|
||
if row in self.template_sheet.row_dimensions:
|
||
source_height = self.template_sheet.row_dimensions[row].height
|
||
if source_height is not None:
|
||
if target_row not in self.current_sheet.row_dimensions:
|
||
self.current_sheet.row_dimensions[target_row] = openpyxl.worksheet.dimensions.RowDimension(target_row)
|
||
self.current_sheet.row_dimensions[target_row].height = source_height
|
||
#print(f"Row(target_row).height = {source_height}")
|
||
|
||
# Copy merged cells
|
||
self.copy_template_range(orig_min_row, orig_min_col, orig_max_row, orig_max_col, target_min_row)
|
||
|
||
#for merged_range in self.template_sheet.merged_cells:
|
||
# min_col, min_row, max_col, max_row = range_boundaries(str(merged_range))
|
||
# #print(f"Merge cell: min_col, min_row, max_col, max_row = {min_col}, {min_row}, {max_col}, {max_row}")
|
||
# # Check if merge range intersects with our copy range
|
||
# if (min_col >= orig_min_col and max_col <= orig_max_col and
|
||
# min_row >= orig_min_row and max_row <= orig_max_row):
|
||
# # Calculate target merge range
|
||
# target_merge_min_row = target_min_row + (min_row - orig_min_row)
|
||
# target_merge_max_row = target_min_row + (max_row - orig_min_row)
|
||
# target_merge_range = f"{get_column_letter(min_col)}{target_merge_min_row}:" \
|
||
# f"{get_column_letter(max_col)}{target_merge_max_row}"
|
||
# self.current_sheet.merge_cells(target_merge_range)
|
||
# #print(f"Merge : {target_merge_range}")
|
||
|
||
# Copy cell contents and styles
|
||
row_offset = target_min_row - orig_min_row
|
||
for row in range(orig_min_row, orig_max_row + 1):
|
||
for col in range(orig_min_col, orig_max_col + 1):
|
||
source_cell = self.template_sheet.cell(row=row, column=col)
|
||
target_cell = self.current_sheet.cell(row=row+row_offset, column=col)
|
||
|
||
if source_cell.value:
|
||
# Copy value
|
||
target_cell.value = source_cell.value
|
||
#print(f"({col},{row}) : {target_cell.value}")
|
||
# Copy styles
|
||
if source_cell.has_style:
|
||
target_cell.font = copy(source_cell.font)
|
||
target_cell.border = copy(source_cell.border)
|
||
target_cell.fill = copy(source_cell.fill)
|
||
target_cell.number_format = source_cell.number_format
|
||
target_cell.protection = copy(source_cell.protection)
|
||
target_cell.alignment = copy(source_cell.alignment)
|
||
|
||
# Copy page setup
|
||
target_page_setup = self.current_sheet.page_setup
|
||
source_page_setup = self.template_sheet.page_setup
|
||
|
||
# Copy supported page setup attributes
|
||
copyable_attrs = [
|
||
'paperSize',
|
||
'orientation',
|
||
'fitToHeight',
|
||
'fitToWidth',
|
||
'scale'
|
||
]
|
||
|
||
for attr in copyable_attrs:
|
||
try:
|
||
if hasattr(source_page_setup, attr):
|
||
setattr(target_page_setup, attr, getattr(source_page_setup, attr))
|
||
except Exception as e:
|
||
logging.warning(f"Could not copy page setup attribute {attr}: {str(e)}")
|
||
|
||
|
||
# Copy margins
|
||
target_margins = self.current_sheet.page_margins
|
||
source_margins = self.template_sheet.page_margins
|
||
|
||
margin_attrs = ['left', 'right', 'top', 'bottom', 'header', 'footer']
|
||
for attr in margin_attrs:
|
||
try:
|
||
if hasattr(source_margins, attr):
|
||
setattr(target_margins, attr, getattr(source_margins, attr))
|
||
except Exception as e:
|
||
logging.warning(f"Could not copy margin attribute {attr}: {str(e)}")
|
||
|
||
# Copy print options
|
||
target_print = self.current_sheet.print_options
|
||
source_print = self.template_sheet.print_options
|
||
|
||
print_attrs = [
|
||
'horizontalCentered',
|
||
'verticalCentered',
|
||
'gridLines',
|
||
'gridLinesSet'
|
||
]
|
||
|
||
for attr in print_attrs:
|
||
try:
|
||
if hasattr(source_print, attr):
|
||
setattr(target_print, attr, getattr(source_print, attr))
|
||
except Exception as e:
|
||
logging.warning(f"Could not copy print option {attr}: {str(e)}")
|
||
|
||
return {"status": True, "message": "Successfully copied template range"}
|
||
|
||
except Exception as e:
|
||
logging.error(f"Error in copy_template_to_current: {str(e)}")
|
||
return {"status": False, "message": f"Exception in copy_template_to_current: {str(e)}"}
|
||
|
||
# Style operations
|
||
def apply_style(
    self,
    cell_range: str,
    font: Optional[Dict[str, Any]] = None,
    fill: Optional[Dict[str, Any]] = None,
    border: Optional[Dict[str, Any]] = None,
    alignment: Optional[Dict[str, Any]] = None
) -> None:
    """Apply font, fill, border and alignment styles to the cells of a range.

    Args:
        cell_range: Key used to index ``self.current_sheet``, for example
            ``"A1"``, ``"A1:C3"``, ``"A"`` or ``"2"``.
        font: Keyword arguments forwarded to
            ``self._style_manager.create_font``; skipped when empty or None.
        fill: Keyword arguments forwarded to
            ``self._style_manager.create_fill``; skipped when empty or None.
        border: Keyword arguments forwarded to
            ``self._style_manager.create_border``; skipped when empty or None.
        alignment: Keyword arguments forwarded to
            ``self._style_manager.create_alignment``; skipped when empty or
            None.
    """

    def _iterate(item):
        # Worksheet indexing is not uniform: a single-cell key yields a bare
        # cell, a single row/column key yields a flat tuple of cells, and a
        # rectangular range yields a tuple of row tuples. A cell is not
        # iterable, so wrap it to let the loops below handle every shape.
        try:
            return iter(item)
        except TypeError:
            return iter((item,))

    for row in _iterate(self.current_sheet[cell_range]):
        for cell in _iterate(row):
            if font:
                cell.font = self._style_manager.create_font(**font)
            if fill:
                cell.fill = self._style_manager.create_fill(**fill)
            if border:
                cell.border = self._style_manager.create_border(**border)
            if alignment:
                cell.alignment = self._style_manager.create_alignment(**alignment)
|
||
|
||
# Merge operations
|
||
def merge_range(
    self,
    start_row: int,
    start_col: int,
    end_row: int,
    end_col: int
) -> None:
    """Merge the cells bounded by the given start and end coordinates.

    Delegates to ``self._merge_manager.merge_cells``, passing the four
    coordinates positionally in the order they were received.
    """
    corners = (start_row, start_col, end_row, end_col)
    self._merge_manager.merge_cells(*corners)
|
||
|
||
# Image operations
|
||
def add_image(
    self,
    image_path: Union[str, Path],
    position: Tuple[int, int],
    size: Optional[Tuple[int, int]] = None
) -> None:
    """Place an image on the worksheet via the image manager.

    All three arguments are handed to ``self._image_manager.add_image``
    positionally and unchanged; how ``position`` and ``size`` are
    interpreted is decided by that manager.
    """
    manager = self._image_manager
    manager.add_image(image_path, position, size)
|
||
|
||
# Conditional formatting
|
||
def add_conditional_format(
    self,
    cell_range: str,
    format_type: str,
    **kwargs
) -> None:
    """Add a conditional-formatting rule of the requested type to a range.

    Args:
        cell_range: Range passed through to the conditional-format manager.
        format_type: One of ``'color_scale'``, ``'data_bar'``,
            ``'icon_set'`` or ``'custom'``.
        **kwargs: Extra options forwarded unchanged to the selected
            ``self._conditional_manager.add_*`` method.

    An unrecognised ``format_type`` adds nothing; a warning is logged so the
    mistake is visible, but no exception is raised.
    """
    # Maps each supported format type to the manager method that handles it.
    method_names = {
        'color_scale': 'add_color_scale',
        'data_bar': 'add_data_bar',
        'icon_set': 'add_icon_set',
        'custom': 'add_custom_rule',
    }

    method_name = method_names.get(format_type)
    if method_name is None:
        # Kept non-fatal on purpose: callers may rely on unknown types
        # being a no-op, but a silent no-op hides typos.
        logging.warning(
            "Unsupported conditional format type %r for range %s; "
            "no formatting applied",
            format_type,
            cell_range,
        )
        return

    getattr(self._conditional_manager, method_name)(cell_range, **kwargs)
|
||
|
||
# Page setup
|
||
def setup_page(
    self,
    orientation: str = 'portrait',
    paper_size: int = PaperSizes.A4,
    margins: Dict[str, float] = None,
    header_footer: Dict[str, Any] = None
) -> None:
    """Configure orientation, paper size, margins and header/footer.

    Orientation and paper size are always sent to the page manager.
    ``margins`` and ``header_footer`` are applied only when non-empty, each
    expanded as keyword arguments to ``set_margins`` and
    ``set_header_footer`` respectively.
    """
    page = self._page_manager

    base_options = {'orientation': orientation, 'paper_size': paper_size}
    page.set_page_setup(**base_options)

    if margins:
        page.set_margins(**margins)

    if header_footer:
        page.set_header_footer(**header_footer)
|
||
|
||
def cleanup(self) -> None:
    """Ask the image manager, if one exists, to clean up its temporary files.

    Safe to call on an instance that never got an image manager, for example
    when ``__init__`` failed part-way and the destructor still runs: a
    missing or falsy ``_image_manager`` makes this a no-op.
    """
    # getattr rather than direct access: this method is reached from
    # __del__, which also runs for partially initialised instances.
    image_manager = getattr(self, '_image_manager', None)
    if image_manager:
        image_manager.cleanup()
|
||
|
||
def __del__(self):
    """Destructor: best-effort call to ``cleanup()`` when the object is collected.

    Errors are deliberately suppressed here. A finalizer cannot propagate
    exceptions to any caller (the interpreter only prints an
    "Exception ignored" message), and it may run on a partially initialised
    instance or during interpreter shutdown. Call ``cleanup()`` directly
    when failures need to be observed.
    """
    try:
        self.cleanup()
    except Exception:
        # No logging here: module globals may already be torn down when
        # this runs at interpreter shutdown.
        pass
|