Source code for ebm.heating_system_forecast

# noinspection SpellCheckingInspection
import pandas as pd
from loguru import logger

from ebm.model.building_category import NON_RESIDENTIAL, RESIDENTIAL, BuildingCategory
from ebm.model.data_classes import YearRange
from ebm.model.database_manager import DatabaseManager
from ebm.model.heating_systems import HeatingSystems

BUILDING_CATEGORY = 'building_category'
BUILDING_CODE = 'building_code'
HEATING_SYSTEMS = 'heating_systems'
NEW_HEATING_SYSTEMS = 'new_heating_systems'
YEAR = 'year'
TEK_SHARES = 'heating_system_share'


class HeatingSystemsForecast:
    """
    Forecast heating system shares for every year in a period.

    The forecast is built from three input tables: the heating system shares in
    the start year, the yearly replacement shares (``forecast``, one column per
    year) and the efficiencies of each heating system.
    """

    def __init__(self,
                 shares_start_year: pd.DataFrame,
                 efficiencies: pd.DataFrame,
                 forecast: pd.DataFrame,
                 building_code_list: list[str],
                 period: YearRange):
        """
        Store the input tables and validate them against ``period``.

        Parameters
        ----------
        shares_start_year : pd.DataFrame
            heating system shares in the start year
        efficiencies : pd.DataFrame
            efficiencies (and load shares) per heating system
        forecast : pd.DataFrame
            replacement shares in wide format, one column per forecast year
        building_code_list : list[str]
            building codes that a ``default`` building code expands to
        period : YearRange
            period of the forecast

        Raises
        ------
        ValueError
            If the years in the dataframes do not line up with ``period``
            (see ``_validate_years``).
        """
        self.shares_start_year = shares_start_year
        self.efficiencies = efficiencies
        self.forecast = forecast
        self.building_code_list = building_code_list
        self.period = period

        self._validate_years()
        check_sum_of_shares(shares_start_year)

    def _validate_years(self) -> None:
        """
        Check that the years of the input dataframes agree with ``self.period``.

        The checks are, in order:

        1. ``shares_start_year`` holds exactly one year.
        2. That year equals ``self.period.start``.
        3. For every (building category, building code) group in ``forecast``
           the earliest year column is the start year + 1.
        4. Every group in ``forecast`` has all years of ``self.period.subset(1)``.

        Raises
        ------
        ValueError
            If any of the checks fail.
        """
        unique_start_years = self.shares_start_year[YEAR].unique()
        if len(unique_start_years) != 1:
            raise ValueError("More than one start year in dataframe.")

        start_year = unique_start_years[0]
        if start_year != self.period.start:
            raise ValueError("Start year in dataframe doesn't match start year for given period.")

        group_keys = [BUILDING_CATEGORY, BUILDING_CODE]

        # Year columns become rows so they can be compared per group.
        long_forecast = self.forecast.melt(
            id_vars=[*group_keys, HEATING_SYSTEMS, NEW_HEATING_SYSTEMS],
            var_name=YEAR,
            value_name="Andel_utskiftning",
        )
        long_forecast[YEAR] = long_forecast[YEAR].astype(int)

        first_forecast_year = long_forecast.groupby(group_keys)[YEAR].min()
        if (first_forecast_year != (start_year + 1)).any():
            raise ValueError("Years don't match between dataframes.")

        projection_period = self.period.subset(1).range()

        def covers_period(group: pd.DataFrame) -> bool:
            return set(projection_period).issubset(group[YEAR])

        coverage = long_forecast.groupby(by=group_keys).apply(covers_period).reset_index()
        # After reset_index the per-group result ends up in the column named 0.
        if coverage[0].eq(False).any():
            raise ValueError("Years in dataframe not present in given period.")

    def calculate_forecast(self) -> pd.DataFrame:
        """
        Project heating system shares across the years of the period.

        Returns
        -------
        pd.DataFrame
            heating system shares per year, building category and building code,
            merged with the efficiency table.

        Raises
        ------
        ValueError
            Propagated from ``add_missing_heating_systems`` when the start year
            table is inconsistent with ``self.period.start``.
        """
        start_shares = add_missing_heating_systems(self.shares_start_year,
                                                   HeatingSystems,
                                                   self.period.start)
        expanded_forecast = expand_building_category_building_code(self.forecast,
                                                                   self.building_code_list)
        projected = project_heating_systems(start_shares, expanded_forecast, self.period)
        combined = add_existing_heating_system_shares_to_projection(projected,
                                                                    self.shares_start_year,
                                                                    self.period)
        check_sum_of_shares(combined)
        return add_load_shares_and_efficiencies(combined, self.efficiencies)

    @staticmethod
    def new_instance(period: YearRange, database_manager: DatabaseManager = None) -> 'HeatingSystemsForecast':
        """
        Build a HeatingSystemsForecast from the tables of a database manager.

        Parameters
        ----------
        period : YearRange
            period of forecast
        database_manager : DatabaseManager
            source of the input tables. Anything that is not a DatabaseManager
            instance (including None) is replaced by a new ``DatabaseManager()``.

        Returns
        -------
        HeatingSystemsForecast
            instance initialized with start year shares, efficiencies, forecast
            and building code list read from the database manager.
        """
        if isinstance(database_manager, DatabaseManager):
            dm = database_manager
        else:
            dm = DatabaseManager()

        return HeatingSystemsForecast(
            shares_start_year=dm.get_heating_systems_shares_start_year(),
            efficiencies=dm.get_heating_system_efficiencies(),
            forecast=dm.get_heating_system_forecast(),
            building_code_list=dm.get_building_code_list(),
            period=period,
        )

    @staticmethod
    def pad_projection(hf: pd.DataFrame, years_to_pad: YearRange) -> pd.DataFrame:
        """
        Left pad hf with one copy of its first year for each year in years_to_pad.

        The rows used as padding are those where ``hf.year`` equals
        ``years_to_pad.end + 1``.

        Parameters
        ----------
        hf : pd.DataFrame
            heating systems to pad
        years_to_pad : YearRange
            range of years to pad unto hf

        Returns
        -------
        pd.DataFrame
            the padding rows followed by hf
        """
        first_year_rows = hf.loc[hf.year == years_to_pad.end + 1]
        left_padding = [first_year_rows.assign(year=year) for year in years_to_pad]
        return pd.concat([*left_padding, hf])
