import pathlib
import time
from loguru import logger
import pandas as pd
from ebm.cmd.run_calculation import (calculate_building_category_area_forecast,
calculate_building_category_energy_requirements,
calculate_heating_systems)
from ebm.model import bema
from ebm.model.calibrate_heating_systems import group_heating_systems_by_energy_carrier
from ebm.model.building_category import BuildingCategory
from ebm.model.data_classes import YearRange
from ebm.model.database_manager import DatabaseManager
from ebm.services.spreadsheet import detect_format_from_values, find_max_column_width
[docs]
def write_result(output_file, csv_delimiter, output, sheet_name='area forecast'):
    """
    Write a dataframe to stdout, a csv file or an Excel workbook.

    The destination is selected from ``output_file``:

    - ``-`` prints the dataframe, as markdown when ``DataFrame.to_markdown`` is usable and as plain
      text when it raises ImportError.
    - a ``.csv`` suffix writes a delimited text file.
    - anything else is written as a spreadsheet with the openpyxl engine.

    Parameters
    ----------
    output_file : pathlib.Path or str
        Destination. Unless it is ``-`` it must offer ``suffix`` and ``stat()`` like pathlib.Path.
    csv_delimiter : str
        Field separator, only used for csv output.
    output : pd.DataFrame
        The dataframe to write.
    sheet_name : str, optional
        Worksheet name, only used for spreadsheet output.

    Returns
    -------
    None
    """
    logger.debug(f'Writing to {output_file}')
    write_start = time.time()

    if str(output_file) == '-':
        try:
            print(output.to_markdown())
        except ImportError:
            print(output.to_string())
        # Nothing was written to disk, so there is no file to stat or report on.
        return

    if output_file.suffix == '.csv':
        output.to_csv(output_file, sep=csv_delimiter)
    else:
        # The context manager closes the workbook even when to_excel raises.
        with pd.ExcelWriter(output_file, engine='openpyxl') as excel_writer:
            output.to_excel(excel_writer, sheet_name=sheet_name, merge_cells=False, freeze_panes=(1, 3))

    logger.success('Wrote {filename}', filename=output_file)
    # ':.0f' gives a whole number of kilobytes; the bare ':.0' spec switched to exponent notation.
    logger.debug(f' wrote {output_file.stat().st_size/1000:.0f} kB in {time.time() - write_start:.4} seconds')
[docs]
def append_result(output_file: pathlib.Path, df: pd.DataFrame, sheet_name='Sheet 1'):
    """
    Store df as the worksheet sheet_name in the workbook output_file.

    When output_file already exists the workbook is opened in append mode with
    ``if_sheet_exists='replace'``, so the sheet is added to (or replaced in) the existing workbook
    instead of overwriting the whole file. Otherwise a new workbook is created.

    After writing, a number format is applied to the data cells of every column for which
    detect_format_from_values returns one, and each column width is set to the value returned by
    find_max_column_width.

    Parameters
    ----------
    output_file : pathlib.Path
        Workbook to create or extend.
    df : pd.DataFrame
        Data to write. The dataframe index is not written.
    sheet_name : str, optional
        Name of the worksheet to write.

    Returns
    -------
    None
    """
    if output_file.is_file():
        writer_options = {'if_sheet_exists': 'replace', 'mode': 'a'}
    else:
        writer_options = {'mode': 'w'}

    with pd.ExcelWriter(output_file, engine='openpyxl', **writer_options) as writer:
        df.to_excel(writer, sheet_name=sheet_name, index=False)
        worksheet = writer.sheets[sheet_name]
        sheet_columns = worksheet.iter_cols(min_row=1,
                                            max_row=worksheet.max_row,
                                            min_col=1,
                                            max_col=worksheet.max_column)

        logger.debug('Formatting cell')
        for cells, (column_name, column_values) in zip(sheet_columns, df.items()):
            number_format = detect_format_from_values(column_name, column_values, df)
            if not number_format:
                continue
            # cells[0] is the header cell written by to_excel; only the cells below it are formatted.
            for cell in cells[1:]:
                cell.number_format = number_format

        logger.debug('Adjust columns width')
        for cells in worksheet.iter_cols(min_col=1):
            width = find_max_column_width(cells)
            worksheet.column_dimensions[cells[0].column_letter].width = width

        logger.debug(f'Closing {output_file} {sheet_name}')
    logger.debug(f'Wrote {output_file} {sheet_name}')