def add_missing_heating_systems(heating_systems_shares: pd.DataFrame,
                                heating_systems: HeatingSystems = None,
                                start_year: int|None = None) -> pd.DataFrame:
    """
    Add every heating system that is missing for a building category and building code.

    Missing combinations are appended with a heating system share of 0.

    Parameters
    ----------
    heating_systems_shares : pd.DataFrame
        shares in the start year. Must hold exactly one unique year.
    heating_systems : HeatingSystems, optional
        iterable of heating systems that every group should have. Falls back to
        ``HeatingSystems`` when falsy.
    start_year : int, optional
        year written to the added rows. Falls back to the year in the dataframe
        when falsy.

    Returns
    -------
    pd.DataFrame
        the input rows followed by the added zero-share rows

    Raises
    ------
    ValueError
        If the dataframe holds more than one year, or ``start_year`` differs from
        the year in the dataframe.
    """
    shares = heating_systems_shares.copy()

    years_in_input = shares[YEAR].unique()
    if len(years_in_input) != 1:
        raise ValueError("More than one start year in dataframe")

    # TODO: drop start year as input param and only use year in dataframe?
    if not start_year:
        start_year = years_in_input[0]
    elif start_year != years_in_input[0]:
        raise ValueError("Given start_year doesn't match year in dataframe.")

    if not heating_systems:
        heating_systems = HeatingSystems

    group_keys = [BUILDING_CATEGORY, BUILDING_CODE]
    all_systems = pd.DataFrame({HEATING_SYSTEMS: list(heating_systems)})

    # Every (category, code) pair in the input crossed with every heating system.
    every_combination = shares[group_keys].drop_duplicates().merge(all_systems, how='cross')
    with_known_shares = every_combination.merge(shares,
                                                on=[*group_keys, HEATING_SYSTEMS],
                                                how='left')

    # TODO: could skip the separate frame and fillna() the two columns instead
    # Rows without a share after the left join are the ones to add.
    is_missing = with_known_shares[TEK_SHARES].isna()
    missing_rows = (
        with_known_shares.loc[is_missing, [*group_keys, HEATING_SYSTEMS]]
        .assign(**{YEAR: start_year, TEK_SHARES: 0})
    )

    return pd.concat([shares, missing_rows])
def add_load_shares_and_efficiencies(df: pd.DataFrame, heating_systems_efficiencies: pd.DataFrame) -> pd.DataFrame:
    """
    Attach the columns of the efficiency table to each heating system share row.

    Parameters
    ----------
    df : pandas.DataFrame
        heating system shares. Must have the heating system and year columns.
    heating_systems_efficiencies : pandas.DataFrame
        reference table keyed by heating system.

    Returns
    -------
    pandas.DataFrame
        ``df`` left joined with the reference table on the heating system column,
        with the year column cast to int.
    """
    efficiency_table = heating_systems_efficiencies.copy()
    enriched = pd.merge(df, efficiency_table, how='left', on=[HEATING_SYSTEMS])
    return enriched.astype({YEAR: int})
def aggregere_lik_oppvarming_fjern_0(df:pd.DataFrame) -> pd.DataFrame:
    """
    Sum the shares of identical heating systems after removing zero shares.

    Parameters
    ----------
    df : pandas.DataFrame
        heating system shares with building category, building code, heating
        system, year and share columns.

    Returns
    -------
    pandas.DataFrame
        one row per building category, building code, heating system and year
        holding the summed share. Rows with a share of exactly 0 are dropped
        before the sum.
    """
    group_keys = [BUILDING_CATEGORY, BUILDING_CODE, HEATING_SYSTEMS, YEAR]
    non_zero = df.loc[df[TEK_SHARES] != 0].copy()
    return non_zero.groupby(group_keys, as_index=False)[TEK_SHARES].sum()
def expand_building_category_building_code(projection: pd.DataFrame, building_code_list: list[str]) -> pd.DataFrame:
    """
    Expand grouped building categories and building codes into one row per concrete value.

    The placeholder ``default`` in the building code column is replaced by every
    code in ``building_code_list``. In the building category column ``default``
    is replaced by every ``BuildingCategory``, and the ``RESIDENTIAL`` /
    ``NON_RESIDENTIAL`` group names by their member categories. Values joined
    with ``+`` are split into separate rows.

    When several input rows expand to the same (building category, building
    code, heating system, new heating system) the most specific input row is
    kept. Specificity is scored as one point for a non-default category, one
    more for a category that is not a group name, and one for a non-default
    building code.

    Parameters
    ----------
    projection : pd.DataFrame
        heating systems forecast. It is not modified.
    building_code_list : list[str]
        building codes that ``default`` expands to

    Returns
    -------
    pd.DataFrame
        expanded forecast with the same columns as ``projection``
    """
    score = '_score'
    original_building_category = '_original_bc'
    original_building_code = '_original_building_code'

    # Work on a copy so the helper columns never leak into the caller's frame.
    df = projection.copy()
    df[original_building_category] = df['building_category']
    df[original_building_code] = df['building_code']

    alle_bygningskategorier = '+'.join(BuildingCategory)
    alle_building_code = '+'.join(building_code_list)
    husholdning = '+'.join(bc for bc in BuildingCategory if bc.is_residential())
    yrkesbygg = '+'.join(bc for bc in BuildingCategory if bc.is_non_residential())

    df.loc[df[BUILDING_CODE] == "default", BUILDING_CODE] = alle_building_code
    df.loc[df[BUILDING_CATEGORY] == "default", BUILDING_CATEGORY] = alle_bygningskategorier
    df.loc[df[BUILDING_CATEGORY] == RESIDENTIAL, BUILDING_CATEGORY] = husholdning
    df.loc[df[BUILDING_CATEGORY] == NON_RESIDENTIAL, BUILDING_CATEGORY] = yrkesbygg

    # One row per concrete building category, then one row per concrete building code.
    df = df.assign(**{BUILDING_CATEGORY: df[BUILDING_CATEGORY].str.split('+')}).explode(BUILDING_CATEGORY)
    df2 = df.assign(**{BUILDING_CODE: df[BUILDING_CODE].str.split('+')}).explode(BUILDING_CODE)
    df2 = df2.reset_index(drop=True)

    df2[score] = (df2[original_building_category] != 'default') * 1 + \
                 (~df2[original_building_category].isin(['default', NON_RESIDENTIAL, RESIDENTIAL])) * 1 + \
                 (df2[original_building_code] != 'default') * 1

    # Ascending sort + keep='last' lets the highest scoring (most specific) row win.
    df2 = df2.sort_values(by=[score])
    de_duped = df2.drop_duplicates(subset=[BUILDING_CATEGORY, BUILDING_CODE, HEATING_SYSTEMS, NEW_HEATING_SYSTEMS],
                                   keep='last')

    return de_duped.drop(columns=[score, original_building_category, original_building_code])