[docs]
class EbmDefaultHandler:
    """
    Default handler that runs the ebm calculation steps and writes the resulting dataframe.
    """

    def extract_model(self,
                      year_range: YearRange,
                      building_categories: list[BuildingCategory] | None,
                      database_manager: DatabaseManager,
                      step_choice: str = 'energy-use') -> pd.DataFrame:
        """
        Extract dataframe for a certain step in the ebm model.

        The area forecast is always calculated. Which dataframe is returned depends on what
        step_choice contains:

        - ``energy-requirements``: the energy requirements result
        - ``heating-systems`` or ``energy-use`` (default): the result of calculate_heating_systems
        - anything else (for example ``area-forecast``): the area forecast indexed by
          building_category, building_code, building_condition and year

        Parameters
        ----------
        year_range : ebm.model.data_classes.YearRange
        building_categories : list[BuildingCategory] or None
            When empty or None every member of BuildingCategory is used.
        database_manager : ebm.model.database_manager.DatabaseManager
        step_choice : str, optional
            Matched with ``in``, so a string only needs to contain the step name.

        Returns
        -------
        pd.DataFrame
        """
        b_c = building_categories if building_categories else list(BuildingCategory)

        area_forecast = self.extract_area_forecast(b_c, database_manager, period=year_range)
        area_forecast = area_forecast.set_index(['building_category', 'building_code', 'building_condition', 'year'])
        df = area_forecast

        wants_heating_systems = 'heating-systems' in step_choice or 'energy-use' in step_choice
        if 'energy-requirements' in step_choice or wants_heating_systems:
            logger.debug('Extracting area energy requirements')
            energy_requirements_result = self.extract_energy_requirements(b_c,
                                                                          database_manager,
                                                                          area_forecast,
                                                                          period=year_range)
            df = energy_requirements_result

            if wants_heating_systems:
                logger.debug('Extracting heating systems')
                df = calculate_heating_systems(energy_requirements=energy_requirements_result,
                                               database_manager=database_manager)
        return df

    # noinspection PyTypeChecker
    @staticmethod
    def extract_energy_requirements(building_categories,
                                    database_manager: DatabaseManager,
                                    area_forecast: pd.DataFrame,
                                    period: YearRange) -> pd.DataFrame:
        """
        Extracts energy needs for building_categories and period

        Parameters
        ----------
        building_categories : list[BuildingCategory]
        database_manager : ebm.model.database_manager.DatabaseManager
        area_forecast : pd.DataFrame
        period : ebm.model.data_classes.YearRange
            period.start and period.end are passed on as start_year and end_year.

        Returns
        -------
        pd.DataFrame
        """
        return calculate_building_category_energy_requirements(
            building_category=building_categories,
            area_forecast=area_forecast,
            database_manager=database_manager,
            start_year=period.start,
            end_year=period.end)

    @staticmethod
    def extract_area_forecast(building_categories,
                              database_manager: DatabaseManager,
                              period: YearRange) -> pd.DataFrame:
        """
        Extracts the area forecast for period

        Parameters
        ----------
        building_categories : list[BuildingCategory]
            Accepted for symmetry with extract_energy_requirements, but not forwarded to the
            calculation.
        database_manager : ebm.model.database_manager.DatabaseManager
        period : ebm.model.data_classes.YearRange
            period.start and period.end are passed on as start_year and end_year.

        Returns
        -------
        pd.DataFrame
        """
        return calculate_building_category_area_forecast(
            database_manager=database_manager,
            start_year=period.start,
            end_year=period.end)

    @staticmethod
    def write_tqdm_result(output_file: pathlib.Path, output: pd.DataFrame, csv_delimiter: str = ',', reset_index=True):
        """
        Write output to output_file in chunks while showing a tqdm progress bar.

        Falls back to write_result when tqdm is not installed. ``-`` prints the dataframe instead
        of writing a file. A ``.csv`` suffix writes a delimited text file; any other suffix writes a
        spreadsheet with the xlsxwriter engine.

        Parameters
        ----------
        output_file : pathlib.Path
            Destination file, or ``-`` for stdout. An existing csv file is overwritten.
        output : pd.DataFrame
            The dataframe to write.
        csv_delimiter : str, optional
            Field separator, only used for csv output.
        reset_index : bool, optional
            When True the index is turned into ordinary columns before writing. When False the
            index is written as index columns.

        Returns
        -------
        None
        """
        try:
            from tqdm import tqdm
        except ImportError:
            # When tqdm is not installed we use write_result instead
            write_result(output_file, csv_delimiter, output)
            return

        logger.debug(f'Writing to {output_file}')
        if str(output_file) == '-':
            try:
                print(output.to_markdown())
            except ImportError:
                print(output.to_string())
            return

        if reset_index:
            logger.debug('Resetting dataframe index')
            output = output.reset_index()

        def building_category_at(position):
            # building_category is a column after reset_index, otherwise it is read from the
            # first level of the row's index label.
            if 'building_category' in output.columns:
                return output.building_category.iloc[position]
            return output.iloc[position].name[0]

        chunk_size = 2000
        logger.debug(f'{chunk_size=}')
        total_rows = len(output)
        write_index = not reset_index

        closing_file = time.time()
        with tqdm(total=total_rows, desc="Writing to spreadsheet") as pbar:
            write_file = time.time()
            if output_file.suffix == '.csv':
                if total_rows == 0:
                    # No chunks to iterate over; still create the file so the size report works.
                    output.to_csv(output_file, index=write_index, sep=csv_delimiter)
                for i in range(0, total_rows, chunk_size):
                    building_category = building_category_at(i)
                    page = output.iloc[i:i + chunk_size]
                    # The first chunk truncates the file so leftovers from an earlier run are not
                    # kept; the following chunks are appended without a header.
                    page.to_csv(output_file, mode='w' if i == 0 else 'a', header=(i == 0),
                                index=write_index, sep=csv_delimiter)
                    # Advance by the rows actually written, the last chunk is usually shorter.
                    pbar.update(len(page))
                    pbar.display(f'Writing {building_category}')
                closing_file = time.time()
                pbar.display(f'Wrote {output_file}')
            else:
                with pd.ExcelWriter(output_file, engine='xlsxwriter') as excel_writer:
                    for i in range(0, total_rows, chunk_size):
                        building_category = building_category_at(i)
                        pbar.set_description(f'Writing {building_category}')
                        # The header occupies row 0, so every later chunk is shifted one row down.
                        start_row = 0 if i == 0 else i + 1
                        page_start = i
                        page_end = min(i + chunk_size, total_rows)
                        logger.trace(f'{start_row=} {page_start=} {page_end=}')
                        output.iloc[page_start:page_end].to_excel(excel_writer, startrow=start_row, header=(i == 0),
                                                                  merge_cells=True, index=write_index)
                        pbar.update(page_end - page_start)
                    pbar.set_description(f'Closing {output_file}')
                    # Taken before the writer is closed so the time spent closing can be reported.
                    closing_file = time.time()

        logger.success('Wrote {filename}', filename=output_file)
        logger.debug(f' wrote dataframe in { closing_file - write_file:.4} seconds')
        logger.debug(f' closed file in {time.time() - closing_file:.4} seconds')
        logger.debug(f' wrote {int(output_file.stat().st_size/1_000_000):_d} MB in {time.time() - write_file:.4} seconds')