def project_heating_systems(shares_start_year_all_systems: pd.DataFrame, projected_shares: pd.DataFrame, period: YearRange) -> pd.DataFrame:
    """
    Forecast heating system shares from start year shares and yearly replacement shares.

    Parameters
    ----------
    shares_start_year_all_systems : pandas.DataFrame
        heating system shares in the start year, with columns for building
        category, building code, heating system, year and share.
    projected_shares : pandas.DataFrame
        replacement shares in wide format: building category, building code,
        heating system, new heating system and one column per year.
    period : YearRange
        the projection period. Only years in ``period.subset(1).range()`` are used.

    Returns
    -------
    pandas.DataFrame
        columns year, building category, building code, heating system and
        share, one row per combination with a non-zero share.

    Notes
    -----
    - For each replaced system and year, the moved share is the start year share
      multiplied by the replacement share.
    - The replaced system keeps its start year share minus the sum of what it
      gives away that year.
    - The receiving system gets its start year share plus everything it receives.
    - Zero shares are removed by ``aggregere_lik_oppvarming_fjern_0``.
    """
    df = shares_start_year_all_systems.copy()
    inputfil_oppvarming = projected_shares.copy()

    # Wide year columns -> one row per year with the replacement share.
    df_framskrive_oppvarming_long = inputfil_oppvarming.melt(id_vars=[BUILDING_CATEGORY, BUILDING_CODE, HEATING_SYSTEMS, NEW_HEATING_SYSTEMS],
                                                             var_name=YEAR, value_name="Andel_utskiftning")
    df_framskrive_oppvarming_long[YEAR] = df_framskrive_oppvarming_long[YEAR].astype(int)
    # Keep only the years of the projection period.
    df_framskrive_oppvarming_long = df_framskrive_oppvarming_long[df_framskrive_oppvarming_long[YEAR].isin(period.subset(1).range())]

    # Systems that give away share and systems that receive share.
    liste_eksisterende_oppvarming = list(df_framskrive_oppvarming_long[HEATING_SYSTEMS].unique())
    liste_ny_oppvarming = list(df_framskrive_oppvarming_long[NEW_HEATING_SYSTEMS].unique())

    # Start year shares of the systems being replaced.
    columns_to_keep = [BUILDING_CATEGORY, BUILDING_CODE, HEATING_SYSTEMS, YEAR, TEK_SHARES]
    oppvarming_og_tek = df.query(f"{HEATING_SYSTEMS} == {liste_eksisterende_oppvarming}")[columns_to_keep].copy()

    # Start year shares of the receiving systems (before any share is added).
    columns_to_keep = [BUILDING_CATEGORY, BUILDING_CODE, HEATING_SYSTEMS, TEK_SHARES]
    oppvarming_og_tek_foer_endring = df.query(f"{HEATING_SYSTEMS} == {liste_ny_oppvarming}")[columns_to_keep].copy()

    # Both frames have a year column that is not a join key, so the merge yields
    # `year_x` (start year) and `year_y` (projection year).
    df_merge = oppvarming_og_tek.merge(df_framskrive_oppvarming_long, on=[BUILDING_CATEGORY, BUILDING_CODE, HEATING_SYSTEMS], how='inner')
    # Share moved from the existing system to the new system in that year.
    df_merge['Ny_andel'] = (df_merge[TEK_SHARES] * df_merge['Andel_utskiftning'])

    # Total share each existing system gives away per projection year.
    df_ny_andel_sum = df_merge.groupby([BUILDING_CATEGORY, BUILDING_CODE, HEATING_SYSTEMS, f'{YEAR}_y'], as_index = False)[['Ny_andel']].sum()
    df_ny_andel_sum = df_ny_andel_sum.rename(columns={"Ny_andel": "Sum_ny_andel"})

    df_merge_sum_ny_andel = df_merge.merge(df_ny_andel_sum, on = [BUILDING_CATEGORY,BUILDING_CODE,HEATING_SYSTEMS, f'{YEAR}_y'])
    # What the existing system has left after giving away share.
    df_merge_sum_ny_andel['Eksisterende_andel'] = (df_merge_sum_ny_andel[TEK_SHARES] - df_merge_sum_ny_andel['Sum_ny_andel'])

    # Column selections and renames that bring both views back to the common
    # (year, category, code, heating system, share) layout.
    kolonner_eksisterende = [f'{YEAR}_y', BUILDING_CATEGORY, BUILDING_CODE, 'Eksisterende_andel', HEATING_SYSTEMS]
    navn_eksisterende_kolonner = {"Eksisterende_andel": TEK_SHARES, NEW_HEATING_SYSTEMS : HEATING_SYSTEMS, f'{YEAR}_y': YEAR}

    kolonner_nye = [f'{YEAR}_y', BUILDING_CATEGORY, BUILDING_CODE, 'Ny_andel', NEW_HEATING_SYSTEMS]
    navn_nye_kolonner = {"Ny_andel": TEK_SHARES, NEW_HEATING_SYSTEMS: HEATING_SYSTEMS, f'{YEAR}_y': YEAR}

    rekkefolge_kolonner = [YEAR, BUILDING_CATEGORY, BUILDING_CODE, HEATING_SYSTEMS, TEK_SHARES]

    # Remaining shares of the replaced systems. There is one row per receiving
    # system here, so the same remaining share can occur several times.
    nye_andeler_eksisterende = df_merge_sum_ny_andel[kolonner_eksisterende].rename(columns=navn_eksisterende_kolonner)

    # Shares received, relabelled to the receiving system and summed per system.
    nye_andeler_nye = df_merge_sum_ny_andel[kolonner_nye].rename(columns=navn_nye_kolonner)
    nye_andeler_nye = aggregere_lik_oppvarming_fjern_0(nye_andeler_nye)

    # Received share + the receiving system's own start year share. The shared
    # share column is suffixed `_x` (received) and `_y` (start year) by the merge.
    nye_andeler_pluss_eksisterende = nye_andeler_nye.merge(oppvarming_og_tek_foer_endring, on=[BUILDING_CATEGORY,BUILDING_CODE,HEATING_SYSTEMS], how='inner')
    nye_andeler_pluss_eksisterende[TEK_SHARES] = nye_andeler_pluss_eksisterende[f'{TEK_SHARES}_x'] + nye_andeler_pluss_eksisterende[f'{TEK_SHARES}_y']
    nye_andeler_pluss_eksisterende = nye_andeler_pluss_eksisterende.drop(columns=[f'{TEK_SHARES}_x', f'{TEK_SHARES}_y'])

    nye_andeler_samlet = pd.concat([nye_andeler_eksisterende, nye_andeler_pluss_eksisterende])
    # Drops the repeated remaining-share rows before summing.
    # NOTE(review): the share value is part of the subset, so two different rows
    # for the same system and year with an equal share would also be collapsed - confirm intended.
    nye_andeler_drop_dupe = nye_andeler_samlet.drop_duplicates(
        subset=[YEAR, BUILDING_CATEGORY, BUILDING_CODE, HEATING_SYSTEMS, TEK_SHARES], keep='first')

    nye_andeler_samlet_uten_0 = aggregere_lik_oppvarming_fjern_0(nye_andeler_drop_dupe)
    nye_andeler_samlet_uten_0 = nye_andeler_samlet_uten_0[rekkefolge_kolonner]

    # TODO: check dtype changes in function
    nye_andeler_samlet_uten_0[YEAR] = nye_andeler_samlet_uten_0[YEAR].astype(int)

    return nye_andeler_samlet_uten_0
def check_sum_of_shares(projected_shares: pd.DataFrame, precision: int = 10) -> None:
    """
    Check that heating_system_share sums to 1 per building category, building code and year.

    Groups that do not sum to 1 are reported through the logger. No exception
    is raised for them; a warning is logged instead.

    Parameters
    ----------
    projected_shares : pd.DataFrame
        Dataframe must contain columns: 'building_category', 'building_code', 'year' and
        'heating_system_share'
    precision : int
        Number of decimals the sum (as a percentage) is rounded to before it is
        compared with 100.
    """
    share_sums = projected_shares.groupby(by=[BUILDING_CATEGORY, BUILDING_CODE, YEAR])[[TEK_SHARES]].sum()

    # Compare as a rounded percentage so float noise does not trigger a report.
    is_valid = (share_sums[TEK_SHARES] * 100).round(precision) == 100.0  # noqa: PLR2004
    invalid_shares = share_sums[~is_valid]
    if invalid_shares.empty:
        return

    logger.error('Sum of TEK shares not equal to 1 for:')
    for idx, row in invalid_shares.iterrows():
        # Both placeholders are named: a bare `{}` has no positional argument
        # to consume and makes the format call fail with IndexError.
        logger.error('{idx}: {row_dict}', idx=idx, row_dict=row.to_dict())
    logger.warning('Skipping ValueError on sum!=1.0')
def add_existing_heating_system_shares_to_projection(new_shares: pd.DataFrame,
                                                     existing_shares: pd.DataFrame,
                                                     period: YearRange) -> pd.DataFrame:
    """
    Combine projected shares with the existing shares that were not projected.

    Existing (building category, building code, heating system) combinations
    that do not occur in ``new_shares`` keep their share unchanged for every
    year from ``period.start + 1`` to ``period.end``.

    Parameters
    ----------
    new_shares : pandas.DataFrame
        projected heating system shares.
    existing_shares : pandas.DataFrame
        heating system shares in the start year.
    period : YearRange
        the projection period.

    Returns
    -------
    pandas.DataFrame
        the carried-forward rows, followed by ``new_shares`` and then
        ``existing_shares``, with a fresh index.

    Notes
    -----
    - Combinations are matched on a temporary ``Sortering`` key, the string
      concatenation of building category, building code and heating system.
      The key is not part of the result.
    - NOTE(review): the key has no separator, so different combinations could
      produce the same string - confirm this cannot happen with real values.
    - NOTE(review): ``pd.concat`` is presumably called with an empty list when
      the period has no years after the start year - confirm callers never do that.
    """
    def sortering_oppvarmingstyper(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
        """Return a copy of df with the `Sortering` key column, and the unique keys."""
        df_kombinasjoner = df.copy()
        df_kombinasjoner['Sortering'] = df_kombinasjoner[BUILDING_CATEGORY] + df_kombinasjoner[BUILDING_CODE] + \
                                        df_kombinasjoner[HEATING_SYSTEMS]
        kombinasjonsliste = list(df_kombinasjoner['Sortering'].unique())
        return df_kombinasjoner, kombinasjonsliste

    _, projected_keys = sortering_oppvarmingstyper(new_shares)
    existing_with_key, _ = sortering_oppvarmingstyper(existing_shares)

    # isin() matches the values directly; building a query string from the list
    # repr breaks on values that do not round-trip as Python literals.
    is_projected = existing_with_key['Sortering'].isin(projected_keys)
    unprojected = existing_with_key[~is_projected].drop(columns=['Sortering'])

    # TODO: set lower limit to period equal to last year (max) present in forecast data?
    projection_period = YearRange(period.start + 1, period.end).year_range
    carried_forward = pd.concat([
        unprojected.assign(**{YEAR: year}) for year in projection_period
    ])

    return pd.concat([carried_forward, new_shares, existing_shares], ignore_index=True)