framjules

JulES

JulES solver.

JulES

Bases: Solver

JulES solver.

Source code in framjules/JulES.py
class JulES(Solver):
    """JulES solver."""

    def __init__(self) -> None:
        """Create new JulES solver with default config set."""
        super().__init__()
        self._config = JulESConfig()

    def get_config(self) -> JulESConfig:
        """Get internal config object. Modify this to configure JulES."""
        return self._config

    def _solve(
        self,
        folder: Path,
        model: Model,
    ) -> None:

        t0 = time()
        if _PROFILE_PERFORMANCE:
            profiler = cProfile.Profile()
            profiler.enable()
        handler = SolveHandler(folder, model, self.get_config())
        self.send_debug_event(f"SolveHandler time: {round(time() - t0, 2)} seconds")
        if _PROFILE_PERFORMANCE:
            profiler.disable()  # Stop profiling
            profiler.dump_stats("profile_solvehandler_init.prof")

        t = time()
        if _PROFILE_PERFORMANCE:
            profiler = cProfile.Profile()
            profiler.enable()
        handler.build()
        if _PROFILE_PERFORMANCE:
            profiler.disable()  # Stop profiling
            profiler.dump_stats("profile_solvehandler_build.prof")
        self.send_debug_event(f"build time: {round(time() - t, 2)} seconds")

        t = time()
        handler.configure()
        self.send_debug_event(f"configure time: {round(time() - t, 2)} seconds")

        t = time() 
        loaders: set[Loader] = set()
        add_loaders(loaders, model)
        for loader in loaders:
            loader.clear_cache()
        gc.collect()
        self.send_debug_event(f"clear_cache time: {round(time() - t, 2)} seconds")

        t = time() 
        handler.run()
        self.send_debug_event(f"run time: {round(time() - t, 2)} seconds")

        t = time()
        if _PROFILE_PERFORMANCE:
            profiler = cProfile.Profile()
            profiler.enable()
        handler.set_results()
        if _PROFILE_PERFORMANCE:
            profiler.disable()  # Stop profiling
            profiler.dump_stats("profile_solvehandler_results.prof")
        self.send_debug_event(f"set_results time: {round(time() - t, 2)} seconds")

        self.send_debug_event(f"JulES._solve time: {round(time() - t0, 2)} seconds")
__init__() -> None

Create new JulES solver with default config set.

get_config() -> JulESConfig

Get internal config object. Modify this to configure JulES.

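A minimal usage sketch. The import path, the public solve entry point inherited from Solver, the output folder, and the model variable are illustrative assumptions, not confirmed API details:

from pathlib import Path

from framjules import JulES  # assumed import path

solver = JulES()
config = solver.get_config()  # JulESConfig with default settings
config.set_skipmax_days(4)    # example: recalculate storage values more often
solver.solve(Path("jules_output"), model)  # model: a Model built elsewhere (not shown)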

JulESConfig

Config object for JulES solver.

JulESConfig

Bases: SolverConfig

Class containing all config for JulES.

Source code in framjules/JulESConfig.py
class JulESConfig(SolverConfig):
    """Class containing all config for JulES."""

    def __init__(self) -> None:
        """Create new JulESConfig object."""
        super().__init__()

        self._julia_exe_path: Path | str | None = None
        self._julia_env_path: Path | str | None = None
        self._julia_depot_path: Path | str | None = None

        self._branch_jules: str | None = None
        self._branch_tulipa: str | None = None

        self._short_term_aggregations: list[Aggregator] = []

        self._time_resolution = JulESTimeResolution()

        self._duration_clearing = timedelta(days=1)
        self._duration_short_term = timedelta(days=6)
        self._duration_medium_term = timedelta(days=364)
        self._duration_long_term = timedelta(days=364 * 5)

        self._market_resolution_clearing = timedelta(hours=1)
        self._market_resolution_short_term = timedelta(hours=2)

        self._market_num_blocks_medium_term = 5
        self._market_num_blocks_long_term = 4

        self._storage_resolution_clearing = timedelta(days=1)
        self._storage_resolution_short_term = timedelta(days=2)
        self._storage_resolution_medium_term = timedelta(days=7)
        self._storage_resolution_long_term = timedelta(days=28)

        self._short_term_storage_cutoff_hours = 10

        self._debug_short_opt_solver = False
        self._debug_med_opt_solver = False
        self._debug_long_opt_solver = False
        self._debug_end_value_opt_solver = False
        self._debug_subsystem_master_opt_solver = False
        self._debug_subsystem_sub_opt_solver = False
        self._debug_clearing_opt_solver = False
        self._is_cache_db = True
        self._skip_install_dependencies = False

        self._clearing_days = 2
        self._market_duration_minutes = 6 * 60
        self._storage_duration_minutes = 2 * 24 * 60
        self._lookahead_days = 5 * 365
        self._detail_level = "fast"

        self._skipmax_days = 6
        self._warn_skipmax_days = 32

    def set_skipmax_days(self, days: int) -> None:
        """Set number of days between calculation of medium and long term storage values.

        This can speed up a simulation. The cost is lower-quality storage values: the longer
        between re-calculations of storage values, the bigger the negative impact on simulation result quality.

        If skipmax_days = 6 and clearing_days = 2, JulES will calculate medium and long
        term storage values every 3rd simulation step.
        """
        self._check_type(days, int)
        self._check_int(days, lower_bound=0, upper_bound=None)
        if days > self._warn_skipmax_days:
            message = (
                "Unusually high value for skipmax_days: "
                f"Medium and long term storage values updated only {days}th day. "
                "This can give poor simulation results due to poor storage utilization."
            )
            self.send_warning_event(message)
        self._skipmax_days = days

    def get_skipmax_days(self) -> int:
        """Get number of days between calculation of medium and long term storage values."""
        return self._skipmax_days

    def is_skip_install_dependencies(self) -> bool:
        """Return True if install dependencies will be skipped during by JulES.solve."""
        return self._skip_install_dependencies

    def activate_skip_install_dependencies(self) -> None:
        """Tell JulES to not install dependencies, assuming they are already installed.

        Default is to install.
        """
        self._skip_install_dependencies = True

    def deactivate_skip_install_dependencies(self) -> None:
        """Tell JulES to install dependencies. (This is the default)."""
        self._skip_install_dependencies = False

    def is_cache_db(self) -> bool:
        """Return True if JulES is allowed to use a cache to store precomputed values while building."""
        return self._is_cache_db

    def activate_cache_db(self) -> None:
        """Activates use of cache db."""
        self._is_cache_db = True

    def deactivate_cache_db(self) -> None:
        """Activates use of db without cache."""
        self._is_cache_db = False

    def get_time_resolution(self) -> JulESTimeResolution:
        """Get time resolution object. Modify this to modify time resolution of JulES."""
        return self._time_resolution

    def get_short_term_storage_cutoff_hours(self) -> int:
        """Return num hours.

        JulES will classify all storage subsystems with
        max storage duration less than cutoff as short term storage.
        """
        return self._short_term_storage_cutoff_hours

    def set_jules_version(self, jules_branch: str | None = None, tulipa_branch: str | None = None) -> None:
        """Set which git branch of JulES and/or TuLiPa to use."""
        self._check_type(jules_branch, (str, type(None)))
        self._check_type(tulipa_branch, (str, type(None)))
        if jules_branch is not None:
            self._branch_jules = jules_branch
        if tulipa_branch is not None:
            self._branch_tulipa = tulipa_branch
        if self._branch_tulipa is None and self._branch_jules is not None:
            self._branch_tulipa = self._branch_jules

    def get_jules_version(self) -> str | None:
        """Get JulES git branch."""
        return self._branch_jules

    def get_tulipa_version(self) -> str | None:
        """Get TuLiPa git branch."""
        return self._branch_tulipa

    def set_julia_depot_path(self, path: Path) -> None:
        """Set folder where Julia installs new packages."""
        self._check_type(path, Path)
        self._julia_depot_path = path

    def get_julia_depot_path(self) -> Path | None:
        """Get folder where Julia installs new packages."""
        return self._julia_depot_path

    def set_julia_env_path(self, path: Path) -> None:
        """Set which Julia environment to use."""
        self._check_type(path, Path)
        self._julia_env_path = path

    def get_julia_env_path(self) -> Path | None:
        """Get Julia environment being used."""
        return self._julia_env_path

    def set_julia_exe_path(self, path: Path) -> None:
        """Set which Julia installation to use."""
        self._check_type(path, Path)
        self._julia_exe_path = path

    def get_julia_exe_path(self) -> Path | None:
        """Get Julia installation being used."""
        return self._julia_exe_path

    def _check_supported_aggregators(self, aggregators: list[Aggregator]) -> None:
        for aggr in aggregators:
            if not isinstance(aggr, tuple(_SUPPORTED_AGGREGATORS)):
                message = f"Aggregator of type {type(aggr)} is not supported in JulES. Supported types are: {_SUPPORTED_AGGREGATORS}"
                raise TypeError(message)

    def set_short_term_aggregations(self, aggregators: list[Aggregator]) -> None:
        """Set aggregations to create the short term model from clearing (the Model object being solved)."""
        self._check_supported_aggregators(aggregators)
        self._short_term_aggregations = aggregators

    def get_short_term_aggregations(self) -> list[Aggregator]:
        """Get aggregations to create the short term model from clearing (the Model object being solved)."""
        return self._short_term_aggregations

    """
    Debug optimization solvers of the different JulES problems.
    Helpful if problems are encountered during solving, most commonly infeasibility issues.
    Replaces TuLiPa.HiGHS_Prob with TuLiPa.JuMP_Prob which has better debugging features:
    - More checks while building the optimization problem
    - If infeasible, solve the problem again with relaxed constraints (with penalties) and return the broken constraints
    - Outputs the optimization problem with variable and constraint names from FRAM
    """

    def set_debug_all_opt_solver(self, debug: bool) -> None:
        """Set whether to debug all optimization solvers."""
        self._debug_short_opt_solver = debug
        self._debug_med_opt_solver = debug
        self._debug_long_opt_solver = debug
        self._debug_end_value_opt_solver = debug
        self._debug_subsystem_master_opt_solver = debug
        self._debug_subsystem_sub_opt_solver = debug
        self._debug_clearing_opt_solver = debug

    def set_debug_short_opt_solver(self, debug: bool) -> None:
        """Set whether to debug the short-term optimization solver."""
        self._debug_short_opt_solver = debug

    def get_debug_short_opt_solver(self) -> bool:
        """Get whether to debug the short-term optimization solver."""
        return self._debug_short_opt_solver

    def set_debug_med_opt_solver(self, debug: bool) -> None:
        """Set whether to debug the medium-term optimization solver."""
        self._debug_med_opt_solver = debug

    def get_debug_med_opt_solver(self) -> bool:
        """Get whether to debug the medium-term optimization solver."""
        return self._debug_med_opt_solver

    def set_debug_long_opt_solver(self, debug: bool) -> None:
        """Set whether to debug the long-term optimization solver."""
        self._debug_long_opt_solver = debug

    def get_debug_long_opt_solver(self) -> bool:
        """Get whether to debug the long-term optimization solver."""
        return self._debug_long_opt_solver

    def set_debug_end_value_opt_solver(self, debug: bool) -> None:
        """Set whether to debug the end value optimization solver."""
        self._debug_end_value_opt_solver = debug

    def get_debug_end_value_opt_solver(self) -> bool:
        """Get whether to debug the end value optimization solver."""
        return self._debug_end_value_opt_solver

    def set_debug_subsystem_master_opt_solver(self, debug: bool) -> None:
        """Set whether to debug the subsystem master optimization solver."""
        self._debug_subsystem_master_opt_solver = debug

    def get_debug_subsystem_master_opt_solver(self) -> bool:
        """Get whether to debug the subsystem master optimization solver."""
        return self._debug_subsystem_master_opt_solver

    def set_debug_subsystem_sub_opt_solver(self, debug: bool) -> None:
        """Set whether to debug the subsystem sub optimization solver."""
        self._debug_subsystem_sub_opt_solver = debug

    def get_debug_subsystem_sub_opt_solver(self) -> bool:
        """Get whether to debug the subsystem sub optimization solver."""
        return self._debug_subsystem_sub_opt_solver

    def set_debug_clearing_opt_solver(self, debug: bool) -> None:
        """Set whether to debug the clearing optimization solver."""
        self._debug_clearing_opt_solver = debug

    def get_debug_clearing_opt_solver(self) -> bool:
        """Get whether to debug the clearing optimization solver."""
        return self._debug_clearing_opt_solver
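A configuration sketch combining several of the setters above. All paths and the branch name are placeholders, and the config object is obtained as in the usage sketch for JulES:

from pathlib import Path

config = solver.get_config()

# Point JulES at a specific Julia installation and environment (placeholder paths).
config.set_julia_exe_path(Path("/opt/julia/bin/julia"))
config.set_julia_env_path(Path("/home/user/jules_env"))
config.set_julia_depot_path(Path("/home/user/.julia"))

# Pin the git branch (placeholder name); TuLiPa follows JulES when not set explicitly.
config.set_jules_version(jules_branch="main")

# Skip dependency installation on repeated runs where the environment is already set up.
config.activate_skip_install_dependencies()

# Turn on debugging of every optimization solver when hunting infeasibilities.
config.set_debug_all_opt_solver(True)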
__init__() -> None

Create new JulESConfig object.

activate_cache_db() -> None

Activates use of cache db.

activate_skip_install_dependencies() -> None

Tell JulES not to install dependencies, assuming they are already installed.

Default is to install.

deactivate_cache_db() -> None

Activates use of db without cache.

deactivate_skip_install_dependencies() -> None

Tell JulES to install dependencies. (This is the default).

get_debug_clearing_opt_solver() -> bool

Get whether to debug the clearing optimization solver.

get_debug_end_value_opt_solver() -> bool

Get whether to debug the end value optimization solver.

get_debug_long_opt_solver() -> bool

Get whether to debug the long-term optimization solver.

get_debug_med_opt_solver() -> bool

Get whether to debug the medium-term optimization solver.

get_debug_short_opt_solver() -> bool

Get whether to debug the short-term optimization solver.

get_debug_subsystem_master_opt_solver() -> bool

Get whether to debug the subsystem master optimization solver.

get_debug_subsystem_sub_opt_solver() -> bool

Get whether to debug the subsystem sub optimization solver.

get_jules_version() -> str | None

Get JulES git branch.

get_julia_depot_path() -> Path | None

Get folder where Julia installs new packages.

get_julia_env_path() -> Path | None

Get Julia environment being used.

get_julia_exe_path() -> Path | None

Get Julia installation being used.

get_short_term_aggregations() -> list[Aggregator]

Get aggregations to create the short term model from clearing (the Model object being solved).

get_short_term_storage_cutoff_hours() -> int

Return the short term storage cutoff in hours.

JulES will classify all storage subsystems with max storage duration less than cutoff as short term storage.

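For example, with the default cutoff of 10 hours, a battery subsystem whose maximum storage duration is 4 hours is classified as short term storage, while a seasonal hydro reservoir holding weeks of inflow is not.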
get_skipmax_days() -> int

Get number of days between calculation of medium and long term storage values.

get_time_resolution() -> JulESTimeResolution

Get time resolution object. Modify this to modify time resolution of JulES.

get_tulipa_version() -> str | None

Get TuLiPa git branch.

is_cache_db() -> bool

Return True if JulES is allowed to use a cache to store precomputed values while building.

is_skip_install_dependencies() -> bool

Return True if dependency installation will be skipped by JulES.solve.

set_debug_all_opt_solver(debug: bool) -> None

Set whether to debug all optimization solvers.

set_debug_clearing_opt_solver(debug: bool) -> None

Set whether to debug the clearing optimization solver.

set_debug_end_value_opt_solver(debug: bool) -> None

Set whether to debug the end value optimization solver.

set_debug_long_opt_solver(debug: bool) -> None

Set whether to debug the long-term optimization solver.

set_debug_med_opt_solver(debug: bool) -> None

Set whether to debug the medium-term optimization solver.

set_debug_short_opt_solver(debug: bool) -> None

Set whether to debug the short-term optimization solver.

set_debug_subsystem_master_opt_solver(debug: bool) -> None

Set whether to debug the subsystem master optimization solver.

set_debug_subsystem_sub_opt_solver(debug: bool) -> None

Set whether to debug the subsystem sub optimization solver.

set_jules_version(jules_branch: str | None = None, tulipa_branch: str | None = None) -> None

Set which git branch of JulES and/or TuLiPa to use.

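A small sketch of the fallback behaviour (branch names are placeholders): when only a JulES branch is given and no TuLiPa branch has been set, TuLiPa follows the JulES branch.

config.set_jules_version(jules_branch="dev")    # TuLiPa branch also becomes "dev"
config.set_jules_version(tulipa_branch="main")  # only TuLiPa changes; JulES stays on "dev"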
set_julia_depot_path(path: Path) -> None

Set folder where Julia installs new packages.

set_julia_env_path(path: Path) -> None

Set which Julia environment to use.

set_julia_exe_path(path: Path) -> None

Set which Julia installation to use.

set_short_term_aggregations(aggregators: list[Aggregator]) -> None

Set aggregations to create the short term model from clearing (the Model object being solved).

set_skipmax_days(days: int) -> None

Set number of days between calculation of medium and long term storage values.

This can speed up a simulation. The cost is lower-quality storage values: the longer between re-calculations of storage values, the bigger the negative impact on simulation result quality.

If skipmax_days = 6 and clearing_days = 2, JulES will calculate medium and long term storage values every 3rd simulation step.

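A worked example of the docstring's arithmetic, reusing the config object from the sketches above: with clearing_days = 2 each simulation step covers two days, so skipmax_days = 6 means storage values are recalculated every 6 / 2 = 3rd step.

config.set_skipmax_days(6)   # with 2-day steps: recalculate every 3rd step
config.set_skipmax_days(40)  # above the warning threshold of 32: a warning event is sent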

JulESTimeResolution

JulESTimeResolution

Bases: Base

Time resolution settings for JulES (only some are modifiable).

Source code in framjules/JulESTimeResolution.py
class JulESTimeResolution(Base):
    """Time resolution settings for JulES (only some are modifiable)."""

    def __init__(self) -> None:
        """Create instance with default values."""
        self._target_med_days = 12 * 7
        self._target_long_storage_days = 6 * 7
        self._target_lookahead_days = 365 * 5
        self._target_ev_days = 365 * 3

        self._clearing_days = 2
        self._short_days = 5

        self._clearing_market_minutes = 60 * 3
        self._clearing_storage_minutes = 60 * 24

        self._short_market_minutes = self._get_incremented_divisor(
            n=self._clearing_days * 24 * 60,
            divisor=self._clearing_market_minutes,
            num_increments=1,
        )
        self._short_storage_minutes = 60 * 24 * self._short_days

        self._long_adaptive_blocks = 4
        self._long_adaptive_hours = 6

        self._med_adaptive_blocks = 4
        self._med_adaptive_hours = 6

        med_days, long_days, med_storage_days, long_storage_days = self._get_med_long_days_and_storage_days(
            self._target_lookahead_days,
            self._clearing_days,
            self._short_days,
        )

        self._med_days = med_days
        self._long_days = long_days
        self._med_storage_days = med_storage_days
        self._long_storage_days = long_storage_days

    def set_target_ev_days(self, x: int) -> None:
        """Set prefered value for length in days of end value problems.

        Will choose a close valid value.
        """
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=self._clearing_days + self._short_days, upper_bound=None)
        self._target_ev_days = x

    def set_target_long_storage_days(self, x: int) -> None:
        """Set prefered value for long_storage_days.

        Will choose a close valid value.
        """
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=self._clearing_days + self._short_days, upper_bound=None)
        self._target_long_storage_days = x

    def set_target_med_days(self, x: int) -> None:
        """Set preferred value for horizon length in days in medium prognosis problem.

        Will choose a close valid value.
        """
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=self._clearing_days + self._short_days, upper_bound=None)
        self._target_med_days = x

    def set_clearing_days(self, x: int) -> None:
        """Set length of clearing problem in days."""
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=1, upper_bound=None)

        self._clearing_days = x

        clearing_minutes = self._clearing_days * 24 * 60

        self._clearing_market_minutes = min(self._clearing_market_minutes, clearing_minutes)
        self._clearing_storage_minutes = min(self._clearing_storage_minutes, clearing_minutes)

        if clearing_minutes % self._clearing_market_minutes != 0:
            message = (
                f"clearing_market_minutes ({self._clearing_market_minutes}) does not go up"
                f" in clearing_days in minutes ({clearing_minutes})."
            )
            raise ValueError(message)

        if clearing_minutes % self._clearing_storage_minutes != 0:
            message = (
                f"clearing_storage_minutes ({self._clearing_storage_minutes}) does not go up"
                f" in clearing_days in minutes ({clearing_minutes})."
            )
            raise ValueError(message)

        self.set_target_lookahead_days(self._target_lookahead_days)

    def set_short_days(self, x: int) -> None:
        """Set length of short term prognosis problem in days."""
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=1, upper_bound=None)

        self._short_days = x

        short_minutes = self._short_days * 24 * 60

        self._short_market_minutes = min(self._short_market_minutes, short_minutes)
        self._short_storage_minutes = min(self._short_storage_minutes, short_minutes)

        if short_minutes % self._short_market_minutes != 0:
            message = (
                f"short_market_minutes ({self._short_market_minutes}) does not go up"
                f" in short_days in minutes ({short_minutes})."
            )
            raise ValueError(message)

        if short_minutes % self._short_storage_minutes != 0:
            message = (
                f"short_storage_minutes ({self._short_storage_minutes}) does not go up"
                f" in short_days in minutes ({short_minutes})."
            )
            raise ValueError(message)

        self.set_target_lookahead_days(self._target_lookahead_days)

    def set_target_lookahead_days(self, x: int) -> None:
        """Set target length of prognosis problems in days.

        Will set med_days and long_days and make sure their sum is minimum this length.

        Will set short_days if target_lookahead_days < short_days.
        """
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=1, upper_bound=None)

        self._target_lookahead_days = x

        if x < self._short_days:
            self.set_short_days(x)

        med_days, long_days, med_storage_days, long_storage_days = self._get_med_long_days_and_storage_days(
            self._target_lookahead_days,
            self._clearing_days,
            self._short_days,
        )

        self._med_days = med_days
        self._long_days = long_days
        self._med_storage_days = med_storage_days
        self._long_storage_days = long_storage_days

    def set_clearing_market_minutes(self, x: int) -> None:
        """Set market period length in clearing problem in minutes. Currently only supports whole hours."""
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=1, upper_bound=None)
        self._check_hourly(x)
        self._clearing_market_minutes = x

    def set_clearing_storage_minutes(self, x: int) -> None:
        """Set storage period length in clearing problem in minutes. Currently only supports whole hours."""
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=1, upper_bound=None)
        self._clearing_storage_minutes = x

    def set_short_market_minutes(self, x: int) -> None:
        """Set market period length in short prognosis problem in minutes. Currently only supports whole hours."""
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=1, upper_bound=None)
        self._short_market_minutes = x

    def set_short_storage_minutes(self, x: int) -> None:
        """Set storage period length in short prognosis problem in minutes. Currently only supports whole hours."""
        self._check_type(x, int)
        self._check_int(value=x, lower_bound=1, upper_bound=None)
        self._short_storage_minutes = x

    def get_clearing_days(self) -> int:
        """Get length of clearing problem in days."""
        return self._clearing_days

    def get_short_days(self) -> int:
        """Get length of short prognosis problem in days."""
        return self._short_days

    def get_target_lookahead_days(self) -> int:
        """Get target (minimum) length of prognosis problems in days."""
        return self._target_lookahead_days

    def get_med_days(self) -> int:
        """Get length of medium prognosis problem in days."""
        return self._med_days

    def get_long_days(self) -> int:
        """Get length of long prognosis problem in days."""
        return self._long_days

    def get_med_storage_days(self) -> int:
        """Get storage period length in medium prognosis problem in days."""
        return self._med_storage_days

    def get_long_storage_days(self) -> int:
        """Get storage period length in long prognosis problem in days."""
        return self._long_storage_days

    def get_clearing_market_minutes(self) -> int:
        """Get market period length in clearing problem in minutes. Currently only supports whole hours."""
        return self._clearing_market_minutes

    def get_clearing_storage_minutes(self) -> int:
        """Get storage period length in clearing problem in minutes. Currently only supports whole hours."""
        return self._clearing_storage_minutes

    def get_short_market_minutes(self) -> int:
        """Get market period length in short prognosis problem in minutes. Currently only supports whole hours."""
        return self._short_market_minutes

    def get_short_storage_minutes(self) -> int:
        """Get storage period length in short prognosis problem in minutes. Currently only supports whole hours."""
        return self._short_storage_minutes

    def get_long_adaptive_blocks(self) -> int:
        """Get number of adaptive market period blocks in the long prognosis problem."""
        return self._long_adaptive_blocks

    def get_long_adaptive_hours(self) -> int:
        """Get resolution in hours used in clustering of market period blocks in the long prognosis problem."""
        return self._long_adaptive_hours

    def get_med_adaptive_blocks(self) -> int:
        """Get number of adaptive market period blocks in the medium prognosis problem."""
        return self._med_adaptive_blocks

    def get_med_adaptive_hours(self) -> int:
        """Get resolution in hours used in clustering of market period blocks in the medium prognosis problem."""
        return self._med_adaptive_hours

    def get_target_long_storage_days(self) -> int:
        """Get preferred value for long_storage_days."""
        return self._target_long_storage_days

    def get_target_med_days(self) -> int:
        """Get preferred value for horizon length of medium prognosis problem."""
        return self._target_med_days

    def get_target_ev_days(self) -> int:
        """Get preferred value for horizon length of end value problem."""
        return self._target_ev_days

    def get_ev_days(self) -> int:
        """Get number of days in horizon of end value problems."""
        target_horizon_days = self.get_target_ev_days()
        long_period_days = self.get_long_storage_days()
        return math.ceil(target_horizon_days / long_period_days) * long_period_days

    def get_content_dict(self) -> dict[str, int]:
        """Return dict of all settings. Useful to get an overview."""
        return {
            "clearing_days": self.get_clearing_days(),
            "short_days": self.get_short_days(),
            "med_days": self.get_med_days(),
            "long_days": self.get_long_days(),
            "long_storage_days": self.get_long_storage_days(),
            "med_storage_days": self.get_med_storage_days(),
            "clearing_market_minutes": self.get_clearing_market_minutes(),
            "clearing_storage_minutes": self.get_clearing_storage_minutes(),
            "short_market_minutes": self.get_short_market_minutes(),
            "short_storage_minutes": self.get_short_storage_minutes(),
            "long_adaptive_blocks": self.get_long_adaptive_blocks(),
            "long_adaptive_hours": self.get_long_adaptive_hours(),
            "med_adaptive_blocks": self.get_med_adaptive_blocks(),
            "med_adaptive_hours": self.get_med_adaptive_hours(),
            "target_lookahead_days": self.get_target_lookahead_days(),
            "target_long_storage_days": self.get_target_long_storage_days(),
            "target_med_days": self.get_target_med_days(),
            "target_ev_days": self.get_target_ev_days(),
            "ev_days": self.get_ev_days(),
        }

    def _get_med_long_days_and_storage_days(
        self,
        target_lookahead_days: int,
        clearing_days: int,
        short_days: int,
    ) -> tuple[int, int, int, int]:
        """Find the valid configuration that is closest to the user supplied targets."""

        med_period = clearing_days + short_days

        possible_med_pairs = self._get_possible_med_pairs(med_period, target_lookahead_days, short_days)

        if not possible_med_pairs:
            return tuple([clearing_days + short_days] * 4)

        candidates = set()
        for med_period, med_num_periods in possible_med_pairs:
            possible_long_pairs = self._get_possible_long_pairs(
                med_period,
                med_num_periods,
                target_lookahead_days,
                short_days,
            )
            for long_period, long_num_periods in possible_long_pairs:
                med_days = med_period * med_num_periods
                long_days = long_period * long_num_periods
                candidate = (med_days, long_days, med_period, long_period)
                candidates.add(candidate)

        target_med_days = self.get_target_med_days()
        target_long_storage_days = self.get_target_long_storage_days()

        def distance_from_targets(candidate: tuple[int, int, int, int]) -> float | int:
            med_days = candidate[0]
            long_storage_days = candidate[3]
            med_square_diff = (target_med_days - med_days) ** 2
            long_square_diff = (target_long_storage_days - long_storage_days) ** 2
            return med_square_diff + long_square_diff

        return min(candidates, key=distance_from_targets)

    def _get_possible_med_pairs(
        self,
        med_period: int,
        target_lookahead_days: int,
        short_days: int,
    ) -> list[tuple[int, int]]:
        """Fuzz number of med storage periods to get more candidates."""
        target_med_days = self.get_target_med_days()
        out = []
        n = math.ceil(target_med_days / med_period)
        for m in [n - 1, n, n + 1]:
            implied_target_lookahead_days = med_period * (m + 1) + short_days
            if m > 1 and implied_target_lookahead_days <= target_lookahead_days:
                out.append((med_period, m))
        return out

    def _get_possible_long_pairs(
        self,
        med_period: int,
        med_num_periods: int,
        target_lookahead_days: int,
        short_days: int,
    ) -> list[tuple[int, int]]:
        """Find valid long pairs. Valid if long_period is divisor of med_days."""
        target_long_storage_days = self.get_target_long_storage_days()
        med_days = med_period * med_num_periods
        divisors = self._get_divisors(med_days)
        divisors = sorted(divisors, key=lambda x: abs(target_long_storage_days - x))
        divisors = divisors[:4]
        out = []
        for long_period in divisors:
            long_num_periods = math.ceil(max(1, target_lookahead_days - med_days - short_days) / long_period)
            out.append((long_period, long_num_periods))
        return out

    def _get_divisors(self, n: int) -> list[int]:
        """Return sorted list of divisors of n.

        Inspiration from: https://stackoverflow.com/questions/171765/what-is-the-best-way-to-get-all-the-divisors-of-a-number
        """
        divs = [1]
        for i in range(2, int(math.sqrt(n)) + 1):
            if n % i == 0:
                divs.extend([i, n // i])
        divs.append(n)
        return sorted(set(divs))

    def _get_incremented_divisor(self, n: int, divisor: int, num_increments: int) -> int:
        """Get divisor of n num_increments greater than k if <= n else n."""
        divs = self._get_divisors(n)
        try:
            i = divs.index(divisor)
        except ValueError:
            message = f"{divisor} is not a divisor of {n}."
            raise ValueError(message) from None
        return divs[min(i + num_increments, len(divs) - 1)]

    def _check_hourly(self, x: int) -> None:
        if not (x / 60).is_integer():
            message = "Currently, JulES only support hourly resolutions."
            raise ValueError(message)
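A sketch of adjusting the time resolution through the config object from the earlier sketches:

time_res = config.get_time_resolution()
time_res.set_clearing_days(1)             # 1-day clearing problem
time_res.set_clearing_market_minutes(60)  # hourly market periods in clearing
time_res.set_target_lookahead_days(365)   # shorter prognosis horizon than the 5-year default
# med_days, long_days etc. are recomputed to the closest valid configuration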
__init__() -> None

Create instance with default values.

get_clearing_days() -> int

Get length of clearing problem in days.

get_clearing_market_minutes() -> int

Get market period length in clearing problem in minutes. Currently only supports whole hours.

get_clearing_storage_minutes() -> int

Get storage period length in clearing problem in minutes. Currently only supports whole hours.

get_content_dict() -> dict[str, int]

Return dict of all settings. Useful to get an overview.

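A quick way to inspect the active settings (time_res as in the sketch above):

for key, value in time_res.get_content_dict().items():
    print(f"{key}: {value}")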
get_ev_days() -> int

Get number of days in horizon of end value problems.

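A worked example of the rounding: with the default target_ev_days = 365 * 3 = 1095 and an illustrative long_storage_days of 42 (the actual value is computed from the targets), this returns ceil(1095 / 42) * 42 = 27 * 42 = 1134 days, i.e. the target rounded up to a whole number of long storage periods.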
get_long_adaptive_blocks() -> int

Get number of adaptive market period blocks in the long prognosis problem.

get_long_adaptive_hours() -> int

Get resolution in hours used in clustering of market period blocks in the long prognosis problem.

get_long_days() -> int

Get length of long prognosis problem in days.

get_long_storage_days() -> int

Get storage period length in long prognosis problem in days.

get_med_adaptive_blocks() -> int

Get number of adaptive market period blocks in the medium prognosis problem.

get_med_adaptive_hours() -> int

Get resolution in hours used in clustering of market period blocks in the medium prognosis problem.

get_med_days() -> int

Get length of medium prognosis problem in days.

get_med_storage_days() -> int

Get storage period length in medium prognosis problem in days.

get_short_days() -> int

Get length of short prognosis problem in days.

get_short_market_minutes() -> int

Get market period length in short prognosis problem in minutes. Currently only supports whole hours.

get_short_storage_minutes() -> int

Get storage period length in short prognosis problem in minutes. Currently only supports whole hours.

get_target_ev_days() -> int

Get preferred value for horizon length of end value problem.

get_target_long_storage_days() -> int

Get preferred value for long_storage_days.

get_target_lookahead_days() -> int

Get target (minimum) length of prognosis problems in days.

get_target_med_days() -> int

Get preferred value for horizon length of medium prognosis problem.

set_clearing_days(x: int) -> None

Set length of clearing problem in days.

Source code in framjules/JulESTimeResolution.py
def set_clearing_days(self, x: int) -> None:
    """Set length of clearing problem in days."""
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=1, upper_bound=None)

    self._clearing_days = x

    clearing_minutes = self._clearing_days * 24 * 60

    self._clearing_market_minutes = min(self._clearing_market_minutes, clearing_minutes)
    self._clearing_storage_minutes = min(self._clearing_storage_minutes, clearing_minutes)

    if clearing_minutes % self._clearing_market_minutes != 0:
        message = (
            f"clearing_market_minutes ({self._clearing_market_minutes}) does not go up"
            f" in clearing_days in minutes ({clearing_minutes})."
        )
        raise ValueError(message)

    if clearing_minutes % self._clearing_storage_minutes != 0:
        message = (
            f"clearing_storage_minutes ({self._clearing_storage_minutes}) does not go up"
            f" in clearing_days in minutes ({clearing_minutes})."
        )
        raise ValueError(message)

    self.set_target_lookahead_days(self._target_lookahead_days)
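
The divisibility checks above mean the clearing horizon must contain a whole number of market and storage periods. A minimal standalone sketch of that invariant (plain Python, all values hypothetical):

clearing_days = 2
clearing_market_minutes = 180   # 3-hour market periods
clearing_storage_minutes = 720  # 12-hour storage periods

clearing_minutes = clearing_days * 24 * 60  # 2880 minutes

# Both period lengths must divide the horizon exactly,
# otherwise set_clearing_days raises ValueError.
assert clearing_minutes % clearing_market_minutes == 0   # 16 market periods
assert clearing_minutes % clearing_storage_minutes == 0  # 4 storage periods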
set_clearing_market_minutes(x: int) -> None

Set market period length in clearing problem in minutes. Currently only supports whole hours.

Source code in framjules/JulESTimeResolution.py
def set_clearing_market_minutes(self, x: int) -> None:
    """Set market period length in clearing problem in minutes. Currently only support whole hours."""
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=1, upper_bound=None)
    self._check_hourly(x)
    self._clearing_market_minutes = x
set_clearing_storage_minutes(x: int) -> None

Set storage period length in clearing problem in minutes. Currently only supports whole hours.

Source code in framjules/JulESTimeResolution.py
def set_clearing_storage_minutes(self, x: int) -> None:
    """Set storage period length in clearing problem in minutes. Currently only supports whole hours."""
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=1, upper_bound=None)
    self._check_hourly(x)
    self._clearing_storage_minutes = x
set_short_days(x: int) -> None

Set length of short term prognosis problem in days.

Source code in framjules/JulESTimeResolution.py
def set_short_days(self, x: int) -> None:
    """Set length of short term prognosis problem in days."""
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=1, upper_bound=None)

    self._short_days = x

    short_minutes = self._short_days * 24 * 60

    self._short_market_minutes = min(self._short_market_minutes, short_minutes)
    self._short_storage_minutes = min(self._short_storage_minutes, short_minutes)

    if short_minutes % self._short_market_minutes != 0:
        message = (
            f"short_market_minutes ({self._short_market_minutes}) does not go up"
            f" in short_days in minutes ({short_minutes})."
        )
        raise ValueError(message)

    if short_minutes % self._short_storage_minutes != 0:
        message = (
            f"short_storage_minutes ({self._short_storage_minutes}) does not go up"
            f" in short_days in minutes ({short_minutes})."
        )
        raise ValueError(message)

    self.set_target_lookahead_days(self._target_lookahead_days)
set_short_market_minutes(x: int) -> None

Set market period length in short prognosis problem in minutes. Currently only supports whole hours.

Source code in framjules/JulESTimeResolution.py
def set_short_market_minutes(self, x: int) -> None:
    """Set market period length in short prognosis problem in minutes. Currently only supports whole hours."""
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=1, upper_bound=None)
    self._check_hourly(x)
    self._short_market_minutes = x
set_short_storage_minutes(x: int) -> None

Set storage period length in short prognosis problem in minutes. Currently only supports whole hours.

Source code in framjules/JulESTimeResolution.py
def set_short_storage_minutes(self, x: int) -> None:
    """Set storage period length in short prognosis problem in minutes. Currently only supports whole hours."""
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=1, upper_bound=None)
    self._check_hourly(x)
    self._short_storage_minutes = x
set_target_ev_days(x: int) -> None

Set preferred value for length in days of end value problems.

Will choose a close valid value.

Source code in framjules/JulESTimeResolution.py
def set_target_ev_days(self, x: int) -> None:
    """Set prefered value for length in days of end value problems.

    Will choose a close valid value.
    """
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=self._clearing_days + self._short_days, upper_bound=None)
    self._target_ev_days = x
set_target_long_storage_days(x: int) -> None

Set preferred value for long_storage_days.

Will choose a close valid value.

Source code in framjules/JulESTimeResolution.py
def set_target_long_storage_days(self, x: int) -> None:
    """Set prefered value for long_storage_days.

    Will choose a close valid value.
    """
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=self._clearing_days + self._short_days, upper_bound=None)
    self._target_long_storage_days = x
set_target_lookahead_days(x: int) -> None

Set target length of prognosis problems in days.

Will set med_days and long_days and make sure their sum is at least this length.

Will set short_days if target_lookahead_days < short_days.

Source code in framjules/JulESTimeResolution.py
def set_target_lookahead_days(self, x: int) -> None:
    """Set target length of prognosis problems in days.

    Will set med_days and long_days and make sure their sum is at least this length.

    Will set short_days if target_lookahead_days < short_days.
    """
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=1, upper_bound=None)

    self._target_lookahead_days = x

    if x < self._short_days:
        self.set_short_days(x)

    med_days, long_days, med_storage_days, long_storage_days = self._get_med_long_days_and_storage_days(
        self._target_lookahead_days,
        self._clearing_days,
        self._short_days,
    )

    self._med_days = med_days
    self._long_days = long_days
    self._med_storage_days = med_storage_days
    self._long_storage_days = long_storage_days
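
A minimal sketch of the short_days clamping rule applied above, in isolation (plain Python, values hypothetical); med_days and long_days are then re-derived internally so the total lookahead covers the target:

short_days = 5
target_lookahead_days = 3  # smaller than short_days ...
if target_lookahead_days < short_days:
    short_days = target_lookahead_days  # ... so short_days is reduced to 3
assert short_days == 3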
set_target_med_days(x: int) -> None

Set preferred value for horizon length in days in medium prognosis problem.

Will choose a close valid value.

Source code in framjules/JulESTimeResolution.py
def set_target_med_days(self, x: int) -> None:
    """Set prefered value for horizon length in days in medium prognosis problem.

    Will choose a close valid value.
    """
    self._check_type(x, int)
    self._check_int(value=x, lower_bound=self._clearing_days + self._short_days, upper_bound=None)
    self._target_med_days = x

loaders

time_vector_loaders

DemandJulESH5TimeVectorLoader

Bases: JulESH5TimeVectorLoader

Workaround to get demand results and at the same time avoid name conflicts.

Source code in framjules/loaders/time_vector_loaders.py
class DemandJulESH5TimeVectorLoader(JulESH5TimeVectorLoader):
    """Workaround to get demand results and at the same time avoid name conflicts."""

    _SUPPORTED_SUFFIXES: ClassVar[list] = [".h5"]
    _SEARCH_FIELDS: ClassVar[list] = [
        ("demandnames", "priceindex", "demandvalues", True),
    ]
    _DEFAULT_INDEX = "priceindex"
JulESH5TimeVectorLoader

Bases: FileLoader, TimeVectorLoader

Loader for JulES H5 files containing time vectors.

Source code in framjules/loaders/time_vector_loaders.py
class JulESH5TimeVectorLoader(FileLoader, TimeVectorLoader):
    """Loader for JulES H5 files containing time vectors."""

    _SUPPORTED_SUFFIXES: ClassVar[list] = [".h5"]
    # find the time vector id in one of the name fields, then use the corresponding fields to read the data
    _SEARCH_FIELDS: ClassVar[list] = [
        ("areanames", "priceindex", "pricematrix", True),
        ("batnames", "batindex", "batmatrix", True),
        ("resnames", "resindex", "resmatrix_water", True),
        ("othernames_Vars_Power", "priceindex", "othervalues_Vars_Power", True),
        ("othernames_Vars_Hydro", "resindex", "othervalues_Vars_Hydro", True),
        (
            "storagenames",
            "stateindex",
            "storagevalues_main",
            True,
        ),
    ]
    _DEFAULT_INDEX = "priceindex"

    def __init__(
        self,
        source: Path | str,
        units: dict[str, str],
        relative_loc: Path | str | None = None,
        is_whole_years: bool = False,
    ) -> None:
        """Initialize the NVEH5TimeVectorLoader."""
        super().__init__(source, relative_loc)
        self._data: dict[str, NDArray] = {}
        self._index: dict[str, TimeIndex] = {}
        self._units: dict[str, str] = units
        self._is_whole_years = is_whole_years

        self._id_fields_map: dict[str, list[tuple]] = {}

    def clear_cache(self) -> None:
        """Clear cached data."""
        self._data = {}
        self._index = {}
        self._id_fields_map = {}

    def get_reference_period(self, vector_id: str) -> None:
        """Return None."""
        return

    def is_max_level(self, vector_id: str) -> None:
        """Return None."""
        return

    def is_zero_one_profile(self, vector_id: str) -> bool:
        """Return True."""
        return True

    def get_values(self, vector_id: str) -> NDArray:
        """Find the values for a given vector id.

        Args:
            vector_id: (str)

        Returns:
            NDArray: Values for the vector id.

        """
        self._id_exsists(vector_id)
        self._check_multiple_fields(vector_id)
        if vector_id not in self._data:
            id_field, values_field = self._get_id_values_field(vector_id)
            with h5py.File(self.get_source(), "r") as f:
                ids = np.array([item.decode() for item in f[id_field]])
                values_matrix = np.array(f[values_field])
                new_data = {id_: values_matrix[i, :] for i, id_ in enumerate(ids)}
                self._data.update(new_data)
        # index may not use all values
        index = self.get_index(vector_id)
        vector = self._data[vector_id]
        n = index.get_num_periods()
        return vector[:n]

    def get_index(self, vector_id: str) -> TimeIndex:
        """Find the time index for a given vector id.

        Args:
            vector_id: (str)

        Returns:
            TimeIndex

        """
        self._id_exsists(vector_id)  # calls get_ids which calls _get_ids which sets _id_fields_map
        self._check_multiple_fields(vector_id)  # therefore we can use this afterwards
        with h5py.File(self.get_source(), "r") as f:
            index_field = self._id_fields_map.get(vector_id)[0][1]
            fmt = "%Y-%m-%dT%H:%M:%S"
            if self._index is None or (index_field not in self._index):
                t0 = datetime.strptime(f[index_field][0].decode(), fmt)
                t1 = datetime.strptime(f[index_field][1].decode(), fmt)
                index = FixedFrequencyTimeIndex(
                    start_time=t0,
                    period_duration=t1 - t0,
                    num_periods=len(f[index_field]),
                    is_52_week_years=False,
                    extrapolate_first_point=False,
                    extrapolate_last_point=False,
                )
                if not index.is_whole_years() and self._is_whole_years:
                    # start time for each period
                    datetime_list = [datetime.strptime(x.decode(), fmt) for x in f[index_field]]
                    period_duration = datetime_list[1] - datetime_list[0]

                    # add end index since JulES index represents periods
                    datetime_list.append(datetime_list[-1] + period_duration)
                    num_periods = len(datetime_list)

                    # find last index before new iso year
                    last_in_year_ix = num_periods - 1
                    while last_in_year_ix >= 0:
                        last_in_year_ix -= 1
                        this_period = datetime_list[last_in_year_ix]
                        next_period = this_period + period_duration
                        this_year = this_period.isocalendar().year
                        next_year = next_period.isocalendar().year
                        if next_year > this_year:
                            break

                    last_in_year = datetime_list[last_in_year_ix]

                    first_next_year = last_in_year.fromisocalendar(last_in_year.isocalendar().year + 1, 1, 1)

                    if last_in_year + period_duration == first_next_year:
                        index = FixedFrequencyTimeIndex(
                            start_time=datetime_list[0],
                            period_duration=period_duration,
                            num_periods=last_in_year_ix + 1,
                            is_52_week_years=False,
                            extrapolate_first_point=False,
                            extrapolate_last_point=False,
                        )
                    elif last_in_year + period_duration > first_next_year:
                        # TODO: test (only reachable without profiles as ProfileTimeIndex enforces whole years)
                        datetime_list[last_in_year_ix + 1] = first_next_year
                        del datetime_list[last_in_year_ix + 2 :]  # (slice delete does not error when out-of-bounds)
                        index = ListTimeIndex(
                            datetime_list=datetime_list,
                            is_52_week_years=False,
                            extrapolate_first_point=False,
                            extrapolate_last_point=False,
                        )
                    else:
                        n = last_in_year_ix
                        message = (
                            f"Unexpected last_in_year + period_duration < first_next_year.\n"
                            f"vector_id = {vector_id}\n"
                            f"last_in_year = {last_in_year}\n"
                            f"period_duration = {period_duration}\n"
                            f"first_next_year = {first_next_year}\n"
                            f"datetime_list around last_in_year_ix = {datetime_list[n - 10 : n + 10]}"
                        )
                        raise RuntimeError(message)

                self._index[index_field] = index
        return self._index[index_field]

    def get_unit(self, vector_id: str) -> str:
        """Get the unit of the time vector."""
        return self._units[vector_id]

    def get_metadata(self) -> str:
        """Get metadata from the file."""
        return ""

    def _get_id_values_field(self, vector_id: str) -> tuple[str, str]:
        search_fields = self._id_fields_map.get(vector_id)
        return search_fields[0][0], search_fields[0][2]

    def _get_ids(self) -> list[str]:
        if not self._id_fields_map:
            self._create_id_fields_map()
        return list(self._id_fields_map.keys())

    def _create_id_fields_map(self) -> None:
        if not self._id_fields_map:
            self._id_fields_map = {}
            with h5py.File(self.get_source(), "r") as f:
                for search_name in self._SEARCH_FIELDS:
                    if search_name[0] in f:
                        new_ids = [item.decode() for item in f[search_name[0]]]
                        for vector_id in new_ids:
                            if vector_id not in self._id_fields_map:
                                self._id_fields_map[vector_id] = [search_name]
                            else:
                                self._id_fields_map[vector_id].append(search_name)

    def _check_multiple_fields(self, vector_id: str) -> None:
        self._create_id_fields_map()
        # check if the vector id is found in multiple fields.
        if len(self._id_fields_map[vector_id]) > 1:
            msg = (
                f"Vector ID {vector_id} found in multiple fields: {self._id_fields_map[vector_id]}. "
                "Could not determine which field to use."
            )
            raise NotImplementedError(msg)

    def get_fingerprint(self) -> Fingerprint | None:
        """Get the fingerprint of the JulESH5TimeVectorLoader."""
        return None

    def __eq__(self, other: object) -> bool:
        """Check if self and other are equal."""
        if not isinstance(other, type(self)):
            return False
        return self.get_source() == other.get_source() and self._SEARCH_FIELDS == other._SEARCH_FIELDS

    def __hash__(self) -> int:
        """Return hash of NVEH5TimeVectorLoader object."""
        return hash(
            (
                self.get_source(),
                frozenset(self._SEARCH_FIELDS),
            ),
        )
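
A minimal sketch of the H5 layout this loader searches, using the "areanames"/"priceindex"/"pricematrix" fields from _SEARCH_FIELDS above (the file name, area id and unit are hypothetical, and the framework base classes are assumed to be available):

import h5py
import numpy as np
from framjules.loaders.time_vector_loaders import JulESH5TimeVectorLoader

# Write a tiny file in the expected layout: ids, period start times, one row of values per id.
with h5py.File("output.h5", "w") as f:
    f["areanames"] = np.array([b"NO1"])
    f["priceindex"] = np.array([b"2025-01-06T00:00:00", b"2025-01-06T01:00:00"])
    f["pricematrix"] = np.array([[42.0, 43.0]])

loader = JulESH5TimeVectorLoader("output.h5", units={"NO1": "EUR/MWh"})
loader.get_values("NO1")  # array([42., 43.]), truncated to the index length
loader.get_index("NO1")   # FixedFrequencyTimeIndex with two hourly periods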
__eq__(other: object) -> bool

Check if self and other are equal.

Source code in framjules/loaders/time_vector_loaders.py
def __eq__(self, other: object) -> bool:
    """Check if self and other are equal."""
    if not isinstance(other, type(self)):
        return False
    return self.get_source() == other.get_source() and self._SEARCH_FIELDS == other._SEARCH_FIELDS
__hash__() -> int

Return hash of JulESH5TimeVectorLoader object.

Source code in framjules/loaders/time_vector_loaders.py
def __hash__(self) -> int:
    """Return hash of NVEH5TimeVectorLoader object."""
    return hash(
        (
            self.get_source(),
            frozenset(self._SEARCH_FIELDS),
        ),
    )
__init__(source: Path | str, units: dict[str, str], relative_loc: Path | str | None = None, is_whole_years: bool = False) -> None

Initialize the JulESH5TimeVectorLoader.

Source code in framjules/loaders/time_vector_loaders.py
def __init__(
    self,
    source: Path | str,
    units: dict[str, str],
    relative_loc: Path | str | None = None,
    is_whole_years: bool = False,
) -> None:
    """Initialize the NVEH5TimeVectorLoader."""
    super().__init__(source, relative_loc)
    self._data: dict[str, NDArray] = {}
    self._index: dict[str, TimeIndex] = {}
    self._units: dict[str, str] = units
    self._is_whole_years = is_whole_years

    self._id_fields_map: dict[str, list[tuple]] = {}
clear_cache() -> None

Clear cached data.

Source code in framjules/loaders/time_vector_loaders.py
def clear_cache(self) -> None:
    """Clear cached data."""
    self._data = {}
    self._index = {}
    self._id_fields_map = {}
get_fingerprint() -> Fingerprint | None

Get the fingerprint of the JulESH5TimeVectorLoader.

Source code in framjules/loaders/time_vector_loaders.py
def get_fingerprint(self) -> Fingerprint | None:
    """Get the fingerprint of the JulESH5TimeVectorLoader."""
    return None
get_index(vector_id: str) -> TimeIndex

Find the time index for a given vector id.

Parameters:

    vector_id (str): required

Returns:

    TimeIndex

Source code in framjules/loaders/time_vector_loaders.py
def get_index(self, vector_id: str) -> TimeIndex:
    """Find the time index for a given vector id.

    Args:
        vector_id: (str)

    Returns:
        TimeIndex

    """
    self._id_exsists(vector_id)  # calls get_ids which calls _get_ids which sets _id_fields_map
    self._check_multiple_fields(vector_id)  # therefore we can use this afterwards
    with h5py.File(self.get_source(), "r") as f:
        index_field = self._id_fields_map.get(vector_id)[0][1]
        fmt = "%Y-%m-%dT%H:%M:%S"
        if self._index is None or (index_field not in self._index):
            t0 = datetime.strptime(f[index_field][0].decode(), fmt)
            t1 = datetime.strptime(f[index_field][1].decode(), fmt)
            index = FixedFrequencyTimeIndex(
                start_time=t0,
                period_duration=t1 - t0,
                num_periods=len(f[index_field]),
                is_52_week_years=False,
                extrapolate_first_point=False,
                extrapolate_last_point=False,
            )
            if not index.is_whole_years() and self._is_whole_years:
                # start time for each period
                datetime_list = [datetime.strptime(x.decode(), fmt) for x in f[index_field]]
                period_duration = datetime_list[1] - datetime_list[0]

                # add end index since JulES index represents periods
                datetime_list.append(datetime_list[-1] + period_duration)
                num_periods = len(datetime_list)

                # find last index before new iso year
                last_in_year_ix = num_periods - 1
                while last_in_year_ix >= 0:
                    last_in_year_ix -= 1
                    this_period = datetime_list[last_in_year_ix]
                    next_period = this_period + period_duration
                    this_year = this_period.isocalendar().year
                    next_year = next_period.isocalendar().year
                    if next_year > this_year:
                        break

                last_in_year = datetime_list[last_in_year_ix]

                first_next_year = last_in_year.fromisocalendar(last_in_year.isocalendar().year + 1, 1, 1)

                if last_in_year + period_duration == first_next_year:
                    index = FixedFrequencyTimeIndex(
                        start_time=datetime_list[0],
                        period_duration=period_duration,
                        num_periods=last_in_year_ix + 1,
                        is_52_week_years=False,
                        extrapolate_first_point=False,
                        extrapolate_last_point=False,
                    )
                elif last_in_year + period_duration > first_next_year:
                    # TODO: test (only reachable without profiles as ProfileTimeIndex enforces whole years)
                    datetime_list[last_in_year_ix + 1] = first_next_year
                    del datetime_list[last_in_year_ix + 2 :]  # (slice delete does not error when out-of-bounds)
                    index = ListTimeIndex(
                        datetime_list=datetime_list,
                        is_52_week_years=False,
                        extrapolate_first_point=False,
                        extrapolate_last_point=False,
                    )
                else:
                    n = last_in_year_ix
                    message = (
                        f"Unexpected last_in_year + period_duration < first_next_year.\n"
                        f"vector_id = {vector_id}\n"
                        f"last_in_year = {last_in_year}\n"
                        f"period_duration = {period_duration}\n"
                        f"first_next_year = {first_next_year}\n"
                        f"datetime_list around last_in_year_ix = {datetime_list[n - 10 : n + 10]}"
                    )
                    raise RuntimeError(message)

            self._index[index_field] = index
    return self._index[index_field]
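
The whole-years trimming above relies on ISO-calendar arithmetic from the standard library. A small standalone illustration of the boundary test used in the loop (dates chosen around the 2025/2026 ISO-year change):

from datetime import datetime, timedelta

last_in_year = datetime(2025, 12, 28)                   # ISO 2025-W52-7 (Sunday)
period_duration = timedelta(days=1)
first_next_year = datetime.fromisocalendar(2026, 1, 1)  # Monday 2025-12-29

assert last_in_year.isocalendar().year == 2025
assert (last_in_year + period_duration).isocalendar().year == 2026
# The periods line up exactly with the ISO-year boundary,
# so the FixedFrequencyTimeIndex branch is taken.
assert last_in_year + period_duration == first_next_year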
get_metadata() -> str

Get metadata from the file.

Source code in framjules/loaders/time_vector_loaders.py
def get_metadata(self) -> str:
    """Get metadata from the file."""
    return ""
get_reference_period(vector_id: str) -> None

Return None.

Source code in framjules/loaders/time_vector_loaders.py
def get_reference_period(self, vector_id: str) -> None:
    """Return None."""
    return
get_unit(vector_id: str) -> str

Get the unit of the time vector.

Source code in framjules/loaders/time_vector_loaders.py
def get_unit(self, vector_id: str) -> str:
    """Get the unit of the time vector."""
    return self._units[vector_id]
get_values(vector_id: str) -> NDArray

Find the values for a given vector id.

Parameters:

    vector_id (str): required

Returns:

    NDArray: Values for the vector id.

Source code in framjules/loaders/time_vector_loaders.py
def get_values(self, vector_id: str) -> NDArray:
    """Find the values for a given vector id.

    Args:
        vector_id: (str)

    Returns:
        NDArray: Values for the vector id.

    """
    self._id_exsists(vector_id)
    self._check_multiple_fields(vector_id)
    if vector_id not in self._data:
        id_field, values_field = self._get_id_values_field(vector_id)
        with h5py.File(self.get_source(), "r") as f:
            ids = np.array([item.decode() for item in f[id_field]])
            values_matrix = np.array(f[values_field])
            new_data = {id_: values_matrix[i, :] for i, id_ in enumerate(ids)}
            self._data.update(new_data)
    # index may not use all values
    index = self.get_index(vector_id)
    vector = self._data[vector_id]
    n = index.get_num_periods()
    return vector[:n]
is_max_level(vector_id: str) -> None

Return None.

Source code in framjules/loaders/time_vector_loaders.py
def is_max_level(self, vector_id: str) -> None:
    """Return None."""
    return
is_zero_one_profile(vector_id: str) -> bool

Return True.

Source code in framjules/loaders/time_vector_loaders.py
def is_zero_one_profile(self, vector_id: str) -> bool:
    """Return True."""
    return True
SupplyJulESH5TimeVectorLoader

Bases: JulESH5TimeVectorLoader

Workaround to get supply results and at the same time avoid name conflicts.

Source code in framjules/loaders/time_vector_loaders.py
class SupplyJulESH5TimeVectorLoader(JulESH5TimeVectorLoader):
    """Workaround to get supply results and at the same time avoid name conflicts."""

    _SUPPORTED_SUFFIXES: ClassVar[list] = [".h5"]
    _SEARCH_FIELDS: ClassVar[list] = [
        ("supplynames", "priceindex", "supplyvalues", True),
    ]
    _DEFAULT_INDEX = "priceindex"

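A short sketch of why these two subclasses exist: each one only swaps _SEARCH_FIELDS, so the same vector id resolves to a different dataset per loader ("output.h5", the id "NO1" and the unit are hypothetical, and the file is assumed to be a JulES result file containing both field sets):

from framjules.loaders.time_vector_loaders import (
    DemandJulESH5TimeVectorLoader,
    SupplyJulESH5TimeVectorLoader,
)

demand = DemandJulESH5TimeVectorLoader("output.h5", units={"NO1": "MW"})
supply = SupplyJulESH5TimeVectorLoader("output.h5", units={"NO1": "MW"})

demand.get_values("NO1")  # read from "demandvalues"
supply.get_values("NO1")  # read from "supplyvalues"
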
solve_handler

JulESAggregator

JulESAggregator

Bases: Base

Source code in framjules/solve_handler/JulESAggregator.py
class JulESAggregator(Base):
    def __init__(
        self,
        clearing: Model,
        short: list[Aggregator],
        medium: list[Aggregator],
        long: list[Aggregator],
    ) -> None:
        """Class for defining and calculation aggregated Model instances based on a clearing Model.

        Creates three aggregated Model instances for short-, medium- and long-term simulation in JulES.

        Note:
            - Short term Model is aggregated from a Clearing Model.
            - Medium term Model is aggregated from the Short term Model.
            - Long term Model is aggregated from the Medium term Model.

        Args:
            clearing (Model): The clearing Model to aggregate from.
            short (list[Aggregator]): List of aggregations to create the short term Model from clearing.
            medium (list[Aggregator]): List of aggregations to create the medium term Model from short.
            long (list[Aggregator]): List of aggregations to create the long term Model from medium.

        """
        self._clearing = clearing
        self._short = short
        self._medium = medium
        self._long = long

        self._short_model: Model | None = None
        self._medium_model: Model | None = None
        self._long_model: Model | None = None

    def get_short_term_model(self) -> Model:
        """Apply defined aggregations for short term Model."""
        if self._short_model is None:
            self._short_model = self._aggregate(self._clearing, self._short)
        return self._short_model

    def get_medium_term_model(self) -> Model:
        """Apply defined aggregations for medium term Model."""
        if self._medium_model is None:
            self._medium_model = self._aggregate(self.get_short_term_model(), self._medium)
        return self._medium_model

    def get_long_term_model(self) -> Model:
        """Apply defined aggregations for long term Model."""
        if self._long_model is None:
            self._long_model = self._aggregate(self.get_medium_term_model(), self._long)
        return self._long_model

    def get_short_term_aggregation_map(self) -> dict[str, set[str] | None]:
        """Get the aggregation map of Components from clearing to short term Model."""
        return self._create_aggregation_map(self._clearing, self._short)

    def get_medium_term_aggregation_map(self) -> dict[str, set[str] | None]:
        """Get the aggregation map of Components from clearing to medium term Model."""
        return self._create_aggregation_map(self._clearing, self._short + self._medium)

    def get_long_term_aggregation_map(self) -> dict[str, set[str] | None]:
        """Get the aggregation map of Components from clearing to long term Model."""
        return self._create_aggregation_map(self._clearing, self._short + self._medium + self._long)

    def get_short_term_graph_map(
        self,
        graph_clearing: dict[str, Component],
        graph_short: dict[str, Component],
    ) -> dict[str, set[str] | None]:
        """Get aggregation map for version of short term Model with graph of Flows and Nodes."""
        return self._get_graph_aggregation_map(
            original_agg_map=self.get_short_term_aggregation_map(),
            clearing=self._clearing,
            graph_clearing=graph_clearing,
            aggregated=self.get_short_term_model(),
            graph_aggregated=graph_short,
        )

    def get_medium_term_graph_map(
        self,
        graph_clearing: dict[str, Component],
        graph_medium: dict[str, Component],
    ) -> dict[str, set[str] | None]:
        """Get aggregation map for version of medium term Model with graph of Flows and Nodes."""
        return self._get_graph_aggregation_map(
            original_agg_map=self.get_medium_term_aggregation_map(),
            clearing=self._clearing,
            graph_clearing=graph_clearing,
            aggregated=self.get_medium_term_model(),
            graph_aggregated=graph_medium,
        )

    def get_long_term_graph_map(
        self,
        graph_clearing: dict[str, Component],
        graph_long: dict[str, Component],
    ) -> dict[str, set[str] | None]:
        """Get aggregation map for version of long term Model with graph of Flows and Nodes."""
        return self._get_graph_aggregation_map(
            original_agg_map=self.get_long_term_aggregation_map(),
            clearing=self._clearing,
            graph_clearing=graph_clearing,
            aggregated=self.get_long_term_model(),
            graph_aggregated=graph_long,
        )

    def assert_equal_storages(
        self,
        simpler_short: dict[str, Component],
        simpler_medium: dict[str, Component],
        simpler_long: dict[str, Component],
    ) -> None:
        """Check that all Nodes with Storages are preserved between short, medium and long term Models.

        Args:
            simpler_short (dict[str, Component]): Short term Model Components.
            simpler_medium (dict[str, Component]): Medium term Model Components.
            simpler_long (dict[str, Component]): Long term Model Components.

        Raises:
            ValueError: If the Models have differing Storages.

        """
        short_storages = self._get_storages(simpler_short)
        medium_storages = self._get_storages(simpler_medium)
        long_storages = self._get_storages(simpler_long)

        if not (short_storages == medium_storages == long_storages):
            message = "Storages are not equal between short, medium and long term Models."
            unique_short = short_storages - (medium_storages | long_storages)
            unique_medium = medium_storages - (short_storages | long_storages)
            unique_long = long_storages - (short_storages | medium_storages)
            if unique_short:
                message += f"\n - Unique Nodes with Storages in Short Model: {unique_short}"
            if unique_medium:
                message += f"\n - Unique Nodes with Storages in Medium Model: {unique_medium}"
            if unique_long:
                message += f"\n - Unique Nodes with Storages in Long Model: {unique_long}"
            raise ValueError(message)

    def _aggregate(self, model: Model, aggs: list[Aggregator]) -> Model:
        if aggs:
            # works because aggregators should not modify the original components
            # except if disaggregate is called, but we shall only use aggregate
            agg_model = Model()
            agg_model.get_data().update(model.get_data())
        else:
            agg_model = model
        for agg in aggs:
            agg.aggregate(agg_model)
        return agg_model

    def _create_aggregation_map(self, clearing: Model, aggs: list[Aggregator]) -> dict[str, set[str]]:
        """Merge aggregation maps of a list of aggregators from clearing to final."""
        clearing_ids = [name for name, ob in clearing.get_data().items() if isinstance(ob, Component)]
        full_agg_mapping = {k: {k} for k in clearing_ids}

        for agg in aggs:
            agg_map = agg.get_aggregation_map()
            for detailed_id, aggregated_ids in full_agg_mapping.items():
                if not aggregated_ids:  # Component has been deleted by an aggregation.
                    continue

                new_agg_ids = set()
                for agg_id in aggregated_ids:
                    if agg_id not in agg_map:
                        new_agg_ids.add(agg_id)  # left as is
                        continue
                    if not agg_map[agg_id]:
                        # deleted. if all agg_ids are marked deleted, so is the detailed one.
                        continue
                    new_agg_ids |= agg_map[agg_id]

                full_agg_mapping[detailed_id] = new_agg_ids  # empty set signifies deleted component

        return full_agg_mapping

    def _get_graph_aggregation_map(
        self,
        original_agg_map: dict[str, set[str]],
        clearing: Model | dict[str, Component],
        graph_clearing: dict[str, Component],
        aggregated: Model | dict[str, Component],
        graph_aggregated: dict[str, Component],
    ) -> dict[str, set[str]]:
        """Create aggregation map with simpler Component IDs based on an original mapping from clearing to aggregated.

        Use get_top_parent of components to find IDs in original_agg_map then change to the Flow/Node ID.

        Args:
            original_agg_map (dict[str, set[str]]): Mapping between Components of clearing and aggregated Models.
            clearing (dict[str, Component]): Clearing Model with top parents.
            graph_clearing (dict[str, Component]): Clearing Model version with Flows and Nodes. Derived from
                                                     clearing.
            aggregated (dict[str, Component]): Aggregated Model with top parents. Aggregated from clearing.
            graph_aggregated (dict[str, Component]): Aggregated Model version with Flows and Nodes. Derived from
                                                       aggregated.

        Returns:
            dict[str, set[str]]: Mapping between components of simpler clearing and simpler aggregated Models.

        """
        if isinstance(clearing, Model):
            clearing = {k: v for k, v in clearing.get_data().items() if isinstance(v, Component)}
        if isinstance(aggregated, Model):
            aggregated = {k: v for k, v in aggregated.get_data().items() if isinstance(v, Component)}

        self._check_agg_map_compatibility(clearing, aggregated, original_agg_map)

        graph_clearing_map = self._get_top_parent_to_simple(original=clearing, simpler=graph_clearing)
        graph_aggregated_map = self._get_top_parent_to_simple(original=aggregated, simpler=graph_aggregated)
        simple_agg_map = {}

        for clearing_id, agg_ids in original_agg_map.items():
            # the two if statements allow mapping only a subset of the simpler Components.
            if clearing_id in graph_clearing_map:
                if not agg_ids:
                    continue  # choose not to add deleted components. May change this later.
                simple_agg_ids = set()
                for agg_id in agg_ids:
                    if agg_id in graph_aggregated_map:  # Again to allow subset to be mapped
                        simple_agg_ids |= graph_aggregated_map[agg_id]  # add the set of simple component ids
                if simple_agg_ids:
                    for graph_clearing_id in graph_clearing_map[clearing_id]:
                        simple_agg_map[graph_clearing_id] = simple_agg_ids

        self._check_agg_map_validity(graph_clearing, graph_aggregated, simple_agg_map)
        return simple_agg_map

    def _check_agg_map_compatibility(
        self,
        clearing: Model | dict[str, Component],
        aggregated: Model | dict[str, Component],
        original_agg_map: dict[str, set[str]],
    ) -> None:
        if set(clearing.keys()) != set(original_agg_map.keys()):
            missing_in_clearing = set(original_agg_map.keys()).difference(clearing.keys())
            extra_in_clearing = set(clearing.keys()).difference(original_agg_map.keys())
            message = (
                "clearing is incompatible with the aggregation mapping between clearing and aggregated Models.\n"
                f"Missing in clearing: {missing_in_clearing}\n"
                f"Extra in clearing: {extra_in_clearing}"
            )
            raise KeyError(message)

        original_agg_map_values = set().union(*(v for v in original_agg_map.values() if v))
        if set(aggregated.keys()) != original_agg_map_values:
            missing_in_aggregated = original_agg_map_values.difference(aggregated.keys())
            extra_in_aggregated = set(aggregated.keys()).difference(original_agg_map_values)
            message = (
                "aggregated is incompatible with the aggregation mapping between clearing and aggregated Models.\n"
                f"Missing in aggregated: {missing_in_aggregated}\n"
                f"Extra in aggregated: {extra_in_aggregated}"
            )
            raise KeyError(message)

    def _check_agg_map_validity(
        self,
        original_components: dict[str, Component],
        aggregated_components: dict[str, Component],
        agg_map: dict[str, set[str] | None],
    ) -> None:
        """Check Flow and Node rules for all mappings in an aggregation map."""
        errors = set()
        for original_id, aggregated_ids in agg_map.items():
            component = original_components[original_id]
            if isinstance(component, Node):
                self._check_node_rules(original_id, component, aggregated_ids, aggregated_components, errors)

            if isinstance(component, Flow) and component.get_startupcost() is not None:
                self._check_flow_rules(original_id, aggregated_ids, aggregated_components, errors)

            if not isinstance(component, (Flow, Node)):
                message = (
                    f"Invalig Model of simpler Components. Must consist of only Flows and Nodes. Found: {component}"
                )
                raise ValueError(message)

        self._report_errors(errors)

    def _check_node_rules(
        self,
        original_id: str,
        node: Node,
        aggregated_ids: set[str] | None,
        aggregated_components: dict[str, Component],
        errors: set[str],
    ) -> None:
        """Check rules for Nodes for a Component ID in an aggregation map.

        A Node on the disaggregated side (keys) must map to exactly one other Node. More keys are allowed to map to the
        same aggregated Node.

        """
        if node.get_storage() is None:
            # Check rules here?
            return

        if not aggregated_ids:
            e = f"Node with Storage {original_id} was deleted during aggregations. This is not supported in JulES."
            errors.add(e)
            return
        aggregated_storages = set()
        for agg_id in aggregated_ids:
            agg_component = aggregated_components[agg_id]
            if isinstance(agg_component, Node) and agg_component.get_storage() is not None:
                aggregated_storages.add(agg_id)
        if len(aggregated_storages) != 1:
            errors.add(
                f"Node with Storage {original_id} must be connected to exactly one Node with Storage in the "
                f"aggregation map in JulES. Currently connected to: {aggregated_storages}.",
            )

    def _check_flow_rules(
        self,
        original_id: str,
        aggregated_ids: set[str] | None,
        aggregated_components: dict[str, Component],
        errors: set[str],
    ) -> None:
        """Check rules for Flows for a Component ID in an aggregation map.

        A Flow on the disaggregated side (keys) must map to exactly one other Flow. More keys are allowed to map to the
        same aggregated Flow.

        """
        if not aggregated_ids:
            e = f"Flow with StartUpCost {original_id} was deleted during aggregations. This is not supported in JulES."
            errors.add(e)
            return
        aggregated_flows = set()
        for agg_id in aggregated_ids:
            agg_component = aggregated_components[agg_id]
            if isinstance(agg_component, Flow) and agg_component.get_startupcost() is not None:
                aggregated_flows.add(agg_id)
        if len(aggregated_flows) != 1:
            errors.add(
                f"Flow with StartUpCost {original_id} must be connected to exactly one Flow with StartUpCost in the "
                f"aggregation map in JulES. Currently connected to: {aggregated_flows}.",
            )

    @staticmethod
    def _get_storages(simpler: dict[str, Component]) -> set[str]:
        nodes_with_storages = set()
        for n, c in simpler.items():
            if isinstance(c, Node) and c.get_storage() is not None:
                nodes_with_storages.add(n)
        return nodes_with_storages

    @staticmethod
    def _get_top_parent_to_simple(original: dict[str, Component], simpler: dict[str, Component]) -> dict[str, set[str]]:
        """Map simpler components to their top parent."""
        inv_original = {c: n for n, c in original.items()}
        simpler_map: dict[str, set[str]] = {}

        for simple_id, component in simpler.items():
            top_parent = component.get_top_parent()
            if top_parent is None:
                message = (
                    f"Component {component} with ID {simple_id} has no parents. This means it has not been "
                    "derived from original."
                )
                raise ValueError(message)
            try:
                top_parent_id = inv_original[top_parent]
            except KeyError as e:
                message = (
                    f"Component {top_parent} does not exist in original Model. This means simpler has not been "
                    "derived from original."
                )
                raise KeyError(message) from e
            if top_parent_id in simpler_map:
                # set already exists, so we add the simple component id to it
                simpler_map[top_parent_id].add(simple_id)
            else:
                simpler_map[top_parent_id] = {simple_id}

        return simpler_map
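
A self-contained illustration of how _create_aggregation_map composes per-stage maps (all ids hypothetical; this mirrors the merging loop above, where an empty set signifies a deleted component and ids absent from a stage map pass through unchanged):

stage1 = {"A": {"AB"}, "B": {"AB"}, "C": set()}  # A and B merged into AB, C deleted
stage2 = {"AB": {"ABX"}}                         # the merged component renamed

merged = {}
for detailed_id, agg_ids in stage1.items():
    new_ids = set()
    for agg_id in agg_ids:
        new_ids |= stage2.get(agg_id, {agg_id})  # pass through if not re-aggregated
    merged[detailed_id] = new_ids

assert merged == {"A": {"ABX"}, "B": {"ABX"}, "C": set()}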
__init__(clearing: Model, short: list[Aggregator], medium: list[Aggregator], long: list[Aggregator]) -> None

Class for defining and calculating aggregated Model instances based on a clearing Model.

Creates three aggregated Model instances for short-, medium- and long-term simulation in JulES.

Note
  • Short term Model is aggregated from a Clearing Model.
  • Medium term Model is aggregated from the Short term Model.
  • Long term Model is aggregated from the Medium term Model.

Parameters:

    clearing (Model): The clearing Model to aggregate from. Required.
    short (list[Aggregator]): List of aggregations to create the short term Model from clearing. Required.
    medium (list[Aggregator]): List of aggregations to create the medium term Model from short. Required.
    long (list[Aggregator]): List of aggregations to create the long term Model from medium. Required.
Source code in framjules/solve_handler/JulESAggregator.py
def __init__(
    self,
    clearing: Model,
    short: list[Aggregator],
    medium: list[Aggregator],
    long: list[Aggregator],
) -> None:
    """Class for defining and calculation aggregated Model instances based on a clearing Model.

    Creates three aggregated Model instances for short-, medium- and long-term simulation in JulES.

    Note:
        - Short term Model is aggregated from a Clearing Model.
        - Medium term Model is aggregated from the Short term Model.
        - Long term Model is aggregated from the Medium term Model.

    Args:
        clearing (Model): The clearing Model to aggregate from.
        short (list[Aggregator]): List of aggregations to create the short term Model from clearing.
        medium (list[Aggregator]): List of aggregations to create the medium term Model from short.
        long (list[Aggregator]): List of aggregations to create the long term Model from medium.

    """
    self._clearing = clearing
    self._short = short
    self._medium = medium
    self._long = long

    self._short_model: Model | None = None
    self._medium_model: Model | None = None
    self._long_model: Model | None = None
assert_equal_storages(simpler_short: dict[str, Component], simpler_medium: dict[str, Component], simpler_long: dict[str, Component]) -> None

Check that all Nodes with Storages are preserved between short, medium and long term Models.

Parameters:

    simpler_short (dict[str, Component]): Short term Model Components. Required.
    simpler_medium (dict[str, Component]): Medium term Model Components. Required.
    simpler_long (dict[str, Component]): Long term Model Components. Required.

Raises:

    ValueError: If the Models have differing Storages.

Source code in framjules/solve_handler/JulESAggregator.py
def assert_equal_storages(
    self,
    simpler_short: dict[str, Component],
    simpler_medium: dict[str, Component],
    simpler_long: dict[str, Component],
) -> None:
    """Check that all Nodes with Storages are preserved between short, medium and long term Models.

    Args:
        simpler_short (dict[str, Component]): Short term Model Components.
        simpler_medium (dict[str, Component]): Medium term Model Components.
        simpler_long (dict[str, Component]): Long term Model Components.

    Raises:
        ValueError: If the Models have differing Storages.

    """
    short_storages = self._get_storages(simpler_short)
    medium_storages = self._get_storages(simpler_medium)
    long_storages = self._get_storages(simpler_long)

    if not (short_storages == medium_storages == long_storages):
        message = "Storages are not equal between short, medium and long term Models."
        unique_short = short_storages - (medium_storages | long_storages)
        unique_medium = medium_storages - (short_storages | long_storages)
        unique_long = long_storages - (short_storages | medium_storages)
        if unique_short:
            message += f"\n - Unique Nodes with Storages in Short Model: {unique_short}"
        if unique_medium:
            message += f"\n - Unique Nodes with Storages in Medium Model: {unique_medium}"
        if unique_long:
            message += f"\n - Unique Nodes with Storages in Long Model: {unique_long}"
        raise ValueError(message)
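
The equality test is written as not (a == b == c) rather than a chained inequality, because Python chains comparisons pairwise and would miss some mismatches. A standalone illustration:

short, medium, long_ = {"x"}, {"x"}, {"y"}
# Chained form: (short != medium) and (medium != long_) -- misses this case.
assert (short != medium != long_) is False
# Negated chained equality catches any difference among the three.
assert not (short == medium == long_)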
get_long_term_aggregation_map() -> dict[str, set[str] | None]

Get the aggregation map of Components from clearing to long term Model.

Source code in framjules/solve_handler/JulESAggregator.py
def get_long_term_aggregation_map(self) -> dict[str, set[str] | None]:
    """Get the aggregation map of Components from clearing to long term Model."""
    return self._create_aggregation_map(self._clearing, self._short + self._medium + self._long)
get_long_term_graph_map(graph_clearing: dict[str, Component], graph_long: dict[str, Component]) -> dict[str, set[str] | None]

Get aggregation map for version of long term Model with graph of Flows and Nodes.

Source code in framjules/solve_handler/JulESAggregator.py
def get_long_term_graph_map(
    self,
    graph_clearing: dict[str, Component],
    graph_long: dict[str, Component],
) -> dict[str, set[str] | None]:
    """Get aggregation map for version of long term Model with graph of Flows and Nodes."""
    return self._get_graph_aggregation_map(
        original_agg_map=self.get_long_term_aggregation_map(),
        clearing=self._clearing,
        graph_clearing=graph_clearing,
        aggregated=self.get_long_term_model(),
        graph_aggregated=graph_long,
    )
get_long_term_model() -> Model

Apply defined aggregations for long term Model.

Source code in framjules/solve_handler/JulESAggregator.py
def get_long_term_model(self) -> Model:
    """Apply defined aggregations for long term Model."""
    if self._long_model is None:
        self._long_model = self._aggregate(self.get_medium_term_model(), self._long)
    return self._long_model
get_medium_term_aggregation_map() -> dict[str, set[str] | None]

Get the aggregation map of Components from clearing to medium term Model.

Source code in framjules/solve_handler/JulESAggregator.py
def get_medium_term_aggregation_map(self) -> dict[str, set[str] | None]:
    """Get the aggregation map of Components from clearing to medium term Model."""
    return self._create_aggregation_map(self._clearing, self._short + self._medium)
get_medium_term_graph_map(graph_clearing: dict[str, Component], graph_medium: dict[str, Component]) -> dict[str, set[str] | None]

Get aggregation map for version of medium term Model with graph of Flows and Nodes.

Source code in framjules/solve_handler/JulESAggregator.py
def get_medium_term_graph_map(
    self,
    graph_clearing: dict[str, Component],
    graph_medium: dict[str, Component],
) -> dict[str, set[str] | None]:
    """Get aggregation map for version of medium term Model with graph of Flows and Nodes."""
    return self._get_graph_aggregation_map(
        original_agg_map=self.get_medium_term_aggregation_map(),
        clearing=self._clearing,
        graph_clearing=graph_clearing,
        aggregated=self.get_medium_term_model(),
        graph_aggregated=graph_medium,
    )
get_medium_term_model() -> Model

Apply defined aggregations for medium term Model.

Source code in framjules/solve_handler/JulESAggregator.py
def get_medium_term_model(self) -> Model:
    """Apply defined aggregations for medium term Model."""
    if self._medium_model is None:
        self._medium_model = self._aggregate(self.get_short_term_model(), self._medium)
    return self._medium_model
get_short_term_aggregation_map() -> dict[str, set[str] | None]

Get the aggregation map of Components from clearing to short term Model.

Source code in framjules/solve_handler/JulESAggregator.py
def get_short_term_aggregation_map(self) -> dict[str, set[str] | None]:
    """Get the aggregation map of Components from clearing to short term Model."""
    return self._create_aggregation_map(self._clearing, self._short)
get_short_term_graph_map(graph_clearing: dict[str, Component], graph_short: dict[str, Component]) -> dict[str, set[str] | None]

Get aggregation map for version of short term Model with graph of Flows and Nodes.

Source code in framjules/solve_handler/JulESAggregator.py
def get_short_term_graph_map(
    self,
    graph_clearing: dict[str, Component],
    graph_short: dict[str, Component],
) -> dict[str, set[str] | None]:
    """Get aggregation map for version of short term Model with graph of Flows and Nodes."""
    return self._get_graph_aggregation_map(
        original_agg_map=self.get_short_term_aggregation_map(),
        clearing=self._clearing,
        graph_clearing=graph_clearing,
        aggregated=self.get_short_term_model(),
        graph_aggregated=graph_short,
    )
get_short_term_model() -> Model

Apply defined aggregations for short term Model.

Source code in framjules/solve_handler/JulESAggregator.py
def get_short_term_model(self) -> Model:
    """Apply defined aggregations for short term Model."""
    if self._short_model is None:
        self._short_model = self._aggregate(self._clearing, self._short)
    return self._short_model
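
A hedged usage sketch of the lazy build chain (model is assumed to be an existing Model instance; with empty aggregator lists each stage passes the model through unchanged):

from framjules.solve_handler.JulESAggregator import JulESAggregator

aggregator = JulESAggregator(clearing=model, short=[], medium=[], long=[])

short = aggregator.get_short_term_model()     # built from clearing on first call
medium = aggregator.get_medium_term_model()   # built lazily from short
long_term = aggregator.get_long_term_model()  # built lazily from medium
assert long_term is aggregator.get_long_term_model()  # cached on repeated calls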

JulESNames

JulESNames

Constants (both static and dynamic ones defined in __init__) used in JulES.

Source code in framjules/solve_handler/JulESNames.py
class JulESNames:
    """Constants (both static and dynamic ones defined in __init__) used in JulES."""

    JSON_INDENT = 4
    YAML_INDENT = 4

    OTHER_TERMS = "otherterms"
    VARS = "Vars"

    AGGREGATED = "aggregated"
    MACRO = "macro"
    BALANCE_RHSDATA = "balance"

    FILENAME_CONFIG = "config.yaml"
    FILENAME_H5_OUTPUT = "output.h5"
    FILENAME_STORAGE_MAPPING = "storage_mapping.json"
    FILENAME_START_STORAGES_AGGREGATED = "start_storages_aggregated.json"
    FILENAME_START_STORAGES_CLEARING = "start_storages_clearing.json"
    ROOT_FILENAME_DATAELEMENTS = "data_elements"
    FILENAME_DATAELEMENTS_TIMEVECTORS = "data_elements_timevectors.json"

    PY_JULES_SETTINGS_NAME = "python_jules_settings"
    PY_JULES_OUTPUT_NAME = "python_jules_output"

    MAIN = "main"
    MISSING_CONFIG = "missing_config"

    RESULTS = "results"
    MAINRESULTS = "mainresults"
    TIMES = "times"
    SCENARIOS = "scenarios"
    MEMORY = "memory"
    STORAGEVALUES = "storagevalues"
    STORAGEVALUES_ALL_PROBLEMS = "storagevalues_all_problems"
    ALL = "all"

    TERM_DURATION_WEEKS = "termduration_weeks"
    TERM_DURATION_DAYS = "termduration_days"
    TERM_DURATION_HOURS = "termduration_hours"
    SEQUENTIAL_HORIZON = "SequentialHorizon"
    ADAPTIVE_HORIZON = "AdaptiveHorizon"

    COMMODITIES = "commodities"
    POWER = "Power"
    HYDRO = "Hydro"
    BATTERY = "Battery"

    SHRINKAFTER_DAYS = "startafter_days"
    SHRINKATLEAST_DAYS = "shrinkatleast_days"

    TWO_STORAGE_DURATION = "twostorageduration"
    SHORT_STOCH_DURATION_HOURS = "shorttermstoragecutoff_hours"
    LONG_STOCH_DURATION_DAYS = "longstochduration_days"
    LONG_EV_DURATION_DAYS = "longevduration_days"

    DISTRIBUTION_METHOD_MP = "distribution_method_mp"
    DISTRIBUTION_METHOD_SP = "distribution_method_sp"
    BYSIZE = "bysize"
    ADVANCED = "advanced"
    STORAGE = "storage"
    GREEDY = "greedy"
    WITHMP = "withmp"

    STATEDEPENDENT_PROD = "statedependentprod"
    STATEDEPENDENT_PUMP = "statedependentpump"
    HEADLOSSCOST = "headlosscost"

    SUBSYSTEMS = "subsystems"
    RESULTS = "results"
    STARTSTORAGES = "startstorages"
    ENDVALUE = "endvalue"

    OUTPUT_FORMAT = "outputformat"
    DATETIME_FORMAT = "datetimeformat"
    DATETIME_FORMAT_JULESIO = "yyyy-mm-ddTHH:MM:SS"
    HDF5 = "hdf5"
    ELASTIC = "elastic"
    TRUE = True
    FALSE = False

    JULIA = "julia"
    INPUT = "input"
    OUTPUT_PATH = "outputpath"
    NUM_CORES = "numcores"
    DATA_YEARS = "datayears"
    SCENARIO_YEARS = "weatheryears"
    WEEK_START = "weekstart"
    NUM_SIM_YEARS = "simulationyears"
    EXTRA_STEPS = "extrasteps"
    SETTINGS = "settings"
    OUTPUT_NAME = "outputname"

    OUTPUT_INDEX = "outputindex"
    WEATHER_YEAR = "weatheryear"
    DATA_YEAR = "datayear"

    TIME = "time"
    WEATHER_YEAR_START = "weatheryearstart"
    WEATHER_YEAR_STOP = "weatheryearstop"
    PROB_TIME = "probtime"
    NORMAL_TIME = "normaltime"

    FIXED_DATA_TWO_TIME = "FixedDataTwoTime"
    PHASE_IN_FIXED_DATA_TWO_TIME = "PhaseinFixedDataTwoTime"

    PHASE_IN_TIME = "phaseintime"
    PHASE_IN_DELTA_DAYS = "phaseindelta_days"
    PHASE_IN_DELTA_STEPS = "phaseinsteps"
    PROBLEMS = "problems"
    PROGNOSIS = "prognosis"
    SIMULATION = "simulation"
    SHRINKABLE = "shrinkable"
    AGGZONE = "aggzone"
    AGGSUPPLYN = "aggsupplyn"
    SHORT_TERM_STORAGE_CUTOFF_HOURS = "shorttermstoragecutoff_hours"
    SHORTER_THAN_PROGNOSIS_MED_DAYS = "shorterthanprognosismed_days"
    LONG = "long"
    MED = "med"
    SHORT = "short"
    PROB = "prob"
    SOLVER = "solver"
    FUNCTION = "function"
    AGG_STARTMAG_DICT = "aggstartmagdict"
    STARTMAG_DICT = "startmagdict"
    RESIDUAL_AREA_LIST = "residualarealist"

    SCENARIO_GENERATION = "scenariogeneration"
    INFLOW_CLUSTERING_METHOD = "InflowClusteringMethod"
    NUM_SCEN = "numscen"
    SCEN_DELTA_DAYS = "scendelta_days"

    PARTS = "parts"

    SKIPMAX = "skipmax"

    HIGHS_PROB = "HiGHS_Prob()"
    HIGHS_SIMPLEX = "HighsSimplexMethod()"
    HIGHS_SIMPLEX_NO_WARMSTART = "HighsSimplexMethod(warmstart=false)"
    HIGHS_SIMPLEX_SIP_NO_WARMSTART = "HighsSimplexSIPMethod(warmstart=false)"
    JUMP_HIGHS = "JuMPHiGHSMethod()"

    STOCHASTIC = "stochastic"
    MAXCUTS = "maxcuts"
    LB = "lb"
    RELTOL = "reltol"
    ONLY_AGG_HYDRO = "onlyagghydro"
    MASTER = "master"
    SUBS = "subs"

    HORIZONS = "horizons"
    HORIZON_DURATION_WEEKS = "horizonduration_weeks"
    HORIZON_DURATION_HOURS = "horizonduration_hours"
    PERIOD_DURATION_DAYS = "periodduration_days"
    PERIOD_DURATION_HOURS = "periodduration_hours"
    POWER_PARTS = "powerparts"

    RHSDATA = "rhsdata"
    DYNAMIC_EXOGEN_PRICE_AH_DATA = "DynamicExogenPriceAHData"
    DYNAMIC_RHS_AH_DATA = "DynamicRHSAHData"
    RHSMETHOD = "rhsmethod"
    KMEANS_AH_METHOD = "KMeansAHMethod()"
    CLUSTERS = "clusters"
    UNIT_DURATION_HOURS = "unitduration_hours"

    SETTINGS_SCENARIO_YEAR_START = "scenarioyearstart"

    CLEARING = "clearing"
    SHORT_TERM = "short_term"
    MEDIUM_TERM = "medium_term"
    LONG_TERM = "long_term"

    MARKET = "Power"
    STORAGE_SYSTEM = "Hydro"
    SHORT_TERM_STORAGE = "Battery"

    DFMTin = "%Y-%m-%dT%H:%M:%S"

    FLOW = "Flow"
    STORAGE = "Storage"
    BALANCE = "Balance"
    COMMODITY = "Commodity"
    PARAM = "Param"
    CAPACITY = "Capacity"
    RHSTERM = "RHSTerm"
    TIMEVECTOR = "TimeVector"
    TIMEINDEX = "TimeIndex"
    TABLE = "Table"
    TIMEDELTA = "TimeDelta"
    TIMEVALUES = "TimeValues"
    ARROW = "Arrow"
    LOSS = "Loss"
    PRICE = "Price"
    CONVERSION = "Conversion"
    COST = "Cost"
    STARTUPCOST = "StartUpCost"

    BASEFLOW = "BaseFlow"
    BASEBALANCE = "BaseBalance"
    EXOGENBALANCE = "ExogenBalance"
    BASESTORAGE = "BaseStorage"
    MWTOGWHPARAM = "MWToGWhParam"
    M3STOMM3PARAM = "M3SToMM3Param"
    MEANSERIESPARAM = "MeanSeriesParam"
    MSTIMEDELTA = "MsTimeDelta"
    INFINITETIMEVECTOR = "InfiniteTimeVector"
    ROTATINGTIMEVECTOR = "RotatingTimeVector"
    ONEYEARTIMEVECTOR = "OneYearTimeVector"
    CONSTANTTIMEVECTOR = "ConstantTimeVector"
    RANGETIMEINDEX = "RangeTimeIndex"
    VECTORTIMEINDEX = "VectorTimeIndex"
    BASETABLE = "BaseTable"
    COLUMNTIMEVALUES = "ColumnTimeValues"
    VECTORTIMEVALUES = "VectorTimeValues"
    LOWERZEROCAPACITY = "LowerZeroCapacity"
    POSITIVECAPACITY = "PositiveCapacity"
    BASERHSTERM = "BaseRHSTerm"
    BASEARROW = "BaseArrow"
    SEGMENTEDARROW = "SegmentedArrow"
    SIMPLELOSS = "SimpleLoss"
    COSTTERM = "CostTerm"
    SIMPLESTARTUPCOST = "SimpleStartUpCost"

    LOSSFACTORKEY = "LossFactor"
    UTILIZATIONKEY = "Utilization"
    FALLBACK_UTILIZATION = 0.5

    STARTCOSTKEY = "StartCost"
    MINSTABLELOADKEY = "MinStableLoad"

    WHICHCONCEPT = "WhichConcept"
    WHICHINSTANCE = "WhichInstance"

    DIRECTIONKEY = "Direction"
    DIRECTIONIN = "In"
    DIRECTIONOUT = "Out"

    BOUNDKEY = "Bound"
    BOUNDUPPER = "Upper"
    BOUNDLOWER = "Lower"

    LEVEL = "Level"
    PROFILE = "Profile"
    VALUE = "Value"
    START = "Start"
    STEPS = "Steps"
    DELTA = "Delta"
    PERIOD = "Period"
    VECTOR = "Vector"
    MATRIX = "Matrix"
    NAMES = "Names"
    NAME = "Name"

    METADATA = "Metadata"
    GLOBALENEQ = "GlobalEneq"
    RESIDUALHINT = "Residualhint"

    JULES_CONFIG = "config.yaml"
    OUTPUT_FOLDER = "output"
    JULIA_ENV_NAME = "JulES_julia_env"

    def __init__(self) -> None:
        """Dynamically settable names for JulES."""
        # This is set in BuildHandler when we build data elements
        # for the clearing model. It is used in ConfigHandler in
        # connection with using AdaptiveHorizon
        self.dummy_exogenous_balance_name: str | None = None
        self.dummy_exogenous_profile_id: str | None = None
__init__() -> None

Dynamically settable names for JulES.

Source code in framjules/solve_handler/JulESNames.py
def __init__(self) -> None:
    """Dynamically settable names for JulES."""
    # This is set in BuildHandler when we build data elements
    # for the clearing model. It is used in ConfigHandler in
    # connection with using AdaptiveHorizon
    self.dummy_exogenous_balance_name: str | None = None
    self.dummy_exogenous_profile_id: str | None = None
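
A minimal sketch of how these constants are typically consumed when assembling paths and ids; the folder and node id are hypothetical:

from pathlib import Path

names = JulESNames()
folder = Path("jules_case")  # hypothetical working folder
node_id = "NO1"              # hypothetical node id

config_path = folder / names.FILENAME_CONFIG     # jules_case/config.yaml
output_path = folder / names.FILENAME_H5_OUTPUT  # jules_case/output.h5
eneq_id = f"{names.GLOBALENEQ}_{node_id}"        # "GlobalEneq_NO1"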

SolveHandler

SolveHandler

Bases: Base

Common data methods for different simulation modes.

Source code in framjules/solve_handler/SolveHandler.py
class SolveHandler(Base):
    """Common data methods for different simulation modes."""

    def __init__(self, folder: Path, clearing_model: Model, config: JulESConfig) -> None:
        """Hold all data and methods needed to solve JulES."""
        names = JulESNames()

        t = time()
        short_aggregations = config.get_short_term_aggregations()  # list of Aggregators
        self.send_debug_event(f"get_short_term_aggregations time: {round(time() - t, 2)} seconds")
        mid_aggregations = []  # config.get_mid_term_aggregations()  # not supported yet
        long_aggregations = []  # config.get_long_term_aggregations()  # not supported yet

        t = time()
        aggregator = JulESAggregator(
            clearing=clearing_model,
            short=short_aggregations,
            medium=mid_aggregations,
            long=long_aggregations,
        )
        self.send_debug_event(f"JulESAggregator init time: {round(time() - t, 2)} seconds")

        t = time()
        short_term_model = aggregator.get_short_term_model()
        self.send_debug_event(f"get_short_term_model init time: {round(time() - t, 2)} seconds")
        t = time()
        medium_term_model = aggregator.get_medium_term_model()
        self.send_debug_event(f"get_medium_term_model init time: {round(time() - t, 2)} seconds")
        t = time()
        long_term_model = aggregator.get_long_term_model()
        self.send_debug_event(f"get_long_term_model init time: {round(time() - t, 2)} seconds")

        t = time()
        domain_models = DomainModels(
            clearing=clearing_model,
            short_term=short_term_model,
            medium_term=medium_term_model,
            long_term=long_term_model,
        )
        self.send_debug_event(f"DomainModels init time: {round(time() - t, 2)} seconds")

        t = time()
        supported_types = (Flow, Node)
        forbidden_types = tuple()
        graphs = NodeFlowGraphs(
            clearing=get_supported_components(
                self._get_components(clearing_model),
                supported_types,
                forbidden_types,
            ),
            short_term=get_supported_components(
                self._get_components(short_term_model),
                supported_types,
                forbidden_types,
            ),
            medium_term=get_supported_components(
                self._get_components(medium_term_model),
                supported_types,
                forbidden_types,
            ),
            long_term=get_supported_components(
                self._get_components(long_term_model),
                supported_types,
                forbidden_types,
            ),
        )
        self.send_debug_event(f"NodeFlowGraphs init time: {round(time() - t, 2)} seconds")

        t = time()
        graph_infos = GraphInfos(
            clearing={k: ComponentInfo() for k in graphs.clearing},
            short_term={k: ComponentInfo() for k in graphs.short_term},
            medium_term={k: ComponentInfo() for k in graphs.medium_term},
            long_term={k: ComponentInfo() for k in graphs.long_term},
        )
        self.send_debug_event(f"GraphInfos init time: {round(time() - t, 2)} seconds")

        # we check that the aggregated models don't have different storages
        t = time()
        aggregator.assert_equal_storages(
            graphs.short_term,
            graphs.medium_term,
            graphs.long_term,
        )
        self.send_debug_event(f"assert_equal_storages time: {round(time() - t, 2)} seconds")

        t = time()
        constructor = CacheDB if config.is_cache_db() else ModelDB
        db = constructor(
            domain_models.clearing,
            domain_models.short_term,
            domain_models.medium_term,
            domain_models.long_term,
        )
        self.send_debug_event(f"DB init time: {round(time() - t, 2)} seconds")

        t = time()
        self.fill_graph_infos(graph_infos, graphs, names, aggregator, config, db)
        self.send_debug_event(f"fill_graph_infos time: {round(time() - t, 2)} seconds")

        # Finally, we set the member data
        self.folder: Path = folder
        self.config: JulESConfig = config
        self.names: JulESNames = names
        self.domain_models: DomainModels = domain_models
        self.graphs: NodeFlowGraphs = graphs
        self.graph_infos: GraphInfos = graph_infos
        # NB! will be freed after self.configure()
        # so we don't hold up memory during run
        self.db: QueryDB = db

    def build(self) -> None:
        """Build input files for JulES."""
        handler = self.create_build_handler()
        handler.build()

    def configure(self) -> None:
        """Build configuration file for JulES."""
        handler = self.create_config_handler()
        handler.configure()

        self.db = None
        gc.collect()

    def run(self) -> None:
        """Run Julia-JulES."""
        handler = self.create_run_handler()
        handler.run()

    def set_results(self) -> None:
        """Set results from Julia-JulES run into domain models."""
        handler = self.create_results_handler()
        handler.set_results()

    def create_build_handler(self) -> BuildHandler:
        """Create specialized BuildHandler for the chosen simulation mode."""
        if self.config.is_simulation_mode_serial():
            handler_constructor = SerialBuildHandler
        else:
            raise NotImplementedError

        return handler_constructor(
            folder=self.folder,
            config=self.config,
            names=self.names,
            domain_models=self.domain_models,
            graphs=self.graphs,
            graph_infos=self.graph_infos,
            db=self.db,
        )

    def create_config_handler(self) -> ConfigHandler:
        """Create specialized ConfigHandler for the chosen simulation mode."""
        if self.config.is_simulation_mode_serial():
            handler_constructor = SerialConfigHandler
        else:
            raise NotImplementedError

        return handler_constructor(
            folder=self.folder,
            config=self.config,
            names=self.names,
            graph_infos=self.graph_infos,
        )

    def create_run_handler(self) -> SerialRunHandler:
        """Create specialized RunHandler for the chosen simulation mode."""
        dependencies = []

        tulipa_version = self.config.get_tulipa_version()
        if tulipa_version is not None:
            if Path(tulipa_version).exists():
                dependencies.append(tulipa_version)
            else:
                dependencies.append(("https://github.com/NVE/TuLiPa.git", tulipa_version))

        jules_version = self.config.get_jules_version()
        if jules_version is not None:
            if Path(jules_version).exists():
                dependencies.append(jules_version)
            else:
                dependencies.append(("https://github.com/NVE/JulES.git", jules_version))

        dependencies.extend(["YAML", "HDF5", "JSON", "PythonCall"])

        if self.config.is_simulation_mode_serial():
            handler_constructor = SerialRunHandler
        else:
            message = "JulES Parallel simulation mode is not yet supported."
            raise NotImplementedError(message)
        handler_constructor.ENV_NAME = self.names.JULIA_ENV_NAME
        return handler_constructor(folder=self.folder, config=self.config, names=self.names, dependencies=dependencies)

    def create_results_handler(self) -> SerialResultsHandler:
        """Create a SerialResultsHandler."""
        if self.config.is_simulation_mode_serial():
            handler_constructor = SerialResultsHandler
        else:
            message = "JulES Parallel simulation mode is not yet supported."
            raise NotImplementedError(message)
        return handler_constructor(
            folder=self.folder,
            config=self.config,
            names=self.names,
            graphs=self.graphs,
            graph_infos=self.graph_infos,
        )

    def _get_components(self, model: Model) -> dict[str, Component]:
        return {k: v for k, v in model.get_data().items() if isinstance(v, Component)}

    def fill_graph_infos(
        self,
        graph_infos: GraphInfos,
        graphs: NodeFlowGraphs,
        names: JulESNames,
        aggregator: JulESAggregator,
        config: JulESConfig,
        db: QueryDB,
    ) -> None:
        """Fill graph_info with derived info."""
        # Intent is to gather complex derivations in just one place

        # NB! The order of the method calls below matters
        t = time()
        self.set_basic_node_flow_info(graph_infos.clearing, graphs.clearing)
        self.set_basic_node_flow_info(graph_infos.short_term, graphs.short_term)
        self.set_basic_node_flow_info(graph_infos.medium_term, graphs.medium_term)
        self.set_basic_node_flow_info(graph_infos.long_term, graphs.long_term)
        self.send_debug_event(f"set_basic_node_flow_info time: {round(time() - t, 2)} seconds")

        t = time()
        self.set_sss_info(graph_infos.clearing, graphs.clearing, names, config)
        self.set_sss_info(graph_infos.short_term, graphs.short_term, names, config)
        self.set_sss_info(graph_infos.medium_term, graphs.medium_term, names, config)
        self.set_sss_info(graph_infos.long_term, graphs.long_term, names, config)
        self.send_debug_event(f"set_sss_info time: {round(time() - t, 2)} seconds")

        t = time()
        self.set_market_info(graph_infos.clearing, graphs.clearing, names)
        self.set_market_info(graph_infos.short_term, graphs.short_term, names)
        self.set_market_info(graph_infos.medium_term, graphs.medium_term, names)
        self.set_market_info(graph_infos.long_term, graphs.long_term, names)
        self.send_debug_event(f"set_market_info time: {round(time() - t, 2)} seconds")

        t = time()
        self.set_agg_storage_node_info(graph_infos.clearing, aggregator, graphs.clearing, graphs.short_term)
        # self.set_agg_market_node_info(graph_infos.clearing, aggregator, graphs.clearing, graphs.medium_term)
        self.send_debug_event(f"set_agg_storage_node_info time: {round(time() - t, 2)} seconds")

        t = time()
        self.set_jules_id_info(graph_infos.clearing, is_aggregated=False, names=names)
        self.set_jules_id_info(graph_infos.short_term, is_aggregated=True, names=names)
        self.send_debug_event(f"set_jules_id_info time: {round(time() - t, 2)} seconds")

        t = time()
        self.set_unit_info(graph_infos.clearing, graphs.clearing, config, names)
        self.set_unit_info(graph_infos.short_term, graphs.short_term, config, names)
        self.send_debug_event(f"set_unit_info time: {round(time() - t, 2)} seconds")

        t = time()
        self.set_sss_global_eneq_info(graph_infos.clearing, graphs.clearing, db, config)
        self.set_sss_global_eneq_info(graph_infos.short_term, graphs.short_term, db, config)
        self.send_debug_event(f"set_sss_global_eneq_info time: {round(time() - t, 2)} seconds")

        t = time()
        self.set_sss_initial_storage(graph_infos.clearing, graphs.clearing, db, config)
        self.set_agg_initial_storage(graph_infos.short_term, graph_infos.clearing)
        self.send_debug_event(f"set_sss_initial_storage time: {round(time() - t, 2)} seconds")

        t = time()
        # assert that graph_infos has expected content
        assert all({True, False} == {x.is_node, x.is_flow} for x in graph_infos.clearing.values())
        assert all({True, False} == {x.is_node, x.is_flow} for x in graph_infos.short_term.values())
        assert all({True, False} == {x.is_node, x.is_flow} for x in graph_infos.medium_term.values())
        assert all({True, False} == {x.is_node, x.is_flow} for x in graph_infos.long_term.values())

        self.send_debug_event(f"validation time: {round(time() - t, 2)} seconds")

    def set_basic_node_flow_info(
        self,
        out_graph_info: dict[str, ComponentInfo],
        graph: dict[str, Flow | Node],
    ) -> None:
        """Info directly accessible from Node and Flow API.

        We also set domain_commodity for Flow as main_node.get_commodity().
        """
        for component_id, c in graph.items():
            info = out_graph_info[component_id]

            info.is_flow = isinstance(c, Flow)
            info.is_node = isinstance(c, Node)

            info.is_exogenous = c.is_exogenous()

            info.is_storage_node = isinstance(c, Node) and c.get_storage() is not None

            if info.is_node:
                info.main_node_id = component_id
                info.domain_commodity = c.get_commodity()

            if info.is_flow:
                info.main_node_id = c.get_main_node()
                info.domain_commodity = graph[info.main_node_id].get_commodity()
                info.num_arrows = len(c.get_arrows())

    def set_sss_info(
        self,
        out_graph_info: dict[str, ComponentInfo],
        graph: dict[str, Flow | Node],
        names: JulESNames,
        config: JulESConfig,
    ) -> None:
        """Storage SubSystem (sss) info."""
        include_boundaries = False  # so market nodes at the boundary are not incorrectly classified
        subsystems = get_one_commodity_storage_subsystems(graph, include_boundaries)

        for info in out_graph_info.values():
            info.has_storage_resolution = False

        for subsystem_id, (__, subsystem, boundary_domain_commodities) in subsystems.items():
            is_short_term = self.is_short_term_storage_subsystem(subsystem, graph, config)

            jules_commodity = names.SHORT_TERM_STORAGE if is_short_term else names.STORAGE_SYSTEM

            if len(boundary_domain_commodities) == 0:
                message = (
                    f"Warning! No boundary domain commodity found for storage subsystem {subsystem_id} "
                    f"with members {subsystem}.\n"
                )
                print(message)
                for component_id in subsystem:
                    info = out_graph_info[component_id]
                    info.jules_commodity = jules_commodity
                continue
            assert len(boundary_domain_commodities) == 1
            market_commodity = next(iter(boundary_domain_commodities))

            for component_id in subsystem:
                info = out_graph_info[component_id]

                info.is_sss_member = True

                info.sss_id = subsystem_id
                info.sss_is_short_term = is_short_term
                info.sss_market_commodity = market_commodity
                info.sss_members = subsystem

                info.jules_commodity = jules_commodity

            # all nodes in subsystem get True below since include_boundaries = False
            # Flow get False if any arrow points to market commodity
            assert include_boundaries is False
            for component_id in subsystem:
                component = graph[component_id]
                info = out_graph_info[component_id]
                info.has_storage_resolution = True
                if isinstance(component, Flow):
                    for arrow in component.get_arrows():
                        node_id = arrow.get_node()
                        node_info = out_graph_info[node_id]
                        if node_info.jules_commodity == info.sss_market_commodity:
                            info.has_storage_resolution = False
                            break

    def set_market_info(
        self,
        out_graph_info: dict[str, ComponentInfo],
        graph: dict[str, Flow | Node],
        names: JulESNames,
    ) -> None:
        """Set is_market_node and if so, also set jules_commodity to market."""
        for component_id, info in out_graph_info.items():
            info.is_market_node = info.is_node and not info.is_storage_node and not info.is_sss_member

            if info.is_market_node:
                info.jules_commodity = names.MARKET

        for component_id, info in out_graph_info.items():
            info.is_market_flow = False
            if info.is_flow:
                flow = graph[component_id]
                for arrow in flow.get_arrows():
                    if out_graph_info[arrow.get_node()].is_market_node:
                        info.is_market_flow = True
                        break

    def set_agg_storage_node_info(
        self,
        out: dict[str, ComponentInfo],
        aggregator: JulESAggregator,
        detailed_graph: dict[str, Flow | Node],
        aggregated_graph: dict[str, Flow | Node],
    ) -> None:
        """Aggregate storages and update info.agg_storage_node_id."""
        agg_storage_node_ids = {
            n: c for n, c in aggregated_graph.items() if isinstance(c, Node) and c.get_storage() is not None
        }
        graph_map = aggregator.get_short_term_graph_map(detailed_graph, agg_storage_node_ids)
        for member_id, agg_node_ids in graph_map.items():
            assert len(agg_node_ids) > 0
            assert sum(int(n in agg_storage_node_ids) for n in agg_node_ids) == 1
            info = out[member_id]
            if info.is_storage_node:
                for agg_node_id in agg_node_ids:
                    if agg_node_id in agg_storage_node_ids:
                        info.agg_storage_node_id = agg_node_id
                        break

    def set_agg_market_node_info(
        self,
        out: dict[str, ComponentInfo],
        aggregator: JulESAggregator,
        detailed_graph: dict[str, Flow | Node],
        aggregated_graph: dict[str, Flow | Node],
    ) -> None:
        """Aggregate market nodes and update info.agg_market_node_id."""
        market_nodes = {n: c for n, c in aggregated_graph.items() if out[n].is_market_node}
        graph_map = aggregator.get_medium_term_graph_map(detailed_graph, market_nodes)
        for agg_market_node_id, member_node_ids in graph_map.items():
            agg_component = aggregated_graph[agg_market_node_id]
            if not isinstance(agg_component, Node):
                continue
            if agg_component.get_storage() is not None:
                continue
            for node_id in member_node_ids:
                info = out[node_id]
                if info.is_market_node:
                    info.agg_market_node_id = agg_market_node_id

    def set_jules_id_info(
        self,
        out: dict[str, ComponentInfo],
        is_aggregated: bool,
        names: JulESNames,
    ) -> None:
        """Add jules ids in compliance with required format.
        Warning! Julia-JulES currently requires this format """

        for node_id, info in out.items():
            info.jules_global_eneq_id = f"{names.GLOBALENEQ}_{node_id}"

            if info.is_storage_node:
                if is_aggregated:
                    info.jules_balance_id = f"{info.jules_commodity}Balance_{node_id}_hydro_reservoir"
                    info.jules_storage_id = f"Reservoir_{node_id}_hydro_reservoir"
                else:
                    info.jules_balance_id = f"{info.jules_commodity}Balance_{node_id}"
                    info.jules_storage_id = f"Reservoir_{node_id}"

            elif info.is_node:
                info.jules_balance_id = f"{info.jules_commodity}Balance_{node_id}"

    def set_unit_info(
        self,
        out: dict[str, ComponentInfo],
        graph: dict[str, Flow | Node],
        config: JulESConfig,
        names: JulESNames,
    ) -> None:
        """Calculate all types of target units.

        Need from config:
        - unit_money
        - unit_stock per commodity for each storage_node
        - unit_flow per commodity for each flow

        Will derive:
        - unit_price per commodity for each market_node
        - unit_cost for each flow
        - unit_coeffs for each flow
        - unit_eneq for each sss_member in each sss

        And also for each flow, we derive:
        - unit_param_type
        - unit_param_unit_flow
        - unit_param_unit_stock

        """
        unit_money = config.get_currency()

        node_info = {k: info for k, info in out.items() if info.is_node}
        flow_info = {k: info for k, info in out.items() if info.is_flow}
        market_node_info = {k: info for k, info in node_info.items() if info.is_market_node}
        storage_node_info = {k: info for k, info in node_info.items() if info.is_storage_node}
        sss_member_info = {k: info for k, info in node_info.items() if info.is_sss_member}

        for info in market_node_info.values():
            unit_stock = config.get_unit_stock(info.domain_commodity)
            info.unit_price = f"{unit_money}/{unit_stock}"

        # unit_flow and unit_stock for storage nodes are set in the shared loop below

        for d in [flow_info, storage_node_info]:
            for info in d.values():
                unit_flow = config.get_unit_flow(out[info.main_node_id].domain_commodity)
                unit_stock = config.get_unit_stock(out[info.main_node_id].domain_commodity)
                info.unit_flow = unit_flow
                info.unit_stock = unit_stock
                info.unit_cost = f"{unit_money}/{unit_stock}"
                if is_convertable(info.unit_flow, "MW"):
                    info.unit_param_type = names.MWTOGWHPARAM
                    info.unit_param_unit_flow = "MW"
                    info.unit_param_unit_stock = "GWh"
                elif is_convertable(info.unit_flow, "m3/s"):
                    info.unit_param_type = names.M3STOMM3PARAM
                    info.unit_param_unit_flow = "m3/s"
                    info.unit_param_unit_stock = "Mm3"
                else:
                    message = f"Unsupported unit_flow: {info.unit_flow}"
                    raise ValueError(message)

                if info.is_market_flow:
                    seconds = config.get_time_resolution().get_clearing_market_minutes() * 60
                else:
                    seconds = config.get_time_resolution().get_clearing_storage_minutes() * 60
                info.unit_flow_result = f"{unit_stock}/({seconds} * s)"

        for flow_id, info in flow_info.items():
            flow: Flow = graph[flow_id]
            info.unit_coeffs = dict()
            for arrow in flow.get_arrows():
                from_node_id = arrow.get_node()
                unit_coeff = None
                if from_node_id != info.main_node_id:
                    from_node_unit = config.get_unit_stock(out[from_node_id].domain_commodity)
                    unit_coeff = None if from_node_unit == info.unit_stock else f"{from_node_unit}/{info.unit_stock}"
                info.unit_coeffs[from_node_id] = unit_coeff

        for info in sss_member_info.values():
            if info.sss_global_eneq_unit is not None:
                continue
            unit_market = config.get_unit_stock(info.sss_market_commodity)
            unit_stock = config.get_unit_stock(info.domain_commodity)
            unit_eneq = f"{unit_market}/{unit_stock}"
            for component_id in info.sss_members:
                member_info = out[component_id]
                member_info.sss_global_eneq_unit = unit_eneq

    def set_sss_global_eneq_info(
        self,
        out: dict[str, ComponentInfo],
        graph: dict[str, Flow | Node],
        db: QueryDB,
        config: JulESConfig,
    ) -> None:
        """Set global_energy_coefficient using metadata. Convert to usable unit."""
        for component_id, info in out.items():
            if not info.is_sss_member or not info.is_storage_node:
                continue

            data_dim: FixedFrequencyTimeIndex = config.get_data_period()

            start_year, num_years = config.get_weather_years()
            scen_dim = AverageYearRange(start_year, num_years)

            metakeys = graph[component_id].get_meta_keys()
            if "EnergyEqDownstream" in metakeys:
                metadata = graph[component_id].get_meta("EnergyEqDownstream")
            elif "enekv_global" in metakeys:
                metadata = graph[component_id].get_meta("enekv_global")
            else:
                message = (
                    f"Missing metadata EnergyEqDownstream or enekv_global for {component_id}, "
                    f"only metadata keys {list(metakeys)}."
                    f" Object info: {info}"
                )
                raise ValueError(message)
            expr = metadata.get_value()

            info.sss_global_eneq_value = get_level_value(
                expr=expr,
                unit=info.sss_global_eneq_unit,
                db=db,
                data_dim=data_dim,
                scen_dim=scen_dim,
                is_max=False,
            )

    def set_sss_initial_storage(
        self,
        out: dict[str, ComponentInfo],
        graph: dict[str, Flow | Node],
        db: QueryDB,
        config: JulESConfig,
    ) -> None:
        """Set sss_initial_storage. Convert to usable unit."""
        for node_id, info in out.items():
            if not info.is_storage_node:
                continue

            node: Node = graph[node_id]

            percentage = 0.6
            try:
                percentage = node.get_initial_storage_percentage()
                assert 0 <= percentage <= 1
            except Exception:
                self.send_warning_event(
                    f"Missing initial storage for {node_id}. Using 60 % of capacity.",
                )

            info.sss_initial_storage = self._get_initial_storage_capacity(
                node_id,
                node,
                info,
                percentage,
                db,
                config,
            )

    def _get_initial_storage_capacity(
        self,
        node_id: str,
        node: Node,
        info: ComponentInfo,
        percentage: float,
        db: QueryDB,
        config: JulESConfig,
    ) -> float:
        data_dim: FixedFrequencyTimeIndex = config.get_data_period()

        if data_dim.get_num_periods() > 1:
            raise NotImplementedError

        start_year, num_years = config.get_weather_years()
        scen_dim = AverageYearRange(start_year, num_years)

        capacity: StockVolume = node.get_storage().get_capacity()
        data_value: float = capacity.get_data_value(
            db=db,
            level_period=data_dim,
            scenario_horizon=scen_dim,
            unit=info.unit_stock,
            is_max_level=True,
        )

        return data_value * percentage

    def set_agg_initial_storage(
        self,
        agg_graph_info: dict[str, ComponentInfo],
        det_graph_info: dict[str, ComponentInfo],
    ) -> None:
        """Set global_eneq and initial_storage in aggregated graph_info from detailed graph_info."""
        for det_id, det in det_graph_info.items():
            if not det.is_storage_node:
                continue
            if det.agg_storage_node_id is None:
                continue

            agg = agg_graph_info[det.agg_storage_node_id]

            if agg.sss_initial_storage is None:
                agg.sss_initial_storage = 0.0

            agg.sss_initial_storage += det.sss_global_eneq_value * det.sss_initial_storage

    def is_short_term_storage_subsystem(self, subsystem: set[str], graph: dict[str, Flow | Node], config: JulESConfig) -> bool:
        """Return True if the subsystem is a short term storage subsystem."""
        # Not differentiated yet: all storage subsystems are currently treated as long term.
        return False
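
A minimal sketch of the intended call order, assuming folder, clearing_model and config are already in hand; note that configure() also frees the internal db, so it must come after build():

handler = SolveHandler(folder, clearing_model, config)
handler.build()        # write JulES input files
handler.configure()    # write the config file, then release self.db
handler.run()          # run Julia-JulES
handler.set_results()  # read results back into the domain models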
__init__(folder: Path, clearing_model: Model, config: JulESConfig) -> None

Hold all data and methods needed to solve JulES.

Source code in framjules/solve_handler/SolveHandler.py
def __init__(self, folder: Path, clearing_model: Model, config: JulESConfig) -> None:
    """Hold all data and methods needed to solve JulES."""
    names = JulESNames()

    t = time()
    short_aggregations = config.get_short_term_aggregations()  # list of Aggregators
    self.send_debug_event(f"get_short_term_aggregations time: {round(time() - t, 2)} seconds")
    mid_aggregations = []  # config.get_mid_term_aggregations()  # not supported yet
    long_aggregations = []  # config.get_long_term_aggregations()  # not supported yet

    t = time()
    aggregator = JulESAggregator(
        clearing=clearing_model,
        short=short_aggregations,
        medium=mid_aggregations,
        long=long_aggregations,
    )
    self.send_debug_event(f"JulESAggregator init time: {round(time() - t, 2)} seconds")

    t = time()
    short_term_model = aggregator.get_short_term_model()
    self.send_debug_event(f"get_short_term_model init time: {round(time() - t, 2)} seconds")
    t = time()
    medium_term_model = aggregator.get_medium_term_model()
    self.send_debug_event(f"get_medium_term_model init time: {round(time() - t, 2)} seconds")
    t = time()
    long_term_model = aggregator.get_long_term_model()
    self.send_debug_event(f"get_long_term_model init time: {round(time() - t, 2)} seconds")

    t = time()
    domain_models = DomainModels(
        clearing=clearing_model,
        short_term=short_term_model,
        medium_term=medium_term_model,
        long_term=long_term_model,
    )
    self.send_debug_event(f"DomainModels init time: {round(time() - t, 2)} seconds")

    t = time()
    supported_types = (Flow, Node)
    forbidden_types = tuple()
    graphs = NodeFlowGraphs(
        clearing=get_supported_components(
            self._get_components(clearing_model),
            supported_types,
            forbidden_types,
        ),
        short_term=get_supported_components(
            self._get_components(short_term_model),
            supported_types,
            forbidden_types,
        ),
        medium_term=get_supported_components(
            self._get_components(medium_term_model),
            supported_types,
            forbidden_types,
        ),
        long_term=get_supported_components(
            self._get_components(long_term_model),
            supported_types,
            forbidden_types,
        ),
    )
    self.send_debug_event(f"NodeFlowGraphs init time: {round(time() - t, 2)} seconds")

    t = time()
    graph_infos = GraphInfos(
        clearing={k: ComponentInfo() for k in graphs.clearing},
        short_term={k: ComponentInfo() for k in graphs.short_term},
        medium_term={k: ComponentInfo() for k in graphs.medium_term},
        long_term={k: ComponentInfo() for k in graphs.long_term},
    )
    self.send_debug_event(f"GraphInfos init time: {round(time() - t, 2)} seconds")

    # we check that the aggregated models don't have different storages
    t = time()
    aggregator.assert_equal_storages(
        graphs.short_term,
        graphs.medium_term,
        graphs.long_term,
    )
    self.send_debug_event(f"assert_equal_storages time: {round(time() - t, 2)} seconds")

    t = time()
    constructor = CacheDB if config.is_cache_db() else ModelDB
    db = constructor(
        domain_models.clearing,
        domain_models.short_term,
        domain_models.medium_term,
        domain_models.long_term,
    )
    self.send_debug_event(f"DB init time: {round(time() - t, 2)} seconds")

    t = time()
    self.fill_graph_infos(graph_infos, graphs, names, aggregator, config, db)
    self.send_debug_event(f"fill_graph_infos time: {round(time() - t, 2)} seconds")

    # Finally, we set the member data
    self.folder: Path = folder
    self.config: JulESConfig = config
    self.names: JulESNames = names
    self.domain_models: DomainModels = domain_models
    self.graphs: NodeFlowGraphs = graphs
    self.graph_infos: GraphInfos = graph_infos
    # NB! will be freed after self.configure()
    # so we don't hold up memory during run
    self.db: QueryDB = db
build() -> None

Build input files for JulES.

Source code in framjules/solve_handler/SolveHandler.py
def build(self) -> None:
    """Build input files for JulES."""
    handler = self.create_build_handler()
    handler.build()
configure() -> None

Build configuration file for JulES.

Source code in framjules/solve_handler/SolveHandler.py
def configure(self) -> None:
    """Build configuration file for JulES."""
    handler = self.create_config_handler()
    handler.configure()

    self.db = None
    gc.collect()
create_build_handler() -> BuildHandler

Create specialized BuildHandler for the chosen simulation mode.

Source code in framjules/solve_handler/SolveHandler.py
def create_build_handler(self) -> BuildHandler:
    """Create specialized BuildHandler for the chosen simulation mode."""
    if self.config.is_simulation_mode_serial():
        handler_constructor = SerialBuildHandler
    else:
        raise NotImplementedError

    return handler_constructor(
        folder=self.folder,
        config=self.config,
        names=self.names,
        domain_models=self.domain_models,
        graphs=self.graphs,
        graph_infos=self.graph_infos,
        db=self.db,
    )
create_config_handler() -> ConfigHandler

Create specialized ConfigHandler for the chosen simulation mode.

Source code in framjules/solve_handler/SolveHandler.py
def create_config_handler(self) -> ConfigHandler:
    """Create specialized ConfigHandler for the chosen simulation mode."""
    if self.config.is_simulation_mode_serial():
        handler_constructor = SerialConfigHandler
    else:
        raise NotImplementedError

    return handler_constructor(
        folder=self.folder,
        config=self.config,
        names=self.names,
        graph_infos=self.graph_infos,
    )
create_results_handler() -> SerialResultsHandler

Create a SerialResultsHandler.

Source code in framjules/solve_handler/SolveHandler.py
def create_results_handler(self) -> SerialResultsHandler:
    """Create a SerialResultsHandler."""
    if self.config.is_simulation_mode_serial():
        handler_constructor = SerialResultsHandler
    else:
        message = "JulES Parallel simulation mode is not yet supported."
        raise NotImplementedError(message)
    return handler_constructor(
        folder=self.folder,
        config=self.config,
        names=self.names,
        graphs=self.graphs,
        graph_infos=self.graph_infos,
    )
create_run_handler() -> SerialRunHandler

Create specialized RunHandler for the chosen simulation mode.

Source code in framjules/solve_handler/SolveHandler.py
def create_run_handler(self) -> SerialRunHandler:
    """Create specialized RunHandler for the chosen simulation mode."""
    dependencies = []

    tulipa_version = self.config.get_tulipa_version()
    if tulipa_version is not None:
        if Path(tulipa_version).exists():
            dependencies.append(tulipa_version)
        else:
            dependencies.append(("https://github.com/NVE/TuLiPa.git", tulipa_version))

    jules_version = self.config.get_jules_version()
    if jules_version is not None:
        if Path(jules_version).exists():
            dependencies.append(jules_version)
        else:
            dependencies.append(("https://github.com/NVE/JulES.git", jules_version))

    dependencies.extend(["YAML", "HDF5", "JSON", "PythonCall"])

    if self.config.is_simulation_mode_serial():
        handler_constructor = SerialRunHandler
    else:
        message = "JulES Parallel simulation mode is not yet supported."
        raise NotImplementedError(message)
    handler_constructor.ENV_NAME = self.names.JULIA_ENV_NAME
    return handler_constructor(folder=self.folder, config=self.config, names=self.names, dependencies=dependencies)
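
Version strings are treated as local paths when they exist on disk, and as git refs against the NVE repositories otherwise. A sketch of the resulting dependencies list under two hypothetical inputs:

# tulipa_version = "C:/dev/TuLiPa"  (an existing local checkout)
# jules_version = "v0.x"            (no such path, so treated as a git ref)
dependencies = [
    "C:/dev/TuLiPa",
    ("https://github.com/NVE/JulES.git", "v0.x"),
    "YAML", "HDF5", "JSON", "PythonCall",
]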
fill_graph_infos(graph_infos: GraphInfos, graphs: NodeFlowGraphs, names: JulESNames, aggregator: JulESAggregator, config: JulESConfig, db: QueryDB) -> None

Fill graph_info with derived info.

Source code in framjules/solve_handler/SolveHandler.py
def fill_graph_infos(
    self,
    graph_infos: GraphInfos,
    graphs: NodeFlowGraphs,
    names: JulESNames,
    aggregator: JulESAggregator,
    config: JulESConfig,
    db: QueryDB,
) -> None:
    """Fill graph_info with derived info."""
    # Intent is to gather complex derivations in just one place

    # NB! The order of the method calls below matters
    t = time()
    self.set_basic_node_flow_info(graph_infos.clearing, graphs.clearing)
    self.set_basic_node_flow_info(graph_infos.short_term, graphs.short_term)
    self.set_basic_node_flow_info(graph_infos.medium_term, graphs.medium_term)
    self.set_basic_node_flow_info(graph_infos.long_term, graphs.long_term)
    self.send_debug_event(f"set_basic_node_flow_info time: {round(time() - t, 2)} seconds")

    t = time()
    self.set_sss_info(graph_infos.clearing, graphs.clearing, names, config)
    self.set_sss_info(graph_infos.short_term, graphs.short_term, names, config)
    self.set_sss_info(graph_infos.medium_term, graphs.medium_term, names, config)
    self.set_sss_info(graph_infos.long_term, graphs.long_term, names, config)
    self.send_debug_event(f"set_sss_info time: {round(time() - t, 2)} seconds")

    t = time()
    self.set_market_info(graph_infos.clearing, graphs.clearing, names)
    self.set_market_info(graph_infos.short_term, graphs.short_term, names)
    self.set_market_info(graph_infos.medium_term, graphs.medium_term, names)
    self.set_market_info(graph_infos.long_term, graphs.long_term, names)
    self.send_debug_event(f"set_market_info time: {round(time() - t, 2)} seconds")

    t = time()
    self.set_agg_storage_node_info(graph_infos.clearing, aggregator, graphs.clearing, graphs.short_term)
    # self.set_agg_market_node_info(graph_infos.clearing, aggregator, graphs.clearing, graphs.medium_term)
    self.send_debug_event(f"set_agg_storage_node_info time: {round(time() - t, 2)} seconds")

    t = time()
    self.set_jules_id_info(graph_infos.clearing, is_aggregated=False, names=names)
    self.set_jules_id_info(graph_infos.short_term, is_aggregated=True, names=names)
    self.send_debug_event(f"set_jules_id_info time: {round(time() - t, 2)} seconds")

    t = time()
    self.set_unit_info(graph_infos.clearing, graphs.clearing, config, names)
    self.set_unit_info(graph_infos.short_term, graphs.short_term, config, names)
    self.send_debug_event(f"set_unit_info time: {round(time() - t, 2)} seconds")

    t = time()
    self.set_sss_global_eneq_info(graph_infos.clearing, graphs.clearing, db, config)
    self.set_sss_global_eneq_info(graph_infos.short_term, graphs.short_term, db, config)
    self.send_debug_event(f"set_sss_global_eneq_info time: {round(time() - t, 2)} seconds")

    t = time()
    self.set_sss_initial_storage(graph_infos.clearing, graphs.clearing, db, config)
    self.set_agg_initial_storage(graph_infos.short_term, graph_infos.clearing)
    self.send_debug_event(f"set_sss_initial_storage time: {round(time() - t, 2)} seconds")

    t = time()
    # assert that graph_infos has expected content
    assert all({True, False} == {x.is_node, x.is_flow} for x in graph_infos.clearing.values())
    assert all({True, False} == {x.is_node, x.is_flow} for x in graph_infos.short_term.values())
    assert all({True, False} == {x.is_node, x.is_flow} for x in graph_infos.medium_term.values())
    assert all({True, False} == {x.is_node, x.is_flow} for x in graph_infos.long_term.values())

    self.send_debug_event(f"validation time: {round(time() - t, 2)} seconds")
is_short_term_storage_subsystem(subsystem: set[str], graph: dict[str, Flow | Node], config: JulESConfig) -> bool

Return True if the subsystem is a short term storage subsystem.

Source code in framjules/solve_handler/SolveHandler.py
def is_short_term_storage_subsystem(self, subsystem: set[str], graph: dict[str, Flow | Node], config: JulESConfig) -> bool:
    """Return True if the subsystem is a short term storage subsystem."""
    # Not differentiated yet: all storage subsystems are currently treated as long term.
    return False
run() -> None

Run Julia-JulES.

Source code in framjules/solve_handler/SolveHandler.py
def run(self) -> None:
    """Run Julia-JulES."""
    handler = self.create_run_handler()
    handler.run()
set_agg_initial_storage(agg_graph_info: dict[str, ComponentInfo], det_graph_info: dict[str, ComponentInfo]) -> None

Set sss_initial_storage in the aggregated graph_info from the detailed graph_info.

Source code in framjules/solve_handler/SolveHandler.py
def set_agg_initial_storage(
    self,
    agg_graph_info: dict[str, ComponentInfo],
    det_graph_info: dict[str, ComponentInfo],
) -> None:
    """Set global_eneq and initial_storage in aggregated graph_info from detailed graph_info."""
    for det_id, det in det_graph_info.items():
        if not det.is_storage_node:
            continue
        if det.agg_storage_node_id is None:
            continue

        agg = agg_graph_info[det.agg_storage_node_id]

        if agg.sss_initial_storage is None:
            agg.sss_initial_storage = 0.0

        agg.sss_initial_storage += det.sss_global_eneq_value * det.sss_initial_storage
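
Each detailed storage contributes its volume weighted by its global energy equivalent, so the aggregated value ends up in market-commodity units. A hypothetical two-reservoir example:

# reservoir_a: 100 Mm3 initial storage, global eneq 1.2 GWh/Mm3
# reservoir_b:  50 Mm3 initial storage, global eneq 0.8 GWh/Mm3
agg_initial_storage = 100 * 1.2 + 50 * 0.8  # = 160.0 GWh in the aggregated node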
set_agg_market_node_info(out: dict[str, ComponentInfo], aggregator: JulESAggregator, detailed_graph: dict[str, Flow | Node], aggregated_graph: dict[str, Flow | Node]) -> None

Aggregate market nodes and update info.agg_market_node_id.

Source code in framjules/solve_handler/SolveHandler.py
def set_agg_market_node_info(
    self,
    out: dict[str, ComponentInfo],
    aggregator: JulESAggregator,
    detailed_graph: dict[str, Flow | Node],
    aggregated_graph: dict[str, Flow | Node],
) -> None:
    """Aggregate market nodes and update info.agg_market_node_id."""
    market_nodes = {n: c for n, c in aggregated_graph.items() if out[n].is_market_node}
    graph_map = aggregator.get_medium_term_graph_map(detailed_graph, market_nodes)
    for agg_market_node_id, member_node_ids in graph_map.items():
        agg_component = aggregated_graph[agg_market_node_id]
        if not isinstance(agg_component, Node):
            continue
        if agg_component.get_storage() is not None:
            continue
        for node_id in member_node_ids:
            info = out[node_id]
            if info.is_market_node:
                info.agg_market_node_id = agg_market_node_id
set_agg_storage_node_info(out: dict[str, ComponentInfo], aggregator: JulESAggregator, detailed_graph: dict[str, Flow | Node], aggregated_graph: dict[str, Flow | Node]) -> None

Aggregate storages and update info.agg_storage_node_id.

Source code in framjules/solve_handler/SolveHandler.py
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def set_agg_storage_node_info(
    self,
    out: dict[str, ComponentInfo],
    aggregator: JulESAggregator,
    detailed_graph: dict[str, Flow | Node],
    aggregated_graph: dict[str, Flow | Node],
) -> None:
    """Aggregate storages and update info.agg_storage_node_id."""
    agg_storage_node_ids = {
        n: c for n, c in aggregated_graph.items() if isinstance(c, Node) and c.get_storage() is not None
    }
    graph_map = aggregator.get_short_term_graph_map(detailed_graph, agg_storage_node_ids)
    for member_id, agg_node_ids in graph_map.items():
        assert len(agg_node_ids) > 0
        assert sum(int(n in agg_storage_node_ids) for n in agg_node_ids) == 1
        info = out[member_id]
        if info.is_storage_node:
            for agg_node_id in agg_node_ids:
                if agg_node_id in agg_storage_node_ids:
                    info.agg_storage_node_id = agg_node_id
                    break
set_basic_node_flow_info(out_graph_info: dict[str, ComponentInfo], graph: dict[str, Flow | Node]) -> None

Info directly accessible from Node and Flow API.

We also set domain_commodity for Flow as main_node.get_commodity().

Source code in framjules/solve_handler/SolveHandler.py
def set_basic_node_flow_info(
    self,
    out_graph_info: dict[str, ComponentInfo],
    graph: dict[str, Flow | Node],
) -> None:
    """Info directly accessible from Node and Flow API.

    We also set domain_commodity for Flow as main_node.get_commodity().
    """
    for component_id, c in graph.items():
        info = out_graph_info[component_id]

        info.is_flow = isinstance(c, Flow)
        info.is_node = isinstance(c, Node)

        info.is_exogenous = c.is_exogenous()

        info.is_storage_node = isinstance(c, Node) and c.get_storage() is not None

        if info.is_node:
            info.main_node_id = component_id
            info.domain_commodity = c.get_commodity()

        if info.is_flow:
            info.main_node_id = c.get_main_node()
            info.domain_commodity = graph[info.main_node_id].get_commodity()
            info.num_arrows = len(c.get_arrows())
set_jules_id_info(out: dict[str, ComponentInfo], is_aggregated: bool, names: JulESNames) -> None

Add jules ids in compliance with the required format. Warning! Julia-JulES currently requires this format.

Source code in framjules/solve_handler/SolveHandler.py
def set_jules_id_info(
    self,
    out: dict[str, ComponentInfo],
    is_aggregated: bool,
    names: JulESNames,
) -> None:
    """Add jules ids in compliance with required format.
    Warning! Julia-JulES currently requires this format """

    for node_id, info in out.items():
        info.jules_global_eneq_id = f"{names.GLOBALENEQ}_{node_id}"

        if info.is_storage_node:
            if is_aggregated:
                info.jules_balance_id = f"{info.jules_commodity}Balance_{node_id}_hydro_reservoir"
                info.jules_storage_id = f"Reservoir_{node_id}_hydro_reservoir"
            else:
                info.jules_balance_id = f"{info.jules_commodity}Balance_{node_id}"
                info.jules_storage_id = f"Reservoir_{node_id}"

        elif info.is_node:
            info.jules_balance_id = f"{info.jules_commodity}Balance_{node_id}"
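
For illustration, with a hypothetical commodity "Hydro" and node id "NO1_res", the generated ids look like this:

node_id = "NO1_res"
jules_commodity = "Hydro"  # hypothetical value of info.jules_commodity

# Detailed (not aggregated) storage node:
print(f"{jules_commodity}Balance_{node_id}")  # HydroBalance_NO1_res
print(f"Reservoir_{node_id}")                 # Reservoir_NO1_res

# Aggregated storage node gets the _hydro_reservoir postfix:
print(f"{jules_commodity}Balance_{node_id}_hydro_reservoir")  # HydroBalance_NO1_res_hydro_reservoir
print(f"Reservoir_{node_id}_hydro_reservoir")                 # Reservoir_NO1_res_hydro_reservoir
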
set_market_info(out_graph_info: dict[str, ComponentInfo], graph: dict[str, Flow | Node], names: JulESNames) -> None

Set is_market_node and if so, also set jules_commodity to market.

Source code in framjules/solve_handler/SolveHandler.py
def set_market_info(
    self,
    out_graph_info: dict[str, ComponentInfo],
    graph: dict[str, Flow | Node],
    names: JulESNames,
) -> None:
    """Set is_market_node and if so, also set jules_commodity to market."""
    for component_id, info in out_graph_info.items():
        info.is_market_node = info.is_node and not info.is_storage_node and not info.is_sss_member

        if info.is_market_node:
            info.jules_commodity = names.MARKET

    for component_id, info in out_graph_info.items():
        info.is_market_flow = False
        if info.is_flow:
            flow = graph[component_id]
            for arrow in flow.get_arrows():
                if out_graph_info[arrow.get_node()].is_market_node:
                    info.is_market_flow = True
                    break
set_results() -> None

Set results from Julia-JulES run into domain models.

Source code in framjules/solve_handler/SolveHandler.py
def set_results(self) -> None:
    """Set results from Julia-JulES run into domain models."""
    handler = self.create_results_handler()
    handler.set_results()
set_sss_global_eneq_info(out: dict[str, ComponentInfo], graph: dict[str, Flow | Node], db: QueryDB, config: JulESConfig) -> None

Set global_energy_coefficient using metadata. Convert to usable unit.

Source code in framjules/solve_handler/SolveHandler.py
def set_sss_global_eneq_info(
    self,
    out: dict[str, ComponentInfo],
    graph: dict[str, Flow | Node],
    db: QueryDB,
    config: JulESConfig,
) -> None:
    """Set global_energy_coefficient using metadata. Convert to usable unit."""
    for component_id, info in out.items():

        if not info.is_sss_member or not info.is_storage_node:
            continue

        data_dim: FixedFrequencyTimeIndex = config.get_data_period()

        start_year, num_years = config.get_weather_years()
        scen_dim = AverageYearRange(start_year, num_years)

        metakeys = graph[component_id].get_meta_keys()
        if "EnergyEqDownstream" in metakeys:  
            metadata = graph[component_id].get_meta("EnergyEqDownstream")
        elif "enekv_global" in metakeys:
            metadata = graph[component_id].get_meta("enekv_global")
        else:
            message = (
                f"Missing metadata EnergyEqDownstream or enekv_global for {component_id}, "
                f"only metadata keys {list(metakeys)}."
            )
            message = message + f" Object info: {info}"
            raise ValueError(message)
        expr = metadata.get_value()

        info.sss_global_eneq_value = get_level_value(
            expr=expr,
            unit=info.sss_global_eneq_unit,
            db=db,
            data_dim=data_dim,
            scen_dim=scen_dim,
            is_max=False,
        )
set_sss_info(out_graph_info: dict[str, ComponentInfo], graph: dict[str, Flow | Node], names: JulESNames, config: JulESConfig) -> None

Storage SubSystem (sss) info.

Source code in framjules/solve_handler/SolveHandler.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
def set_sss_info(
    self,
    out_graph_info: dict[str, ComponentInfo],
    graph: dict[str, Flow | Node],
    names: JulESNames,
    config: JulESConfig,
) -> None:
    """Storage SubSystem (sss) info."""
    include_boundaries = False  # so market nodes at the boundary are not incorrectly classified
    subsystems = get_one_commodity_storage_subsystems(graph, include_boundaries)

    for info in out_graph_info.values():
        info.has_storage_resolution = False

    for subsystem_id, (__, subsystem, boundary_domain_commodities) in subsystems.items():
        is_short_term = self.is_short_term_storage_subsystem(subsystem, graph, config)

        jules_commodity = names.SHORT_TERM_STORAGE if is_short_term else names.STORAGE_SYSTEM

        if len(boundary_domain_commodities) == 0:
            message = (
                f"Warning! No boundary domain commodity found for storage subsystem {subsystem_id} "
                f"with members {subsystem}."
            )
            self.send_warning_event(message)
            for component_id in subsystem:
                info = out_graph_info[component_id]
                info.jules_commodity = jules_commodity
            continue
        assert len(boundary_domain_commodities) == 1
        market_commodity = next(iter(boundary_domain_commodities))

        for component_id in subsystem:
            info = out_graph_info[component_id]

            info.is_sss_member = True

            info.sss_id = subsystem_id
            info.sss_is_short_term = is_short_term
            info.sss_market_commodity = market_commodity
            info.sss_members = subsystem

            info.jules_commodity = jules_commodity

        # All nodes in the subsystem get True below since include_boundaries = False.
        # Flows get False if any arrow points to the market commodity.
        assert include_boundaries is False
        for component_id in subsystem:
            component = graph[component_id]
            info = out_graph_info[component_id]
            info.has_storage_resolution = True
            if isinstance(component, Flow):
                for arrow in component.get_arrows():
                    node_id = arrow.get_node()
                    node_info = out_graph_info[node_id]
                    if node_info.jules_commodity == info.sss_market_commodity:
                        info.has_storage_resolution = False
                        break
set_sss_initial_storage(out: dict[str, ComponentInfo], graph: dict[str, Flow | Node], db: QueryDB, config: JulESConfig) -> None

Set sss_initial_storage. Convert to usable unit.

Source code in framjules/solve_handler/SolveHandler.py
def set_sss_initial_storage(
    self,
    out: dict[str, ComponentInfo],
    graph: dict[str, Flow | Node],
    db: QueryDB,
    config: JulESConfig,
) -> None:
    """Set sss_initial_storage. Convert to usable unit."""
    for node_id, info in out.items():
        if not info.is_storage_node:
            continue

        node: Node = graph[node_id]


        percentage = 0.6
        try:
            percentage = node.get_initial_storage_percentage()
            assert 0 <= percentage <= 1
        except Exception:
            self.send_warning_event(
                f"Missing initial storage for {node_id}. Using 60 % of capacity.",
            )

        info.sss_initial_storage = self._get_initial_storage_capacity(
            node_id,
            node,
            info,
            percentage,
            db,
            config,
        )
set_unit_info(out: dict[str, ComponentInfo], graph: dict[str, Flow | Node], config: JulESConfig, names: JulESNames) -> None

Calculate all types of target units.

Need from config:

- unit_money
- unit_stock per commodity for each storage_node
- unit_flow per commodity for each flow

Will derive:

- unit_price per commodity for each market_node
- unit_cost for each flow
- unit_coeffs for each flow
- unit_eneq for each sss_member in each sss

And also for each flow, we derive:

- unit_param_type
- unit_param_unit_flow
- unit_param_unit_stock

Source code in framjules/solve_handler/SolveHandler.py
def set_unit_info(
    self,
    out: dict[str, ComponentInfo],
    graph: dict[str, Flow | Node],
    config: JulESConfig,
    names: JulESNames,
) -> None:
    """Calculate all types of target units.

    Need from config:
    - unit_money
    - unit_stock per commodity for each storage_node
    - unit_flow per commodity for each flow

    Will derive:
    - unit_price per commodity for each market_node
    - unit_cost for each flow
    - unit_coeffs for each flow
    - unit_eneq for each sss_member in each sss

    And also for each flow, we derive:
    - unit_param_type
    - unit_param_unit_flow
    - unit_param_unit_stock

    """
    unit_money = config.get_currency()

    node_info = {k: info for k, info in out.items() if info.is_node}
    flow_info = {k: info for k, info in out.items() if info.is_flow}
    market_node_info = {k: info for k, info in node_info.items() if info.is_market_node}
    storage_node_info = {k: info for k, info in node_info.items() if info.is_storage_node}
    sss_member_info = {k: info for k, info in node_info.items() if info.is_sss_member}

    for info in market_node_info.values():
        unit_stock = config.get_unit_stock(info.domain_commodity)
        info.unit_price = f"{unit_money}/{unit_stock}"

    for info in storage_node_info.values():
        unit_flow = config.get_unit_flow(out[info.main_node_id].domain_commodity)
        unit_stock = config.get_unit_stock(out[info.main_node_id].domain_commodity)
        info.unit_flow = unit_flow
        info.unit_stock = unit_stock

    for d in [flow_info, storage_node_info]:
        for info in d.values():
            unit_flow = config.get_unit_flow(out[info.main_node_id].domain_commodity)
            unit_stock = config.get_unit_stock(out[info.main_node_id].domain_commodity)
            info.unit_flow = unit_flow
            info.unit_stock = unit_stock
            info.unit_cost = f"{unit_money}/{unit_stock}"
            if is_convertable(info.unit_flow, "MW"):
                info.unit_param_type = names.MWTOGWHPARAM
                info.unit_param_unit_flow = "MW"
                info.unit_param_unit_stock = "GWh"
            elif is_convertable(info.unit_flow, "m3/s"):
                info.unit_param_type = names.M3STOMM3PARAM
                info.unit_param_unit_flow = "m3/s"
                info.unit_param_unit_stock = "Mm3"
            else:
                message = f"Unsupported unit_flow: {info.unit_flow}"
                raise ValueError(message)

            if info.is_market_flow:
                seconds = config.get_time_resolution().get_clearing_market_minutes() * 60
            else:
                seconds = config.get_time_resolution().get_clearing_storage_minutes() * 60
            info.unit_flow_result = f"{unit_stock}/({seconds} * s)"

    for flow_id, info in flow_info.items():
        flow: Flow = graph[flow_id]
        info.unit_coeffs = dict()
        for arrow in flow.get_arrows():
            from_node_id = arrow.get_node()
            unit_coeff = None
            if from_node_id != info.main_node_id:
                from_node_unit = config.get_unit_stock(out[from_node_id].domain_commodity)
                unit_coeff = None if from_node_unit == info.unit_stock else f"{from_node_unit}/{info.unit_stock}"
            info.unit_coeffs[from_node_id] = unit_coeff

    for info in sss_member_info.values():
        if info.sss_global_eneq_unit is not None:
            continue
        unit_market = config.get_unit_stock(info.sss_market_commodity)
        unit_stock = config.get_unit_stock(info.domain_commodity)
        unit_eneq = f"{unit_market}/{unit_stock}"
        for component_id in info.sss_members:
            member_info = out[component_id]
            member_info.sss_global_eneq_unit = unit_eneq
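
As a concrete illustration, assume a hypothetical config where the currency is "EUR", the stock unit is "GWh", the flow unit is "MW" and the clearing market step is 180 minutes; the derived unit strings then become:

unit_money = "EUR"  # hypothetical config values
unit_stock = "GWh"
unit_flow = "MW"

unit_price = f"{unit_money}/{unit_stock}"  # EUR/GWh for market nodes
unit_cost = f"{unit_money}/{unit_stock}"   # EUR/GWh for flows

# MW is convertible to MW, so the MW-to-GWh parameter type is chosen,
# with unit_param_unit_flow = "MW" and unit_param_unit_stock = "GWh".

seconds = 180 * 60  # hypothetical clearing market step of 180 minutes
unit_flow_result = f"{unit_stock}/({seconds} * s)"  # GWh/(10800 * s)
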

build_handler

BuildHandler
BuildHandler

Bases: Base, ABC

Responsible for implementing shared functionality in build method.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
class BuildHandler(Base, ABC):
    """Responsible for implementing shared functionality in build method."""

    def __init__(
        self,
        folder: Path,
        config: JulESConfig,
        names: JulESNames,
        domain_models: DomainModels,
        graphs: NodeFlowGraphs,
        graph_infos: GraphInfos,
        db: QueryDB,
    ) -> None:
        """Initialize handler.

        Use inputs passed down from SolveHandler and create extra
        fields only relevant for the build phase.
        """
        self.folder = folder
        self.config = config
        self.names = names
        self.domain_models = domain_models
        self.graphs = graphs
        self.graph_infos = graph_infos
        self.db = db

        self.append = DataElementAppender(names)

        self.errors: set[str] = set()
        self.timevectors: dict[FixedFrequencyTimeIndex, dict[str, NDArray]] = defaultdict(dict)

    def build(self) -> None:
        """Build input and configuration files for JulES."""
        t = time()
        self.build_data_elements(self.names.CLEARING, self.graphs.clearing, self.graph_infos.clearing)
        self.send_debug_event(f"build_data_elements clearing time: {round(time() - t, 2)} seconds")

        t = time()
        self.build_data_elements(self.names.AGGREGATED, self.graphs.short_term, self.graph_infos.short_term)
        self.send_debug_event(f"build_data_elements aggregated time: {round(time() - t, 2)} seconds")

        # Intent is to build data elements for aggregated models. Pending changes in Julia-JulES
        # self.build_data_elements(self.names.SHORT_TERM, self.graphs.short_term, self.graph_infos.short_term)
        # self.build_data_elements(self.names.MEDIUM_TERM, self.graphs.medium_term, self.graph_infos.medium_term)
        # self.build_data_elements(self.names.LONG_TERM, self.graphs.long_term, self.graph_infos.long_term)

        t = time()
        self.build_time_vectors()
        self.send_debug_event(f"build_time_vectors time: {round(time() - t, 2)} seconds")

        t = time()
        self.build_storage_mapping(self.graph_infos.clearing)
        self.send_debug_event(f"build_storage_mapping time: {round(time() - t, 2)} seconds")

        t = time()
        self.build_start_storage(self.names.FILENAME_START_STORAGES_CLEARING, self.graph_infos.clearing)
        self.send_debug_event(f"build_start_storage clearing time: {round(time() - t, 2)} seconds")

        t = time()
        self.build_start_storage(self.names.FILENAME_START_STORAGES_AGGREGATED, self.graph_infos.short_term)
        self.send_debug_event(f"build_start_storage aggregated time: {round(time() - t, 2)} seconds")

    def build_start_storage(
        self,
        filename: str,
        graph_info: dict[str, ComponentInfo],
    ) -> None:
        """Write start storag json file to folder."""
        data = {
            info.jules_storage_id: info.sss_initial_storage for k, info in graph_info.items() if info.is_storage_node
        }
        self.write_json_file(data, filename)

    def build_data_elements(
        self,
        model_id: str,
        graph: dict[str, Flow | Node],
        graph_info: dict[str, ComponentInfo],
    ) -> None:
        """Write json file with data elements for a graph belonging to a given model_id."""
        self.fill_data_elements(model_id, graph, graph_info)
        self.stop_if_errors()
        filename = f"{self.names.ROOT_FILENAME_DATAELEMENTS}_{model_id}.json"
        self.write_json_file(self.append.data_elements, filename)

    def stop_if_errors(self) -> None:
        """Throw RunTimeError if any errors."""
        if self.errors:
            error_string = "\n".join(self.errors)
            message = f"Errors found:\n{error_string}"
            raise RuntimeError(message)

    def build_time_vectors(self) -> None:
        """Write json file with time vector data elements and csv file for each unique time index."""
        self.append.data_elements = []
        for i, timeindex in enumerate(self.timevectors, start=1):
            time_index_id = f"timeindex_{i}"

            milliseconds: float = timeindex.get_period_duration().total_seconds() * 1000.0
            time_delta_id = f"{time_index_id}_timedelta"
            self.append.ms_time_delta(time_delta_id, milliseconds)

            self.append.range_time_index(
                time_index_id,
                timeindex.get_start_time(),
                timeindex.get_num_periods(),
                time_delta_id,
            )

            table_id = f"{time_index_id}_table"
            path_table, column_names = self.write_table(time_index_id, timeindex)
            self.append.base_table(table_id, path_table, column_names)

            is_one_year = timeindex.is_one_year()

            for column_name in column_names:
                time_vector_id = column_name
                time_values_id = f"{time_vector_id}_values"

                self.append.column_time_values(time_values_id, table_id, column_name)

                if is_one_year:
                    self.append.one_year_time_vector(time_vector_id, time_index_id, time_values_id)
                else:
                    self.append.rotating_time_vector(time_vector_id, time_index_id, time_values_id)

        self.write_json_file(self.append.data_elements, self.names.FILENAME_DATAELEMENTS_TIMEVECTORS)

    def get_time_index_id(self, timeindex: FixedFrequencyTimeIndex) -> str:
        """Return id that works in file name."""
        type_name = type(timeindex).__name__
        num_periods = timeindex.get_num_periods()
        resolution = int(timeindex.get_period_duration().total_seconds() * 1000.0)
        is_52 = timeindex.is_52_week_years()
        extr_first = timeindex.extrapolate_first_point()
        extr_last = timeindex.extrapolate_last_point()
        return f"timeindex_{type_name}_periods_{num_periods}_ms_{resolution}_{is_52}_{extr_first}_{extr_last}"

    def write_table(
        self,
        time_index_id: str,
        timeindex: FixedFrequencyTimeIndex,
    ) -> tuple[Path, list[str]]:
        """Write all vectors corresponding to time_index_id to a csv file."""
        vectors = self.timevectors[timeindex]
        column_names = list(vectors.keys())
        matrix = np.column_stack([vectors[c] for c in column_names])
        matrix = np.round(matrix, decimals=6)  # round away tiny negative floating point noise
        # collect an error if negative values remain after rounding
        if np.any(matrix < 0):
            self.errors.add(f"Negative values found in time vector for {time_index_id}. This might cause issues.")
        filename = f"timevector_{time_index_id}.csv"
        path = self.folder / filename
        np.savetxt(path, matrix, delimiter=",")
        return path, column_names

    def build_storage_mapping(
        self,
        graph_info: dict[str, ComponentInfo],
    ) -> None:
        """Write the mapping of storages from Clearing to Aggregated Model to json."""
        data = {
            k: info.agg_storage_node_id
            for k, info in graph_info.items()
            if info.is_storage_node and info.agg_storage_node_id
        }
        self.write_json_file(data, self.names.FILENAME_STORAGE_MAPPING)

    def fill_data_elements(
        self,
        model_id: str,
        graph: dict[str, Flow | Node],
        graph_info: dict[str, ComponentInfo],
    ) -> None:
        """Reset and fill self.append.data_elements with data element json data."""
        self.append.data_elements = []  # Important to reset the list

        nodes: dict[str, Node] = {k: v for k, v in graph.items() if isinstance(v, Node)}
        flows: dict[str, Flow] = {k: v for k, v in graph.items() if isinstance(v, Flow)}

        exogenous_nodes = {k: v for k, v in nodes.items() if v.is_exogenous()}
        exogenous_flows = {k: v for k, v in flows.items() if v.is_exogenous()}
        endogenous_nodes = {k: v for k, v in nodes.items() if not v.is_exogenous()}
        endogenous_flows = {k: v for k, v in flows.items() if not v.is_exogenous()}

        t = time()
        self.add_exogenous_nodes(exogenous_nodes, graph_info)
        self.send_debug_event(f"add_exogenous_nodes time: {round(time() - t, 2)} seconds")
        t = time()
        self.add_exogenous_flows(exogenous_flows, graph_info)
        self.send_debug_event(f"add_exogenous_flows time: {round(time() - t, 2)} seconds")
        t = time()
        self.add_endogenous_nodes(endogenous_nodes, graph_info, model_id)
        self.send_debug_event(f"add_endogenous_nodes time: {round(time() - t, 2)} seconds")
        t = time()
        self.add_endogenous_flows(endogenous_flows, graph_info)
        self.send_debug_event(f"add_endogenous_flows time: {round(time() - t, 2)} seconds")

        t = time()
        self.add_dummy_exogenous_balance()
        self.send_debug_event(f"add_dummy_exogenous_balance time: {round(time() - t, 2)} seconds")

    def add_dummy_exogenous_balance(self) -> None:
        """Add a dummy exogenous Node for JulES."""
        balance_id = "PowerBalance_DummyNode"
        profile_id = f"{balance_id}_Profile"
        # Find longest name.
        longest_name = ""
        for name in self.graphs.clearing:
            if len(name) > len(longest_name):
                longest_name = name
        if len(longest_name) >= len(balance_id):
            unique_postfix = "_" + "x" * (len(longest_name) - len(balance_id))  # Fill to guarantee uniqueness.
            balance_id += unique_postfix
            profile_id += unique_postfix

        # Set balance and profile ids in names so other parts of system have access to them.
        self.names.dummy_exogenous_balance_name = balance_id
        self.names.dummy_exogenous_profile_id = profile_id

        price_param_id = f"{balance_id}_price_param"
        self.append.exogenous_balance(balance_id, self.names.MARKET, price_param_id)
        self.append.mean_series_param(price_param_id, 1.0, profile_id)

        # Set index and vector so they are added to the dataset in build_time_vectors.

        first_scenario_year, num_scenario_years = self.config.get_weather_years()

        period_duration = timedelta(minutes=self.config.get_time_resolution().get_clearing_market_minutes())

        dummy_timeindex = ProfileTimeIndex(
            start_year=first_scenario_year,
            num_years=num_scenario_years,
            period_duration=self._get_closest_valid_profile_duration(period_duration),
            is_52_week_years=True,
        )

        if dummy_timeindex not in self.timevectors:
            default_vector = np.arange(0, dummy_timeindex.get_num_periods(), 1, dtype=np.float64)
            np.divide(default_vector, default_vector.max(), out=default_vector)

            self.timevectors[dummy_timeindex] = {profile_id: default_vector}

    def add_exogenous_nodes(
        self,
        exogenous_nodes: dict[str, Node],
        graph_info: dict[str, ComponentInfo],
    ) -> None:
        """Append exogenous balance related data elements for exogenous node."""
        for node_id, node in exogenous_nodes.items():
            info = graph_info[node_id]

            balance_id = info.jules_balance_id

            price_param_id = f"{balance_id}_price_param"

            self.append.exogenous_balance(balance_id, info.jules_commodity, price_param_id)

            price: Price = node.get_price()

            if not price.has_level():
                message = f"Node {node_id} is exogenous but has not price."
                raise RuntimeError(message)


            if price.has_profile():
                units = get_units_from_expr(self.db, price.get_profile())
                if units:
                    message = f"Node {node_id} has exogenous price profile with units {units}."
                    raise RuntimeError(message)

            level = self.get_price_level(price_param_id, price, info)
            profile = self.get_price_profile(price_param_id, price, info)

            self.append.mean_series_param(price_param_id, level, profile)

    def add_endogenous_nodes(
        self,
        endogenous_nodes: dict[str, Node],
        graph_info: dict[str, ComponentInfo],
        model_id: str,
    ) -> None:
        """Append endogenous balance related data elements for endogenous node."""
        for node_id, node in endogenous_nodes.items():
            info = graph_info[node_id]

            self.append.endogenous_balance(info.jules_balance_id, info.jules_commodity)

            storage = node.get_storage()
            if storage is not None:
                self.add_storage(model_id, storage, info)

    def add_exogenous_flows(
        self,
        exogenous_flows: dict[str, Flow],
        graph_info: dict[str, ComponentInfo],
    ) -> None:
        """Append data elements related to an exogenous flow."""
        for flow_id, flow in exogenous_flows.items():
            for arrow in flow.get_arrows():
                node_info = graph_info[arrow.get_node()]
                if not node_info.is_exogenous:
                    self.add_rhs_term(flow_id, flow, arrow, node_info, graph_info[flow_id])

    def add_endogenous_flows(
        self,
        endogenous_flows: dict[str, Flow],
        graph_info: dict[str, ComponentInfo],
    ) -> None:
        """Append data elements for endogenous flows and related attributes."""
        for flow_id, flow in endogenous_flows.items():
            self.append.base_flow(flow_id)

            flow_info = graph_info[flow_id]

            self.add_flow_lower_bound(flow_id, flow, flow_info)
            self.add_flow_upper_bound(flow_id, flow, flow_info)
            self.add_flow_arrows(flow_id, flow, graph_info)
            self.add_flow_costs(flow_id, flow, flow_info)

    def add_storage(
        self,
        model_id: str,
        storage: Storage,
        info: ComponentInfo,
    ) -> None:
        """Append data elements related to a storage."""

        storage_id = info.jules_storage_id
        balance_id = info.jules_balance_id

        self.append.base_storage(storage_id, balance_id)

        capacity = storage.get_capacity()

        self.add_positive_capacity(storage_id, info, capacity, f"{storage_id}_upper_bound", False)

        self.append.lower_zero_capacity(f"{storage_id}_lower_bound", info.is_flow, storage_id)

        if model_id == self.names.CLEARING:
            self.append.global_eneq(info.jules_global_eneq_id, info.jules_balance_id, info.sss_global_eneq_value)

    def add_rhs_term(
        self,
        flow_id: str,
        flow: Flow,
        arrow: Arrow,
        node_info: ComponentInfo,
        flow_info: ComponentInfo,
    ) -> None:
        """Append data elements related to rhs term."""
        node_id = arrow.get_node()

        rhs_term_id = f"exogenous_flow_{flow_id}_{node_id}"
        unit_param_id = f"{rhs_term_id}_unit_param"
        series_param_id = f"{rhs_term_id}_series_param"
        balance_id = node_info.jules_balance_id

        self.append.base_rhs_term(rhs_term_id, balance_id, arrow.is_ingoing(), unit_param_id)

        level = self.get_rhs_term_level(rhs_term_id, flow, arrow, flow_info)

        profile = self.get_rhs_term_profile(rhs_term_id, flow, arrow, flow_info)

        # unit is actually flipped from main to target node
        # using conversion factor inside get_rhs_term_level
        # so the jules unit param might say GWh but the values
        # have been converted so that the result will become e.g. Mm3

        self.append.unit_param(unit_param_id, series_param_id, flow_info)
        self.append.mean_series_param(series_param_id, level, profile)

    def add_flow_lower_bound(
        self,
        flow_id: str,
        flow: Flow,
        flow_info: ComponentInfo,
    ) -> None:
        """Append lower bound related data elements for a flow."""
        capacity = flow.get_min_capacity()
        bound_id = f"{flow_id}_lower_bound"
        profile = None if capacity is None else capacity.get_profile()
        if profile is None:
            self.append.lower_zero_capacity(bound_id, flow_info.is_flow, flow_or_storage_id=flow_id)
            return
        self.add_positive_capacity(flow_id, flow_info, capacity, bound_id, is_lower_bound=True)

    def add_flow_upper_bound(
        self,
        flow_id: str,
        flow: Flow,
        flow_info: ComponentInfo,
    ) -> None:
        """Append upper bound related data elements for a flow."""
        capacity = flow.get_max_capacity()
        if capacity is None:
            return
        bound_id = f"{flow_id}_upper_bound"
        self.add_positive_capacity(flow_id, flow_info, capacity, bound_id, is_lower_bound=False)

    def add_flow_arrows(
        self,
        flow_id: str,
        flow: Flow,
        graph_info: dict[str, ComponentInfo],
    ) -> None:
        """Append arrow related data elements for each arrow in flow."""

        flow_info = graph_info[flow_id]
        for arrow in flow.get_arrows():
            assert arrow.has_profile() is False, "Currently not supported, will be implemented later"  

            arrow_id = f"{flow_id}_arrow_{arrow.get_node()}->{flow_info.main_node_id}"

            level = self.get_coefficient_level(arrow_id, arrow, flow_info)

            balance_id = graph_info[arrow.get_node()].jules_balance_id

            self.append.base_arrow(arrow_id, flow_id, balance_id, arrow.is_ingoing(), level)

    def add_flow_costs(
        self,
        flow_id: str,
        flow: Flow,
        flow_info: ComponentInfo,
    ) -> None:
        """Append cost data element for each cost in flow."""
        cost_terms = flow.get_cost_terms()
        for cost_term_id, cost_term in cost_terms.items():
            level = self.get_cost_term_level(cost_term_id, cost_term, flow_info)

            has_profile = cost_term.get_profile() is not None

            profile = self.get_cost_term_profile(cost_term_id, cost_term, flow_info) if has_profile else 1.0

            extended_cost_term_id = f"{flow_id}_{cost_term_id}"
            param_id = f"{extended_cost_term_id}_param"
            self.append.cost_term(extended_cost_term_id, flow_id, flow_info.is_flow, cost_term.is_cost(), param_id)

            self.append.mean_series_param(param_id, level, profile)

    def add_positive_capacity(
        self,
        flow_or_storage_id: str,
        info: ComponentInfo,
        capacity: FlowVolume | StockVolume,
        bound_id: str,
        is_lower_bound: bool,
    ) -> None:
        """Append data elements related to positive capacity."""
        series_param_id = f"{bound_id}_series_param"

        if isinstance(capacity, FlowVolume):
            unit_param_id = f"{bound_id}_unit_param"
            self.append.unit_param(unit_param_id, series_param_id, info)
            self.append.positive_capacity(bound_id, info.is_flow, flow_or_storage_id, is_lower_bound, unit_param_id)
        else:
            assert isinstance(capacity, StockVolume)
            self.append.positive_capacity(bound_id, info.is_flow, flow_or_storage_id, is_lower_bound, series_param_id)

        level = self.get_capacity_level(series_param_id, capacity, info)
        profile = self.get_capacity_profile(series_param_id, capacity, info)

        self.append.mean_series_param(series_param_id, level, profile)

    def write_json_file(self, data: object, filename: str) -> None:
        """Write data to json."""
        with (self.folder / filename).open("w") as f:
            json.dump(data, f, indent=self.names.JSON_INDENT)

    def _get_closest_valid_profile_duration(self, period_duration: timedelta) -> timedelta:
        input_seconds = period_duration.total_seconds()
        data = [h * 3600 for h in [168, 84, 56, 42, 28, 24, 21, 14, 12, 8, 7, 6, 4, 3, 2, 1]]
        for profile_seconds in data:
            if profile_seconds <= input_seconds:
                break
        return timedelta(seconds=profile_seconds)

    # Must be implemented for each simulation mode

    @abstractmethod
    def get_price_level(self, root_id: str, price: Price, info: ComponentInfo) -> str | float:
        """Query price level."""
        pass

    @abstractmethod
    def get_price_profile(self, root_id: str, price: Price, info: ComponentInfo) -> str | float:
        """Query price profile."""
        pass

    @abstractmethod
    def get_capacity_level(self, root_id: str, capacity: FlowVolume | StockVolume, info: ComponentInfo) -> str | float:
        """Query capacity level."""
        pass

    @abstractmethod
    def get_capacity_profile(
        self,
        root_id: str,
        capacity: FlowVolume | StockVolume,
        info: ComponentInfo,
    ) -> str | float:
        """Query capacity profile."""
        pass

    @abstractmethod
    def get_coefficient_level(self, root_id: str, arrow: Arrow, info: ComponentInfo) -> str | float:
        """Query arrow coefficient level."""
        pass

    @abstractmethod
    def get_cost_term_level(self, root_id: str, cost_term: Cost, info: ComponentInfo) -> str | float:
        """Query cost term level."""
        pass

    @abstractmethod
    def get_cost_term_profile(self, root_id: str, cost_term: Cost, info: ComponentInfo) -> str | float:
        """Query cost term profile."""
        pass

    @abstractmethod
    def get_rhs_term_level(
        self,
        rhs_term_id: str,
        flow: Flow,
        arrow: Arrow,
        flow_info: ComponentInfo,
    ) -> str | float:
        """Query rhs term level."""
        pass

    @abstractmethod
    def get_rhs_term_profile(
        self,
        rhs_term_id: str,
        flow: Flow,
        arrow: Arrow,
        flow_info: ComponentInfo,
    ) -> str | float:
        """Query rhs term profile."""
        pass
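
One helper in the listing above deserves a note: _get_closest_valid_profile_duration scans its candidate durations in descending order and returns the largest one that does not exceed the input, falling back to one hour for sub-hour inputs (the loop variable keeps its last value when no candidate matches). A standalone restatement, assuming the same candidate list:

from datetime import timedelta

def closest_valid_profile_duration(period_duration: timedelta) -> timedelta:
    # Candidate durations in hours, descending.
    candidates = [h * 3600 for h in [168, 84, 56, 42, 28, 24, 21, 14, 12, 8, 7, 6, 4, 3, 2, 1]]
    input_seconds = period_duration.total_seconds()
    for profile_seconds in candidates:
        if profile_seconds <= input_seconds:
            break
    return timedelta(seconds=profile_seconds)

assert closest_valid_profile_duration(timedelta(hours=5)) == timedelta(hours=4)
assert closest_valid_profile_duration(timedelta(minutes=30)) == timedelta(hours=1)
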
__init__(folder: Path, config: JulESConfig, names: JulESNames, domain_models: DomainModels, graphs: NodeFlowGraphs, graph_infos: GraphInfos, db: QueryDB) -> None

Initialize handler.

Use inputs passed down from SolveHandler and create extra fields only relevant for the build phase.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def __init__(
    self,
    folder: Path,
    config: JulESConfig,
    names: JulESNames,
    domain_models: DomainModels,
    graphs: NodeFlowGraphs,
    graph_infos: GraphInfos,
    db: QueryDB,
) -> None:
    """Initialize handler.

    Use inputs passed down from SolveHandler and create extra
    fields only relevant for the build phase.
    """
    self.folder = folder
    self.config = config
    self.names = names
    self.domain_models = domain_models
    self.graphs = graphs
    self.graph_infos = graph_infos
    self.db = db

    self.append = DataElementAppender(names)

    self.errors: set[str] = set()
    self.timevectors: dict[FixedFrequencyTimeIndex, dict[str, NDArray]] = defaultdict(dict)
add_dummy_exogenous_balance() -> None

Add a dummy exogenous Node for JulES.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_dummy_exogenous_balance(self) -> None:
    """Add a dummy exogenous Node for JulES."""
    balance_id = "PowerBalance_DummyNode"
    profile_id = f"{balance_id}_Profile"
    # Find longest name.
    longest_name = ""
    for name in self.graphs.clearing:
        if len(name) > len(longest_name):
            longest_name = name
    if len(longest_name) >= len(balance_id):
        unique_postfix = "_" + "x" * (len(longest_name) - len(balance_id))  # Fill to guarantee uniqueness.
        balance_id += unique_postfix
        profile_id += unique_postfix

    # Set balance and profile ids in names so other parts of system have access to them.
    self.names.dummy_exogenous_balance_name = balance_id
    self.names.dummy_exogenous_profile_id = profile_id

    price_param_id = f"{balance_id}_price_param"
    self.append.exogenous_balance(balance_id, self.names.MARKET, price_param_id)
    self.append.mean_series_param(price_param_id, 1.0, profile_id)

    # Set index and vector so they are added to the dataset in build_time_vectors.

    first_scenario_year, num_scenario_years = self.config.get_weather_years()

    period_duration = timedelta(minutes=self.config.get_time_resolution().get_clearing_market_minutes())

    dummy_timeindex = ProfileTimeIndex(
        start_year=first_scenario_year,
        num_years=num_scenario_years,
        period_duration=self._get_closest_valid_profile_duration(period_duration),
        is_52_week_years=True,
    )

    if dummy_timeindex not in self.timevectors:
        default_vector = np.arange(0, dummy_timeindex.get_num_periods(), 1, dtype=np.float64)
        np.divide(default_vector, default_vector.max(), out=default_vector)

        self.timevectors[dummy_timeindex] = {profile_id: default_vector}
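
The postfix arithmetic above guarantees the dummy id is strictly longer than every existing name. A hypothetical example with a 30-character longest name:

longest_name = "a" * 30  # hypothetical longest component name
balance_id = "PowerBalance_DummyNode"  # 22 characters

unique_postfix = "_" + "x" * (len(longest_name) - len(balance_id))
balance_id += unique_postfix
print(balance_id)       # PowerBalance_DummyNode_xxxxxxxx
print(len(balance_id))  # 31, longer than any existing name
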
add_endogenous_flows(endogenous_flows: dict[str, Flow], graph_info: dict[str, ComponentInfo]) -> None

Append data elements for endogenous flows and related attributes.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_endogenous_flows(
    self,
    endogenous_flows: dict[str, Flow],
    graph_info: dict[str, ComponentInfo],
) -> None:
    """Append data elements for endogenous flows and related attributes."""
    for flow_id, flow in endogenous_flows.items():
        self.append.base_flow(flow_id)

        flow_info = graph_info[flow_id]

        self.add_flow_lower_bound(flow_id, flow, flow_info)
        self.add_flow_upper_bound(flow_id, flow, flow_info)
        self.add_flow_arrows(flow_id, flow, graph_info)
        self.add_flow_costs(flow_id, flow, flow_info)
add_endogenous_nodes(endogenous_nodes: dict[str, Node], graph_info: dict[str, ComponentInfo], model_id: str) -> None

Append endogenous balance related data elements for endogenous node.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_endogenous_nodes(
    self,
    endogenous_nodes: dict[str, Node],
    graph_info: dict[str, ComponentInfo],
    model_id: str,
) -> None:
    """Append endogenous balance related data elements for endogenous node."""
    for node_id, node in endogenous_nodes.items():
        info = graph_info[node_id]

        self.append.endogenous_balance(info.jules_balance_id, info.jules_commodity)

        storage = node.get_storage()
        if storage is not None:
            self.add_storage(model_id, storage, info)
add_exogenous_flows(exogenous_flows: dict[str, Flow], graph_info: dict[str, ComponentInfo]) -> None

Append data elements related to an exogenous flow.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_exogenous_flows(
    self,
    exogenous_flows: dict[str, Flow],
    graph_info: dict[str, ComponentInfo],
) -> None:
    """Append data elements related to an exogenous flow."""
    for flow_id, flow in exogenous_flows.items():
        for arrow in flow.get_arrows():
            node_info = graph_info[arrow.get_node()]
            if not node_info.is_exogenous:
                self.add_rhs_term(flow_id, flow, arrow, node_info, graph_info[flow_id])
add_exogenous_nodes(exogenous_nodes: dict[str, Node], graph_info: dict[str, ComponentInfo]) -> None

Append exogenous balance related data elements for exogenous node.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_exogenous_nodes(
    self,
    exogenous_nodes: dict[str, Node],
    graph_info: dict[str, ComponentInfo],
) -> None:
    """Append exogenous balance related data elements for exogenous node."""
    for node_id, node in exogenous_nodes.items():
        info = graph_info[node_id]

        balance_id = info.jules_balance_id

        price_param_id = f"{balance_id}_price_param"

        self.append.exogenous_balance(balance_id, info.jules_commodity, price_param_id)

        price: Price = node.get_price()

        if not price.has_level():
            message = f"Node {node_id} is exogenous but has not price."
            raise RuntimeError(message)


        if price.has_profile():
            units = get_units_from_expr(self.db, price.get_profile())
            if units:
                message = f"Node {node_id} has exogenous price profile with units {units}."
                raise RuntimeError(message)

        level = self.get_price_level(price_param_id, price, info)
        profile = self.get_price_profile(price_param_id, price, info)

        self.append.mean_series_param(price_param_id, level, profile)
add_flow_arrows(flow_id: str, flow: Flow, graph_info: dict[str, ComponentInfo]) -> None

Append arrow related data elements for each arrow in flow.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_flow_arrows(
    self,
    flow_id: str,
    flow: Flow,
    graph_info: dict[str, ComponentInfo],
) -> None:
    """Append arrow related data elements for each arrow in flow."""

    flow_info = graph_info[flow_id]
    for arrow in flow.get_arrows():
        assert arrow.has_profile() is False, "Currently not supported, will be implemented later"  

        arrow_id = f"{flow_id}_arrow_{arrow.get_node()}->{flow_info.main_node_id}"

        level = self.get_coefficient_level(arrow_id, arrow, flow_info)

        balance_id = graph_info[arrow.get_node()].jules_balance_id

        self.append.base_arrow(arrow_id, flow_id, balance_id, arrow.is_ingoing(), level)
add_flow_costs(flow_id: str, flow: Flow, flow_info: ComponentInfo) -> None

Append cost data element for each cost in flow.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_flow_costs(
    self,
    flow_id: str,
    flow: Flow,
    flow_info: ComponentInfo,
) -> None:
    """Append cost data element for each cost in flow."""
    cost_terms = flow.get_cost_terms()
    for cost_term_id, cost_term in cost_terms.items():
        level = self.get_cost_term_level(cost_term_id, cost_term, flow_info)

        has_profile = cost_term.get_profile() is not None

        profile = self.get_cost_term_profile(cost_term_id, cost_term, flow_info) if has_profile else 1.0

        extended_cost_term_id = f"{flow_id}_{cost_term_id}"
        param_id = f"{extended_cost_term_id}_param"
        self.append.cost_term(extended_cost_term_id, flow_id, flow_info.is_flow, cost_term.is_cost(), param_id)

        self.append.mean_series_param(param_id, level, profile)
add_flow_lower_bound(flow_id: str, flow: Flow, flow_info: ComponentInfo) -> None

Append lower bound related data elements for a flow.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_flow_lower_bound(
    self,
    flow_id: str,
    flow: Flow,
    flow_info: ComponentInfo,
) -> None:
    """Append lower bound related data elements for a flow."""
    capacity = flow.get_min_capacity()
    bound_id = f"{flow_id}_lower_bound"
    profile = None if capacity is None else capacity.get_profile()
    if profile is None:
        self.append.lower_zero_capacity(bound_id, flow_info.is_flow, flow_or_storage_id=flow_id)
        return
    self.add_positive_capacity(flow_id, flow_info, capacity, bound_id, is_lower_bound=True)
add_flow_upper_bound(flow_id: str, flow: Flow, flow_info: ComponentInfo) -> None

Append upper bound related data elements for a flow.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_flow_upper_bound(
    self,
    flow_id: str,
    flow: Flow,
    flow_info: ComponentInfo,
) -> None:
    """Append upper bound related data elements for a flow."""
    capacity = flow.get_max_capacity()
    if capacity is None:
        return
    bound_id = f"{flow_id}_upper_bound"
    self.add_positive_capacity(flow_id, flow_info, capacity, bound_id, is_lower_bound=False)
add_positive_capacity(flow_or_storage_id: str, info: ComponentInfo, capacity: FlowVolume | StockVolume, bound_id: str, is_lower_bound: bool) -> None

Append data elements related to positive capacity.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_positive_capacity(
    self,
    flow_or_storage_id: str,
    info: ComponentInfo,
    capacity: FlowVolume | StockVolume,
    bound_id: str,
    is_lower_bound: bool,
) -> None:
    """Append data elements related to positive capacity."""
    series_param_id = f"{bound_id}_series_param"

    if isinstance(capacity, FlowVolume):
        unit_param_id = f"{bound_id}_unit_param"
        self.append.unit_param(unit_param_id, series_param_id, info)
        self.append.positive_capacity(bound_id, info.is_flow, flow_or_storage_id, is_lower_bound, unit_param_id)
    else:
        assert isinstance(capacity, StockVolume)
        self.append.positive_capacity(bound_id, info.is_flow, flow_or_storage_id, is_lower_bound, series_param_id)

    level = self.get_capacity_level(series_param_id, capacity, info)
    profile = self.get_capacity_profile(series_param_id, capacity, info)

    self.append.mean_series_param(series_param_id, level, profile)
add_rhs_term(flow_id: str, flow: Flow, arrow: Arrow, node_info: ComponentInfo, flow_info: ComponentInfo) -> None

Append data elements related to rhs term.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_rhs_term(
    self,
    flow_id: str,
    flow: Flow,
    arrow: Arrow,
    node_info: ComponentInfo,
    flow_info: ComponentInfo,
) -> None:
    """Append data elements related to rhs term."""
    node_id = arrow.get_node()

    rhs_term_id = f"exogenous_flow_{flow_id}_{node_id}"
    unit_param_id = f"{rhs_term_id}_unit_param"
    series_param_id = f"{rhs_term_id}_series_param"
    balance_id = node_info.jules_balance_id

    self.append.base_rhs_term(rhs_term_id, balance_id, arrow.is_ingoing(), unit_param_id)

    level = self.get_rhs_term_level(rhs_term_id, flow, arrow, flow_info)

    profile = self.get_rhs_term_profile(rhs_term_id, flow, arrow, flow_info)

    # unit is actually flipped from main to target node
    # using conversion factor inside get_rhs_term_level
    # so the jules unit param might say GWh but the values
    # have been converted so that the result will become e.g. Mm3

    self.append.unit_param(unit_param_id, series_param_id, flow_info)
    self.append.mean_series_param(series_param_id, level, profile)
add_storage(model_id: str, storage: Storage, info: ComponentInfo) -> None

Append data elements related to a storage.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def add_storage(
    self,
    model_id: str,
    storage: Storage,
    info: ComponentInfo,
) -> None:
    """Append data elements related to a storage."""

    storage_id = info.jules_storage_id
    balance_id = info.jules_balance_id

    self.append.base_storage(storage_id, balance_id)

    capacity = storage.get_capacity()

    self.add_positive_capacity(storage_id, info, capacity, f"{storage_id}_upper_bound", False)

    self.append.lower_zero_capacity(f"{storage_id}_lower_bound", info.is_flow, storage_id)

    if model_id == self.names.CLEARING:
        self.append.global_eneq(info.jules_global_eneq_id, info.jules_balance_id, info.sss_global_eneq_value)
build() -> None

Build input and configuration files for JulES.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def build(self) -> None:
    """Build input and configuration files for JulES."""
    t = time()
    self.build_data_elements(self.names.CLEARING, self.graphs.clearing, self.graph_infos.clearing)
    self.send_debug_event(f"build_data_elements clearing time: {round(time() - t, 2)} seconds")

    t = time()
    self.build_data_elements(self.names.AGGREGATED, self.graphs.short_term, self.graph_infos.short_term)
    self.send_debug_event(f"build_data_elements aggregated time: {round(time() - t, 2)} seconds")

    # Intent is to build data elements for aggregated models. Pending changes in Julia-JulES
    # self.build_data_elements(self.names.SHORT_TERM, self.graphs.short_term, self.graph_infos.short_term)
    # self.build_data_elements(self.names.MEDIUM_TERM, self.graphs.medium_term, self.graph_infos.medium_term)
    # self.build_data_elements(self.names.LONG_TERM, self.graphs.long_term, self.graph_infos.long_term)

    t = time()
    self.build_time_vectors()
    self.send_debug_event(f"build_time_vectors time: {round(time() - t, 2)} seconds")

    t = time()
    self.build_storage_mapping(self.graph_infos.clearing)
    self.send_debug_event(f"build_storage_mapping time: {round(time() - t, 2)} seconds")

    t = time()
    self.build_start_storage(self.names.FILENAME_START_STORAGES_CLEARING, self.graph_infos.clearing)
    self.send_debug_event(f"build_start_storage clearing time: {round(time() - t, 2)} seconds")

    t = time()
    self.build_start_storage(self.names.FILENAME_START_STORAGES_AGGREGATED, self.graph_infos.short_term)
    self.send_debug_event(f"build_start_storage aggregated time: {round(time() - t, 2)} seconds")
build_data_elements(model_id: str, graph: dict[str, Flow | Node], graph_info: dict[str, ComponentInfo]) -> None

Write json file with data elements for a graph belonging to a given model_id.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def build_data_elements(
    self,
    model_id: str,
    graph: dict[str, Flow | Node],
    graph_info: dict[str, ComponentInfo],
) -> None:
    """Write json file with data elements for a graph belonging to a given model_id."""
    self.fill_data_elements(model_id, graph, graph_info)
    self.stop_if_errors()
    filename = f"{self.names.ROOT_FILENAME_DATAELEMENTS}_{model_id}.json"
    self.write_json_file(self.append.data_elements, filename)
build_start_storage(filename: str, graph_info: dict[str, ComponentInfo]) -> None

Write start storage json file to folder.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def build_start_storage(
    self,
    filename: str,
    graph_info: dict[str, ComponentInfo],
) -> None:
    """Write start storag json file to folder."""
    data = {
        info.jules_storage_id: info.sss_initial_storage for k, info in graph_info.items() if info.is_storage_node
    }
    self.write_json_file(data, filename)
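
The resulting file is a flat json object keyed by jules_storage_id; a hypothetical example of its contents, written out as the Python dict it is serialized from:

# Hypothetical contents of the start storage json file:
data = {
    "Reservoir_NO1_res": 123.4,  # jules_storage_id -> sss_initial_storage
    "Reservoir_NO2_res": 56.7,
}
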
build_storage_mapping(graph_info: dict[str, ComponentInfo]) -> None

Write the mapping of storages from Clearing to Aggregated Model to json.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def build_storage_mapping(
    self,
    graph_info: dict[str, ComponentInfo],
) -> None:
    """Write the mapping of storages from Clearing to Aggregated Model to json."""
    data = {
        k: info.agg_storage_node_id
        for k, info in graph_info.items()
        if info.is_storage_node and info.agg_storage_node_id
    }
    self.write_json_file(data, self.names.FILENAME_STORAGE_MAPPING)
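
The mapping is likewise a flat json object, from detailed (clearing) storage node id to aggregated storage node id; a hypothetical example:

# Hypothetical storage mapping content:
data = {
    "res_a": "agg_hydro_south",  # clearing storage node -> aggregated node
    "res_b": "agg_hydro_south",
}
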
build_time_vectors() -> None

Write json file with time vector data elements and csv file for each unique time index.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def build_time_vectors(self) -> None:
    """Write json file with time vector data elements and csv file for each unique time index."""
    self.append.data_elements = []
    for i, timeindex in enumerate(self.timevectors, start=1):
        time_index_id = f"timeindex_{i}"

        milliseconds: float = timeindex.get_period_duration().total_seconds() * 1000.0
        time_delta_id = f"{time_index_id}_timedelta"
        self.append.ms_time_delta(time_delta_id, milliseconds)

        self.append.range_time_index(
            time_index_id,
            timeindex.get_start_time(),
            timeindex.get_num_periods(),
            time_delta_id,
        )

        table_id = f"{time_index_id}_table"
        path_table, column_names = self.write_table(time_index_id, timeindex)
        self.append.base_table(table_id, path_table, column_names)

        is_one_year = timeindex.is_one_year()

        for column_name in column_names:
            time_vector_id = column_name
            time_values_id = f"{time_vector_id}_values"

            self.append.column_time_values(time_values_id, table_id, column_name)

            if is_one_year:
                self.append.one_year_time_vector(time_vector_id, time_index_id, time_values_id)
            else:
                self.append.rotating_time_vector(time_vector_id, time_index_id, time_values_id)

    self.write_json_file(self.append.data_elements, self.names.FILENAME_DATAELEMENTS_TIMEVECTORS)
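
For orientation, these are the ids the loop above mints for the first time index, assuming a single table column named "area1_demand" (all names illustrative):

i = 1
column_name = "area1_demand"
time_index_id = f"timeindex_{i}"              # "timeindex_1"
time_delta_id = f"{time_index_id}_timedelta"  # "timeindex_1_timedelta"
table_id = f"{time_index_id}_table"           # "timeindex_1_table"
time_values_id = f"{column_name}_values"      # "area1_demand_values"
# The time vector itself reuses the column name as its id.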
fill_data_elements(model_id: str, graph: dict[str, Flow | Node], graph_info: dict[str, ComponentInfo]) -> None

Reset and fill self.append.data_elements with data element json data.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def fill_data_elements(
    self,
    model_id: str,
    graph: dict[str, Flow | Node],
    graph_info: dict[str, ComponentInfo],
) -> None:
    """Reset and fill self.append.data_elements with data element json data."""
    self.append.data_elements = []  # Important to reset the list

    nodes: dict[str, Node] = {k: v for k, v in graph.items() if isinstance(v, Node)}
    flows: dict[str, Flow] = {k: v for k, v in graph.items() if isinstance(v, Flow)}

    exogenous_nodes = {k: v for k, v in nodes.items() if v.is_exogenous()}
    exogenous_flows = {k: v for k, v in flows.items() if v.is_exogenous()}
    endogenous_nodes = {k: v for k, v in nodes.items() if not v.is_exogenous()}
    endogenous_flows = {k: v for k, v in flows.items() if not v.is_exogenous()}

    t = time()
    self.add_exogenous_nodes(exogenous_nodes, graph_info)
    self.send_debug_event(f"add_exogenous_nodes time: {round(time() - t, 2)} seconds")
    t = time()
    self.add_exogenous_flows(exogenous_flows, graph_info)
    self.send_debug_event(f"add_exogenous_flows time: {round(time() - t, 2)} seconds")
    t = time()
    self.add_endogenous_nodes(endogenous_nodes, graph_info, model_id)
    self.send_debug_event(f"add_endogenous_nodes time: {round(time() - t, 2)} seconds")
    t = time()
    self.add_endogenous_flows(endogenous_flows, graph_info)
    self.send_debug_event(f"add_endogenous_flows time: {round(time() - t, 2)} seconds")

    t = time()
    self.add_dummy_exogenous_balance()
    self.send_debug_event(f"add_dummy_exogenous_balance time: {round(time() - t, 2)} seconds")
get_capacity_level(root_id: str, capacity: FlowVolume | StockVolume, info: ComponentInfo) -> str | float abstractmethod

Query capacity level.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
@abstractmethod
def get_capacity_level(self, root_id: str, capacity: FlowVolume | StockVolume, info: ComponentInfo) -> str | float:
    """Query capacity level."""
    pass
get_capacity_profile(root_id: str, capacity: FlowVolume | StockVolume, info: ComponentInfo) -> str | float abstractmethod

Query capacity profile.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
@abstractmethod
def get_capacity_profile(
    self,
    root_id: str,
    capacity: FlowVolume | StockVolume,
    info: ComponentInfo,
) -> str | float:
    """Query capacity profile."""
    pass
get_coefficient_level(root_id: str, arrow: Arrow, info: ComponentInfo) -> str | float abstractmethod

Query arrow coefficient level.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
@abstractmethod
def get_coefficient_level(self, root_id: str, arrow: Arrow, info: ComponentInfo) -> str | float:
    """Query arrow coefficient level."""
    pass
get_cost_term_level(root_id: str, cost_term: Cost, info: ComponentInfo) -> str | float abstractmethod

Query cost term level.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
@abstractmethod
def get_cost_term_level(self, root_id: str, cost_term: Cost, info: ComponentInfo) -> str | float:
    """Query cost term level."""
    pass
get_cost_term_profile(root_id: str, cost_term: Cost, info: ComponentInfo) -> str | float abstractmethod

Query cost term profile.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
@abstractmethod
def get_cost_term_profile(self, root_id: str, cost_term: Cost, info: ComponentInfo) -> str | float:
    """Query cost term profile."""
    pass
get_price_level(root_id: str, price: Price, info: ComponentInfo) -> str | float abstractmethod

Query price level.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
@abstractmethod
def get_price_level(self, root_id: str, price: Price, info: ComponentInfo) -> str | float:
    """Query price level."""
    pass
get_price_profile(root_id: str, price: Price, info: ComponentInfo) -> str | float abstractmethod

Query price profile.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
@abstractmethod
def get_price_profile(self, root_id: str, price: Price, info: ComponentInfo) -> str | float:
    """Query price profile."""
    pass
get_rhs_term_level(rhs_term_id: str, flow: Flow, arrow: Arrow, flow_info: ComponentInfo) -> str | float abstractmethod

Query rhs term level.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
@abstractmethod
def get_rhs_term_level(
    self,
    rhs_term_id: str,
    flow: Flow,
    arrow: Arrow,
    flow_info: ComponentInfo,
) -> str | float:
    """Query rhs term level."""
    pass
get_rhs_term_profile(rhs_term_id: str, flow: Flow, arrow: Arrow, flow_info: ComponentInfo) -> str | float abstractmethod

Query rhs term profile.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
@abstractmethod
def get_rhs_term_profile(
    self,
    rhs_term_id: str,
    flow: Flow,
    arrow: Arrow,
    flow_info: ComponentInfo,
) -> str | float:
    """Query rhs term profile."""
    pass
get_time_index_id(timeindex: FixedFrequencyTimeIndex) -> str

Return id that works in file name.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def get_time_index_id(self, timeindex: FixedFrequencyTimeIndex) -> str:
    """Return id that works in file name."""
    type_name = type(timeindex).__name__
    num_periods = timeindex.get_num_periods()
    resolution = int(timeindex.get_period_duration().total_seconds() * 1000.0)
    is_52 = timeindex.is_52_week_years()
    extr_first = timeindex.extrapolate_first_point()
    extr_last = timeindex.extrapolate_last_point()
    return f"timeindex_{type_name}_periods_{num_periods}_ms_{resolution}_{is_52}_{extr_first}_{extr_last}"
stop_if_errors() -> None

Raise RuntimeError if any errors were collected.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def stop_if_errors(self) -> None:
    """Throw RunTimeError if any errors."""
    if self.errors:
        error_string = "\n".join(self.errors)
        message = f"Errors found:\n{error_string}"
        raise RuntimeError(message)
write_json_file(data: object, filename: str) -> None

Write data to json.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def write_json_file(self, data: object, filename: str) -> None:
    """Write data to json."""
    with Path.open(self.folder / filename, "w") as f:
        json.dump(data, f, indent=self.names.JSON_INDENT)
write_table(time_index_id: str, timeindex: FixedFrequencyTimeIndex) -> tuple[Path, list[str]]

Write all vectors corresponding to time_index_id to a csv file.

Source code in framjules/solve_handler/build_handler/BuildHandler.py
def write_table(
    self,
    time_index_id: str,
    timeindex: FixedFrequencyTimeIndex,
) -> tuple[Path, list[str]]:
    """Write all vectors corresponding to time_index_id to a csv file."""
    vectors = self.timevectors[timeindex]
    column_names = list(vectors.keys())
    matrix = np.column_stack([vectors[c] for c in column_names])
    matrix = np.round(matrix, decimals=6)  # quick fix for tiny negative values in time vectors
    # record an error if negative values remain after rounding
    if np.any(matrix < 0):
        self.errors.add(f"Negative values found in time vector for {time_index_id}. This might cause issues.")
    filename = f"timevector_{time_index_id}.csv"
    path = self.folder / filename
    np.savetxt(path, matrix, delimiter=",")
    return path, column_names
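
A self-contained sketch of the table layout this method produces, using made-up vectors and an in-memory buffer instead of a file:

import io

import numpy as np

vectors = {"demand_profile": np.array([1.0, 1.2]), "inflow_profile": np.array([0.8, 0.6])}
column_names = list(vectors)
matrix = np.round(np.column_stack([vectors[c] for c in column_names]), decimals=6)
buf = io.StringIO()
np.savetxt(buf, matrix, delimiter=",")
# One CSV column per vector, ordered as in column_names.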
DataElementAppender

Functionality to create data elements for JulES.

DataElementAppender

Used to generate list of data elements for JulES.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
class DataElementAppender:
    """Used to generate list of data elements for JulES."""

    def __init__(self, names: JulESNames) -> None:
        """Initialize new DataElementAppender."""
        self.data_elements: list = []
        self.names: JulESNames = names

    def exogenous_balance(
        self,
        balance_id: str,
        commodity: str,
        price_param_id: str | float,
    ) -> None:
        """Append exogenous balance data element."""
        self.data_elements.append(
            [
                self.names.BALANCE,
                self.names.EXOGENBALANCE,
                balance_id,
                [self.names.COMMODITY, commodity],
                [self.names.PRICE, price_param_id],
            ],
        )

    def endogenous_balance(
        self,
        balance_id: str,
        commodity: str,
    ) -> None:
        """Append endogenous balance data element."""
        self.data_elements.append(
            [
                self.names.BALANCE,
                self.names.BASEBALANCE,
                balance_id,
                [self.names.COMMODITY, commodity],
            ],
        )

    def mean_series_param(
        self,
        param_id: str,
        level: str | float,
        profile: str | float,
    ) -> None:
        """Append mean series param data element."""
        self.data_elements.append(
            [
                self.names.PARAM,
                self.names.MEANSERIESPARAM,
                param_id,
                [self.names.LEVEL, level],
                [self.names.PROFILE, profile],
            ],
        )

    def base_flow(self, flow_id: str) -> None:
        """Append base flow data element."""
        self.data_elements.append([self.names.FLOW, self.names.BASEFLOW, flow_id])

    def lower_zero_capacity(
        self,
        lower_bound_id: str,
        is_flow: bool,
        flow_or_storage_id: str,
    ) -> None:
        """Append zero lower capacity data element."""
        self.data_elements.append(
            [
                self.names.CAPACITY,
                self.names.LOWERZEROCAPACITY,
                lower_bound_id,
                [self.names.WHICHCONCEPT, self.names.FLOW if is_flow else self.names.STORAGE],
                [self.names.WHICHINSTANCE, flow_or_storage_id],
            ],
        )

    def positive_capacity(
        self,
        bound_id: str,
        is_flow: bool,
        flow_or_storage_id: str,
        is_lower_bound: bool,
        param_id: str,
    ) -> None:
        """Append positive capacity data element."""
        self.data_elements.append(
            [
                self.names.CAPACITY,
                self.names.POSITIVECAPACITY,
                bound_id,
                [self.names.WHICHCONCEPT, self.names.FLOW if is_flow else self.names.STORAGE],
                [self.names.WHICHINSTANCE, flow_or_storage_id],
                [self.names.BOUNDKEY, self.names.BOUNDLOWER if is_lower_bound else self.names.BOUNDUPPER],
                [self.names.PARAM, param_id],
            ],
        )

    def unit_param(
        self,
        unit_param_id: str,
        series_param_id: str,
        info: ComponentInfo,
    ) -> None:
        """Append unit param data element."""
        self.data_elements.append(
            [
                self.names.PARAM,
                info.unit_param_type,
                unit_param_id,
                [self.names.PARAM, series_param_id],
            ],
        )

    def base_arrow(
        self,
        arrow_id: str,
        flow_id: str,
        balance_id: str,
        is_ingoing: bool,
        conversion: str | float,
    ) -> None:
        """Append base arrow data element."""
        self.data_elements.append(
            [
                self.names.ARROW,
                self.names.BASEARROW,
                arrow_id,
                [self.names.FLOW, flow_id],
                [self.names.BALANCE, balance_id],
                [self.names.DIRECTIONKEY, self.names.DIRECTIONIN if is_ingoing else self.names.DIRECTIONOUT],
                [self.names.CONVERSION, conversion],
            ],
        )

    def cost_term(
        self,
        cost_term_id: str,
        flow_or_storage_id: str,
        is_flow: bool,
        is_cost: bool,
        cost: str | float,
    ) -> None:
        """Append cost term data element."""
        self.data_elements.append(
            [
                self.names.COST,
                self.names.COSTTERM,
                cost_term_id,
                [self.names.DIRECTIONKEY, self.names.DIRECTIONIN if is_cost else self.names.DIRECTIONOUT],
                [self.names.PARAM, cost],
                [self.names.WHICHCONCEPT, self.names.FLOW if is_flow else self.names.STORAGE],
                [self.names.WHICHINSTANCE, flow_or_storage_id],
            ],
        )

    def base_rhs_term(
        self,
        rhs_term_id: str,
        balance_id: str,
        is_ingoing: bool,
        unit_param_id: str,
    ) -> None:
        """Append base rhs term data element."""
        # TODO: Add residualhint
        self.data_elements.append(
            [
                self.names.RHSTERM,
                self.names.BASERHSTERM,
                rhs_term_id,
                [self.names.BALANCE, balance_id],
                [self.names.DIRECTIONKEY, self.names.DIRECTIONIN if is_ingoing else self.names.DIRECTIONOUT],
                [self.names.PARAM, unit_param_id],
            ],
        )

    def base_storage(
        self,
        storage_id: str,
        balance_id: str,
    ) -> None:
        """Append base storage data element."""
        self.data_elements.append(
            [
                self.names.STORAGE,
                self.names.BASESTORAGE,
                storage_id,
                [self.names.BALANCE, balance_id],
            ],
        )

    def global_eneq(
        self,
        global_eneq_id: str,
        balance_id: str,
        value: float,
    ) -> None:
        """Append global energy equivalent data element."""
        self.data_elements.append(
            [
                self.names.METADATA,
                self.names.GLOBALENEQ,
                global_eneq_id,
                [self.names.BALANCE, balance_id],
                [self.names.VALUE, value],
            ],
        )

    def ms_time_delta(
        self,
        time_delta_id: str,
        milliseconds: float,
    ) -> None:
        """Append ms time delta data element."""
        self.data_elements.append(
            [
                self.names.TIMEDELTA,
                self.names.MSTIMEDELTA,
                time_delta_id,
                [self.names.PERIOD, milliseconds],
            ],
        )

    def range_time_index(
        self,
        time_index_id: str,
        start_time: datetime,
        num_steps: int,
        time_delta_id: str,
    ) -> None:
        """Append range time index data element."""
        self.data_elements.append(
            [
                self.names.TIMEINDEX,
                self.names.RANGETIMEINDEX,
                time_index_id,
                [self.names.START, start_time.strftime(r"%Y-%m-%d %H:%M:%S")],
                [self.names.STEPS, num_steps],
                [self.names.DELTA, time_delta_id],
            ],
        )

    def base_table(
        self,
        table_id: str,
        path_table: str | Path,
        column_names: list[str],
    ) -> None:
        """Append base table data element."""
        self.data_elements.append(
            [
                self.names.TABLE,
                self.names.BASETABLE,
                table_id,
                [self.names.MATRIX, str(path_table)],
                [self.names.NAMES, column_names],
            ],
        )

    def column_time_values(
        self,
        time_values_id: str,
        table_id: str,
        column_id: str,
    ) -> None:
        """Append column time values data element."""
        self.data_elements.append(
            [
                self.names.TIMEVALUES,
                self.names.COLUMNTIMEVALUES,
                time_values_id,
                [self.names.TABLE, table_id],
                [self.names.NAME, column_id],
            ],
        )

    def rotating_time_vector(
        self,
        time_vector_id: str,
        time_index_id: str,
        time_values_id: str,
    ) -> None:
        """Append rotating time vector data element."""
        self.data_elements.append(
            [
                self.names.TIMEVECTOR,
                self.names.ROTATINGTIMEVECTOR,
                time_vector_id,
                [self.names.TIMEINDEX, time_index_id],
                [self.names.TIMEVALUES, time_values_id],
            ],
        )

    def one_year_time_vector(
        self,
        time_vector_id: str,
        time_index_id: str,
        time_values_id: str,
    ) -> None:
        """Append one year time vector data element."""
        self.data_elements.append(
            [
                self.names.TIMEVECTOR,
                self.names.ONEYEARTIMEVECTOR,
                time_vector_id,
                [self.names.TIMEINDEX, time_index_id],
                [self.names.TIMEVALUES, time_values_id],
            ],
        )
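
A minimal usage sketch with a stand-in names object; the constant values below are placeholders, not the real JulESNames strings, and the import path is inferred from the source location above:

from types import SimpleNamespace

from framjules.solve_handler.build_handler.DataElementAppender import DataElementAppender

names = SimpleNamespace(FLOW="Flow", BASEFLOW="BaseFlow")  # assumed constant values
appender = DataElementAppender(names)
appender.base_flow("hydro_plant_1")
assert appender.data_elements == [["Flow", "BaseFlow", "hydro_plant_1"]]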
__init__(names: JulESNames) -> None

Initialize new DataElementAppender.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def __init__(self, names: JulESNames) -> None:
    """Initialize new DataElementAppender."""
    self.data_elements: list = []
    self.names: JulESNames = names
base_arrow(arrow_id: str, flow_id: str, balance_id: str, is_ingoing: bool, conversion: str | float) -> None

Append base arrow data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def base_arrow(
    self,
    arrow_id: str,
    flow_id: str,
    balance_id: str,
    is_ingoing: bool,
    conversion: str | float,
) -> None:
    """Append base arrow data element."""
    self.data_elements.append(
        [
            self.names.ARROW,
            self.names.BASEARROW,
            arrow_id,
            [self.names.FLOW, flow_id],
            [self.names.BALANCE, balance_id],
            [self.names.DIRECTIONKEY, self.names.DIRECTIONIN if is_ingoing else self.names.DIRECTIONOUT],
            [self.names.CONVERSION, conversion],
        ],
    )
base_flow(flow_id: str) -> None

Append base flow data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def base_flow(self, flow_id: str) -> None:
    """Append base flow data element."""
    self.data_elements.append([self.names.FLOW, self.names.BASEFLOW, flow_id])
base_rhs_term(rhs_term_id: str, balance_id: str, is_ingoing: bool, unit_param_id: str) -> None

Append base rhs term data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def base_rhs_term(
    self,
    rhs_term_id: str,
    balance_id: str,
    is_ingoing: bool,
    unit_param_id: str,
) -> None:
    """Append base rhs term data element."""
    # TODO: Add residualhint
    self.data_elements.append(
        [
            self.names.RHSTERM,
            self.names.BASERHSTERM,
            rhs_term_id,
            [self.names.BALANCE, balance_id],
            [self.names.DIRECTIONKEY, self.names.DIRECTIONIN if is_ingoing else self.names.DIRECTIONOUT],
            [self.names.PARAM, unit_param_id],
        ],
    )
base_storage(storage_id: str, balance_id: str) -> None

Append base storage data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def base_storage(
    self,
    storage_id: str,
    balance_id: str,
) -> None:
    """Append base storage data element."""
    self.data_elements.append(
        [
            self.names.STORAGE,
            self.names.BASESTORAGE,
            storage_id,
            [self.names.BALANCE, balance_id],
        ],
    )
base_table(table_id: str, path_table: str | Path, column_names: list[str]) -> None

Append base table data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def base_table(
    self,
    table_id: str,
    path_table: str | Path,
    column_names: list[str],
) -> None:
    """Append base table data element."""
    self.data_elements.append(
        [
            self.names.TABLE,
            self.names.BASETABLE,
            table_id,
            [self.names.MATRIX, str(path_table)],
            [self.names.NAMES, column_names],
        ],
    )
column_time_values(time_values_id: str, table_id: str, column_id: str) -> None

Append column time values data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def column_time_values(
    self,
    time_values_id: str,
    table_id: str,
    column_id: str,
) -> None:
    """Append column time values data element."""
    self.data_elements.append(
        [
            self.names.TIMEVALUES,
            self.names.COLUMNTIMEVALUES,
            time_values_id,
            [self.names.TABLE, table_id],
            [self.names.NAME, column_id],
        ],
    )
cost_term(cost_term_id: str, flow_or_storage_id: str, is_flow: bool, is_cost: bool, cost: str | float) -> None

Append cost term data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def cost_term(
    self,
    cost_term_id: str,
    flow_or_storage_id: str,
    is_flow: bool,
    is_cost: bool,
    cost: str | float,
) -> None:
    """Append cost term data element."""
    self.data_elements.append(
        [
            self.names.COST,
            self.names.COSTTERM,
            cost_term_id,
            [self.names.DIRECTIONKEY, self.names.DIRECTIONIN if is_cost else self.names.DIRECTIONOUT],
            [self.names.PARAM, cost],
            [self.names.WHICHCONCEPT, self.names.FLOW if is_flow else self.names.STORAGE],
            [self.names.WHICHINSTANCE, flow_or_storage_id],
        ],
    )
endogenous_balance(balance_id: str, commodity: str) -> None

Append endogenous balance data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def endogenous_balance(
    self,
    balance_id: str,
    commodity: str,
) -> None:
    """Append endogenous balance data element."""
    self.data_elements.append(
        [
            self.names.BALANCE,
            self.names.BASEBALANCE,
            balance_id,
            [self.names.COMMODITY, commodity],
        ],
    )
exogenous_balance(balance_id: str, commodity: str, price_param_id: str | float) -> None

Append exogenous balance data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def exogenous_balance(
    self,
    balance_id: str,
    commodity: str,
    price_param_id: str | float,
) -> None:
    """Append exogenous balance data element."""
    self.data_elements.append(
        [
            self.names.BALANCE,
            self.names.EXOGENBALANCE,
            balance_id,
            [self.names.COMMODITY, commodity],
            [self.names.PRICE, price_param_id],
        ],
    )
global_eneq(global_eneq_id: str, balance_id: str, value: float) -> None

Append global energy equivalent data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def global_eneq(
    self,
    global_eneq_id: str,
    balance_id: str,
    value: float,
) -> None:
    """Append global energy equivalent data element."""
    self.data_elements.append(
        [
            self.names.METADATA,
            self.names.GLOBALENEQ,
            global_eneq_id,
            [self.names.BALANCE, balance_id],
            [self.names.VALUE, value],
        ],
    )
lower_zero_capacity(lower_bound_id: str, is_flow: bool, flow_or_storage_id: str) -> None

Append zero lower capacity data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def lower_zero_capacity(
    self,
    lower_bound_id: str,
    is_flow: bool,
    flow_or_storage_id: str,
) -> None:
    """Append zero lower capacity data element."""
    self.data_elements.append(
        [
            self.names.CAPACITY,
            self.names.LOWERZEROCAPACITY,
            lower_bound_id,
            [self.names.WHICHCONCEPT, self.names.FLOW if is_flow else self.names.STORAGE],
            [self.names.WHICHINSTANCE, flow_or_storage_id],
        ],
    )
mean_series_param(param_id: str, level: str | float, profile: str | float) -> None

Append mean series param data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def mean_series_param(
    self,
    param_id: str,
    level: str | float,
    profile: str | float,
) -> None:
    """Append mean series param data element."""
    self.data_elements.append(
        [
            self.names.PARAM,
            self.names.MEANSERIESPARAM,
            param_id,
            [self.names.LEVEL, level],
            [self.names.PROFILE, profile],
        ],
    )
ms_time_delta(time_delta_id: str, milliseconds: float) -> None

Append ms time delta data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def ms_time_delta(
    self,
    time_delta_id: str,
    milliseconds: float,
) -> None:
    """Append ms time delta data element."""
    self.data_elements.append(
        [
            self.names.TIMEDELTA,
            self.names.MSTIMEDELTA,
            time_delta_id,
            [self.names.PERIOD, milliseconds],
        ],
    )
one_year_time_vector(time_vector_id: str, time_index_id: str, time_values_id: str) -> None

Append one year time vector data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def one_year_time_vector(
    self,
    time_vector_id: str,
    time_index_id: str,
    time_values_id: str,
) -> None:
    """Append one year time vector data element."""
    self.data_elements.append(
        [
            self.names.TIMEVECTOR,
            self.names.ONEYEARTIMEVECTOR,
            time_vector_id,
            [self.names.TIMEINDEX, time_index_id],
            [self.names.TIMEVALUES, time_values_id],
        ],
    )
positive_capacity(bound_id: str, is_flow: bool, flow_or_storage_id: str, is_lower_bound: bool, param_id: str) -> None

Append positive capacity data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def positive_capacity(
    self,
    bound_id: str,
    is_flow: bool,
    flow_or_storage_id: str,
    is_lower_bound: bool,
    param_id: str,
) -> None:
    """Append positive capacity data element."""
    self.data_elements.append(
        [
            self.names.CAPACITY,
            self.names.POSITIVECAPACITY,
            bound_id,
            [self.names.WHICHCONCEPT, self.names.FLOW if is_flow else self.names.STORAGE],
            [self.names.WHICHINSTANCE, flow_or_storage_id],
            [self.names.BOUNDKEY, self.names.BOUNDLOWER if is_lower_bound else self.names.BOUNDUPPER],
            [self.names.PARAM, param_id],
        ],
    )
range_time_index(time_index_id: str, start_time: datetime, num_steps: int, time_delta_id: str) -> None

Append range time index data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def range_time_index(
    self,
    time_index_id: str,
    start_time: datetime,
    num_steps: int,
    time_delta_id: str,
) -> None:
    """Append range time index data element."""
    self.data_elements.append(
        [
            self.names.TIMEINDEX,
            self.names.RANGETIMEINDEX,
            time_index_id,
            [self.names.START, start_time.strftime(r"%Y-%m-%d %H:%M:%S")],
            [self.names.STEPS, num_steps],
            [self.names.DELTA, time_delta_id],
        ],
    )
rotating_time_vector(time_vector_id: str, time_index_id: str, time_values_id: str) -> None

Append rotating time vector data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def rotating_time_vector(
    self,
    time_vector_id: str,
    time_index_id: str,
    time_values_id: str,
) -> None:
    """Append rotating time vector data element."""
    self.data_elements.append(
        [
            self.names.TIMEVECTOR,
            self.names.ROTATINGTIMEVECTOR,
            time_vector_id,
            [self.names.TIMEINDEX, time_index_id],
            [self.names.TIMEVALUES, time_values_id],
        ],
    )
unit_param(unit_param_id: str, series_param_id: str, info: ComponentInfo) -> None

Append unit param data element.

Source code in framjules/solve_handler/build_handler/DataElementAppender.py
def unit_param(
    self,
    unit_param_id: str,
    series_param_id: str,
    info: ComponentInfo,
) -> None:
    """Append unit param data element."""
    self.data_elements.append(
        [
            self.names.PARAM,
            info.unit_param_type,
            unit_param_id,
            [self.names.PARAM, series_param_id],
        ],
    )
SerialBuildHandler
SerialBuildHandler

Bases: BuildHandler

Specialized methods for serial simulation.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
class SerialBuildHandler(BuildHandler):
    """Specialized methods for serial simulation."""

    _PROFILE_VALUE_WHEN_ZERO = 1.0

    def __init__(
        self,
        folder: Path,
        config: JulESConfig,
        names: JulESNames,
        domain_models: DomainModels,
        graphs: NodeFlowGraphs,
        graph_infos: GraphInfos,
        db: QueryDB,
    ) -> None:
        """See BuildHandler."""
        super().__init__(folder, config, names, domain_models, graphs, graph_infos, db)

        # some validations specific to serial simulation
        self._data_period = self.config.get_data_period()
        self._check_type(self._data_period, SinglePeriodTimeIndex)

        start_year, num_years = self.config.get_weather_years()
        self._avg_year_range = AverageYearRange(start_year=start_year, num_years=num_years)
        if self._avg_year_range.is_52_week_years():
            message = "Expected AverageYearRange to have is_52_week_years() == False."
            raise ValueError(message)

        self._is_float32 = self.config.is_float32()

    def get_attribute_level(
        self,
        root_id: str,
        attribute: LevelProfile,
        target_unit: str | None,
    ) -> float:
        """Get level value."""
        is_max_level = False
        value = attribute.get_data_value(
            db=self.db,
            unit=target_unit,
            level_period=self._data_period,
            scenario_horizon=self._avg_year_range,
            is_max_level=is_max_level,
        )
        if value < 0:
            self.send_warning_event(f"Attribute {root_id} of type {type(attribute).__name__} returned {value}")
        return value

    def get_attribute_profile(
        self,
        root_id: str,
        attribute: LevelProfile | Arrow,
        default: float,
        unit: str | None,
        info: ComponentInfo,
    ) -> str | float:
        """Add profile vector to timevectors and return profile_id."""
        if not attribute.has_profile():
            return default

        profile_id = f"{root_id}_profile"

        timeindex = self._find_profile_timeindex(attribute, info.has_storage_resolution)

        vector: NDArray = attribute.get_scenario_vector(
            db=self.db,
            level_period=self._data_period,
            scenario_horizon=timeindex,
            is_float32=self._is_float32,
            unit=unit,
        )
        vector = np.round(vector, decimals=6)  # avoid very small numerical noise, e.g. negative prices from a JulES run

        if np.isnan(vector).any():
            message = (
                f"Profile {profile_id} in time index {timeindex} contains NaN values. "
                "This may indicate a problem with the data or the configuration."
            )
            raise ValueError(message)

        if (vector < 0).any():
            message = (
                f"Profile {profile_id} in time index {timeindex} has negative values. "
                "This may indicate a problem with the data or the configuration."
            )
            raise ValueError(message)

        denominator = vector.mean()

        if denominator == 0:
            vector.fill(self._PROFILE_VALUE_WHEN_ZERO)
        else:
            np.multiply(vector, 1 / denominator, out=vector)

        self.timevectors[timeindex][profile_id] = vector

        return profile_id

    def _find_profile_timeindex(
        self,
        attribute: LevelProfile | Arrow,
        has_storage_resolution: bool,
    ) -> ProfileTimeIndex:
        is_52_week_years = False

        start_year, num_years = self.config.get_weather_years()

        if has_storage_resolution:
            period_duration = timedelta(minutes=self.config.get_time_resolution().get_clearing_storage_minutes())
        else:
            period_duration = timedelta(minutes=self.config.get_time_resolution().get_clearing_market_minutes())

        fallback = ProfileTimeIndex(
            start_year,
            num_years,
            period_duration=self._get_closest_valid_profile_duration(period_duration),
            is_52_week_years=is_52_week_years,
        )

        ix_set: set[FixedFrequencyTimeIndex] = attribute.get_profile_timeindex_set(db=self.db)
        if not all(isinstance(ix, FixedFrequencyTimeIndex) for ix in ix_set):
            return fallback

        is_one_year = all(ix.is_one_year() for ix in ix_set)

        candidate = min(ix_set, key=lambda ix: ix.get_period_duration().total_seconds())

        s_fallback = fallback.get_period_duration().total_seconds()
        s_candidate = candidate.get_period_duration().total_seconds()
        best_duration = (candidate if s_fallback < s_candidate else fallback).get_period_duration()

        if is_one_year:
            return ProfileTimeIndex(
                start_year=candidate.get_start_time().isocalendar().year,
                num_years=1,
                period_duration=self._get_closest_valid_profile_duration(best_duration),
                is_52_week_years=is_52_week_years,
            )
        return ProfileTimeIndex(
            start_year=start_year,
            num_years=num_years,
            period_duration=self._get_closest_valid_profile_duration(best_duration),
            is_52_week_years=is_52_week_years,
        )

    def get_price_level(self, root_id: str, price: Price, info: ComponentInfo) -> float:
        """get_price_level for serial simulation."""
        return self.get_attribute_level(root_id, price, info.unit_price)

    def get_cost_term_level(self, root_id: str, cost_term: Cost, info: ComponentInfo) -> float:
        """get_cost_term_level for serial simulation."""
        return self.get_attribute_level(root_id, cost_term, info.unit_cost)

    def get_capacity_level(self, root_id: str, capacity: StockVolume | FlowVolume, info: ComponentInfo) -> str | float:
        """get_capacity_level for serial simulation. Handles stock or flow based on info."""
        return self.get_attribute_level(root_id, capacity, info.unit_flow if info.is_flow else info.unit_stock)

    def get_coefficient_level(self, root_id: str, arrow: Arrow, info: ComponentInfo) -> str | float:
        """get_coefficient_level for serial simulation."""
        return self.get_attribute_level(root_id, arrow, info.unit_coeffs[arrow.get_node()])

    def get_price_profile(self, root_id: str, price: Price, info: ComponentInfo) -> str | float:
        """get_price_profile for serial simulation."""
        return self.get_attribute_profile(root_id, price, 1.0, info.unit_price, info)

    def get_cost_term_profile(self, root_id: str, cost_term: Cost, info: ComponentInfo) -> str | float:
        """get_cost_term_profile for serial simulation."""
        return self.get_attribute_profile(root_id, cost_term, 1.0, info.unit_cost, info)

    def get_capacity_profile(
        self,
        root_id: str,
        capacity: FlowVolume | StockVolume,
        info: ComponentInfo,
    ) -> str | float:
        """get_capacity_profile for serial simulation."""
        unit = info.unit_flow if info.is_flow else info.unit_stock
        return self.get_attribute_profile(root_id, capacity, 1.0, unit, info)

    def get_rhs_term_level(
        self,
        rhs_term_id: str,
        flow: Flow,
        arrow: Arrow,
        flow_info: ComponentInfo,
    ) -> float:
        """Convert volume (main node) to target node volume.

        This may scale the volume using the arrow coefficient,
        e.g. due to transportation loss.

        This may also change unit, if target node belongs to
        different commodity than main node, such as for hydropower.
        """
        coeff_value = self.get_coefficient_level(rhs_term_id, arrow, flow_info)

        volume = flow.get_volume()
        if volume.has_level():
            volume_value = self.get_attribute_level(
                root_id=rhs_term_id,
                attribute=volume,
                target_unit=flow_info.unit_flow,
            )
        else:
            max_cap = flow.get_max_capacity()
            min_cap = flow.get_min_capacity()
            if max_cap is not None and max_cap == min_cap:
                volume_value = self.get_attribute_level(
                    root_id=rhs_term_id,
                    attribute=max_cap,
                    target_unit=flow_info.unit_flow,
                )
            else:
                message = f"{rhs_term_id} is not exogenous"
                raise ValueError(message)

        if coeff_value == 0:
            message = (
                f"Got zero coeff_value for {rhs_term_id}.\n"
                f"volume_value = {volume_value}\n"
                f"coeff_value = {coeff_value}\n"
                f"flow.get_main_node() = {flow.get_main_node()}\n"
                f"arrow.get_node() = {arrow.get_node()}\n"
            )
            raise RuntimeError(message)

        return volume_value / coeff_value  # convert from main_unit to target_unit

    def _rank_profile_timeindex(self, ix: ProfileTimeIndex) -> tuple[bool, float]:
        return ix.is_one_year(), ix.get_period_duration().total_seconds()

    def _select_profile_timeindex(self, *candidates: ProfileTimeIndex) -> ProfileTimeIndex:
        """Select the one with not-is_one_year (if any) and finest period duration."""
        return min(candidates, key=self._rank_profile_timeindex)

    def get_rhs_term_profile(
        self,
        rhs_term_id: str,
        flow: Flow,
        arrow: Arrow,
        flow_info: ComponentInfo,
    ) -> str | float:
        """Create profile (possibly) representing volume_profile * coefficient_profile."""
        volume = flow.get_volume()

        if not volume.has_profile():
            volume = flow.get_max_capacity()

        not_volume_profile = volume is None or not volume.has_profile()
        not_arrow_profile = not arrow.has_profile()
        has_volume_profile = not not_volume_profile
        has_arrow_profile = not not_arrow_profile

        if not_volume_profile and not_arrow_profile:
            return 1.0

        if has_volume_profile and not_arrow_profile:
            return self.get_attribute_profile(
                rhs_term_id,
                attribute=volume,
                default=1.0,
                unit=flow_info.unit_flow,
                info=flow_info,
            )

        if not_volume_profile and has_arrow_profile:
            unit = flow_info.unit_coeffs[arrow.get_node()]
            return self.get_attribute_profile(
                rhs_term_id,
                attribute=arrow,
                default=1.0,
                unit=unit,
                info=flow_info,
            )

        # Here we get both profiles (volume and coefficient) and multiply them
        # together and store the resulting product profile in self.timevectors

        profile_id = f"{rhs_term_id}_profile"

        timeindex = self._select_profile_timeindex(
            self._find_profile_timeindex(volume, flow_info.has_storage_resolution),
            self._find_profile_timeindex(arrow, flow_info.has_storage_resolution),
        )

        x: NDArray = volume.get_scenario_vector(
            db=self.db,
            level_period=self._data_period,
            scenario_horizon=timeindex,
            is_float32=self._is_float32,
            unit=flow_info.unit_flow,
        )

        y: NDArray = arrow.get_scenario_vector(
            db=self.db,
            level_period=self._data_period,
            scenario_horizon=timeindex,
            is_float32=self._is_float32,
            unit=flow_info.unit_coeffs[arrow.get_node()],
        )

        x_mean = x.mean()
        y_mean = y.mean()
        if x_mean == 0 or y_mean == 0:
            x.fill(self._PROFILE_VALUE_WHEN_ZERO)
            return x

        np.multiply(x, 1.0 / x_mean, out=x)
        np.multiply(y, 1.0 / y_mean, out=y)

        np.multiply(x, y, out=x)

        self.timevectors[timeindex][profile_id] = x

        return profile_id
__init__(folder: Path, config: JulESConfig, names: JulESNames, domain_models: DomainModels, graphs: NodeFlowGraphs, graph_infos: GraphInfos, db: QueryDB) -> None

See BuildHandler.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def __init__(
    self,
    folder: Path,
    config: JulESConfig,
    names: JulESNames,
    domain_models: DomainModels,
    graphs: NodeFlowGraphs,
    graph_infos: GraphInfos,
    db: QueryDB,
) -> None:
    """See BuildHandler."""
    super().__init__(folder, config, names, domain_models, graphs, graph_infos, db)

    # some validations specific to serial simulation
    self._data_period = self.config.get_data_period()
    self._check_type(self._data_period, SinglePeriodTimeIndex)

    start_year, num_years = self.config.get_weather_years()
    self._avg_year_range = AverageYearRange(start_year=start_year, num_years=num_years)
    if self._avg_year_range.is_52_week_years():
        message = "Expected AverageYearRange to have is_52_week_years() == False."
        raise ValueError(message)

    self._is_float32 = self.config.is_float32()
get_attribute_level(root_id: str, attribute: LevelProfile, target_unit: str | None) -> float

Get level value.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_attribute_level(
    self,
    root_id: str,
    attribute: LevelProfile,
    target_unit: str | None,
) -> float:
    """Get level value."""
    is_max_level = False
    value = attribute.get_data_value(
        db=self.db,
        unit=target_unit,
        level_period=self._data_period,
        scenario_horizon=self._avg_year_range,
        is_max_level=is_max_level,
    )
    if value < 0:
        self.send_warning_event(f"Attribute {root_id} of type {type(attribute).__name__} returned {value}")
    return value
get_attribute_profile(root_id: str, attribute: LevelProfile | Arrow, default: float, unit: str | None, info: ComponentInfo) -> str | float

Add profile vector to timevectors and return profile_id.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_attribute_profile(
    self,
    root_id: str,
    attribute: LevelProfile | Arrow,
    default: float,
    unit: str | None,
    info: ComponentInfo,
) -> str | float:
    """Add profile vector to timevectors and return profile_id."""
    if not attribute.has_profile():
        return default

    profile_id = f"{root_id}_profile"

    timeindex = self._find_profile_timeindex(attribute, info.has_storage_resolution)

    vector: NDArray = attribute.get_scenario_vector(
        db=self.db,
        level_period=self._data_period,
        scenario_horizon=timeindex,
        is_float32=self._is_float32,
        unit=unit,
    )
    vector = np.round(vector, decimals=6)  # avoid very small numerical noise, e.g. negative prices from a JulES run

    if np.isnan(vector).any():
        message = (
            f"Profile {profile_id} in time index {timeindex} contains NaN values. "
            "This may indicate a problem with the data or the configuration."
        )
        raise ValueError(message)

    if (vector < 0).any():
        message = (
            f"Profile {profile_id} in time index {timeindex} has negative values. "
            "This may indicate a problem with the data or the configuration."
        )
        raise ValueError(message)

    denominator = vector.mean()

    if denominator == 0:
        vector.fill(self._PROFILE_VALUE_WHEN_ZERO)
    else:
        np.multiply(vector, 1 / denominator, out=vector)

    self.timevectors[timeindex][profile_id] = vector

    return profile_id
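
The normalization step can be checked in isolation: the vector is scaled so its mean becomes 1, and an all-zero vector is replaced by the constant _PROFILE_VALUE_WHEN_ZERO (1.0). A numpy sketch with made-up values:

import numpy as np

vector = np.array([2.0, 4.0, 6.0])  # stand-in profile values
denominator = vector.mean()         # 4.0
np.multiply(vector, 1 / denominator, out=vector)
assert np.allclose(vector, [0.5, 1.0, 1.5])
assert np.isclose(vector.mean(), 1.0)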
get_capacity_level(root_id: str, capacity: StockVolume | FlowVolume, info: ComponentInfo) -> str | float

get_capacity_level for serial simulation. Handles stock or flow based on info.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_capacity_level(self, root_id: str, capacity: StockVolume | FlowVolume, info: ComponentInfo) -> str | float:
    """get_capacity_level for serial simulation. Handles stock or flow based on info."""
    return self.get_attribute_level(root_id, capacity, info.unit_flow if info.is_flow else info.unit_stock)
get_capacity_profile(root_id: str, capacity: FlowVolume | StockVolume, info: ComponentInfo) -> str | float

get_capacity_profile for serial simulation.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_capacity_profile(
    self,
    root_id: str,
    capacity: FlowVolume | StockVolume,
    info: ComponentInfo,
) -> str | float:
    """get_capacity_profile for serial simulation."""
    unit = info.unit_flow if info.is_flow else info.unit_stock
    return self.get_attribute_profile(root_id, capacity, 1.0, unit, info)
get_coefficient_level(root_id: str, arrow: Arrow, info: ComponentInfo) -> str | float

get_coefficient_level for serial simulation.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_coefficient_level(self, root_id: str, arrow: Arrow, info: ComponentInfo) -> str | float:
    """get_coefficient_level for serial simulation."""
    return self.get_attribute_level(root_id, arrow, info.unit_coeffs[arrow.get_node()])
get_cost_term_level(root_id: str, cost_term: Cost, info: ComponentInfo) -> float

get_cost_term_level for serial simulation.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_cost_term_level(self, root_id: str, cost_term: Cost, info: ComponentInfo) -> float:
    """get_cost_term_level for serial simulation."""
    return self.get_attribute_level(root_id, cost_term, info.unit_cost)
get_cost_term_profile(root_id: str, cost_term: Cost, info: ComponentInfo) -> str | float

get_cost_term_profile for serial simulation.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_cost_term_profile(self, root_id: str, cost_term: Cost, info: ComponentInfo) -> str | float:
    """get_cost_term_profile for serial simulation."""
    return self.get_attribute_profile(root_id, cost_term, 1.0, info.unit_cost, info)
get_price_level(root_id: str, price: Price, info: ComponentInfo) -> float

get_price_level for serial simulation.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_price_level(self, root_id: str, price: Price, info: ComponentInfo) -> float:
    """get_price_level for serial simulation."""
    return self.get_attribute_level(root_id, price, info.unit_price)
get_price_profile(root_id: str, price: Price, info: ComponentInfo) -> str | float

get_price_profile for serial simulation.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_price_profile(self, root_id: str, price: Price, info: ComponentInfo) -> str | float:
    """get_price_profile for serial simulation."""
    return self.get_attribute_profile(root_id, price, 1.0, info.unit_price, info)
get_rhs_term_level(rhs_term_id: str, flow: Flow, arrow: Arrow, flow_info: ComponentInfo) -> float

Convert volume (main node) to target node volume.

This may scale the volume using the arrow coefficient, e.g. due to transportation loss.

This may also change unit, if target node belongs to different commodity than main node, such as for hydropower.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_rhs_term_level(
    self,
    rhs_term_id: str,
    flow: Flow,
    arrow: Arrow,
    flow_info: ComponentInfo,
) -> float:
    """Convert volume (main node) to target node volume.

    This may scale the volume using the arrow coefficient,
    e.g. due to transportation loss.

    This may also change unit, if target node belongs to
    different commodity than main node, such as for hydropower.
    """
    coeff_value = self.get_coefficient_level(rhs_term_id, arrow, flow_info)

    volume = flow.get_volume()
    if volume.has_level():
        volume_value = self.get_attribute_level(
            root_id=rhs_term_id,
            attribute=volume,
            target_unit=flow_info.unit_flow,
        )
    else:
        max_cap = flow.get_max_capacity()
        min_cap = flow.get_min_capacity()
        if max_cap is not None and max_cap == min_cap:
            volume_value = self.get_attribute_level(
                root_id=rhs_term_id,
                attribute=max_cap,
                target_unit=flow_info.unit_flow,
            )
        else:
            message = f"{rhs_term_id} is not exogenous"
            raise ValueError(message)

    if coeff_value == 0:
        message = (
            f"Got zero coeff_value for {rhs_term_id}.\n"
            f"volume_value = {volume_value}\n"
            f"coeff_value = {coeff_value}\n"
            f"flow.get_main_node() = {flow.get_main_node()}\n"
            f"arrow.get_node() = {arrow.get_node()}\n"
        )
        raise RuntimeError(message)

    return volume_value / coeff_value  # convert from main_unit to target_unit
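
A worked example of the final division, with made-up numbers; reading the coefficient as a transport loss is illustrative only:

volume_value = 100.0  # exogenous volume in main-node units
coeff_value = 0.97    # arrow conversion coefficient, e.g. a 3 % loss
rhs_level = volume_value / coeff_value  # approx. 103.09 in target-node units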
get_rhs_term_profile(rhs_term_id: str, flow: Flow, arrow: Arrow, flow_info: ComponentInfo) -> str | float

Create profile (possibly) representing volume_profile * coefficient_profile.

Source code in framjules/solve_handler/build_handler/SerialBuildHandler.py
def get_rhs_term_profile(
    self,
    rhs_term_id: str,
    flow: Flow,
    arrow: Arrow,
    flow_info: ComponentInfo,
) -> str | float:
    """Create profile (possibly) representing volume_profile * coefficient_profile."""
    volume = flow.get_volume()

    if not volume.has_profile():
        volume = flow.get_max_capacity()

    not_volume_profile = volume is None or not volume.has_profile()
    not_arrow_profile = not arrow.has_profile()
    has_volume_profile = not not_volume_profile
    has_arrow_profile = not not_arrow_profile

    if not_volume_profile and not_arrow_profile:
        return 1.0

    if has_volume_profile and not_arrow_profile:
        return self.get_attribute_profile(
            rhs_term_id,
            attribute=volume,
            default=1.0,
            unit=flow_info.unit_flow,
            info=flow_info,
        )

    if not_volume_profile and has_arrow_profile:
        unit = flow_info.unit_coeffs[arrow.get_node()]
        return self.get_attribute_profile(
            rhs_term_id,
            attribute=arrow,
            default=1.0,
            unit=unit,
            info=flow_info,
        )

    # Here we get both profiles (volume and coefficient), multiply them
    # together, and store the resulting product profile in self.timevectors

    profile_id = f"{rhs_term_id}_profile"

    timeindex = self._select_profile_timeindex(
        self._find_profile_timeindex(volume, flow_info.has_storage_resolution),
        self._find_profile_timeindex(arrow, flow_info.has_storage_resolution),
    )

    x: NDArray = volume.get_scenario_vector(
        db=self.db,
        level_period=self._data_period,
        scenario_horizon=timeindex,
        is_float32=self._is_float32,
        unit=flow_info.unit_flow,
    )

    y: NDArray = arrow.get_scenario_vector(
        db=self.db,
        level_period=self._data_period,
        scenario_horizon=timeindex,
        is_float32=self._is_float32,
        unit=flow_info.unit_coeffs[arrow.get_node()],
    )

    x_mean = x.mean()
    y_mean = y.mean()
    if x_mean == 0 or y_mean == 0:
        x.fill(self._PROFILE_VALUE_WHEN_ZERO)
        return x

    np.multiply(x, 1.0 / x_mean, out=x)
    np.multiply(y, 1.0 / y_mean, out=y)

    np.multiply(x, y, out=x)

    self.timevectors[timeindex][profile_id] = x

    return profile_id
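The normalization step above is worth seeing in isolation. A standalone numpy sketch (stand-in arrays, not real data): each vector is rescaled to mean one before the multiplication, so the result is a pure shape and the magnitude stays in the level term.

import numpy as np

x = np.array([1.0, 2.0, 3.0])  # stand-in for the volume profile
y = np.array([0.5, 1.0, 1.5])  # stand-in for the coefficient profile

x /= x.mean()  # mean-one volume profile
y /= y.mean()  # mean-one coefficient profile

product = x * y  # combined profile; its mean is exactly 1.0 only when x and y are uncorrelated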

dataclasses

ComponentInfo dataclass

All derived info we need during solve.

Source code in framjules/solve_handler/dataclasses.py
@dataclass
class ComponentInfo:
    """All derived info we need during solve."""

    is_flow: bool | None = None
    is_node: bool | None = None
    is_storage_node: bool | None = None
    is_market_node: bool | None = None
    is_market_flow: bool | None = None
    is_sss_member: bool | None = None
    is_exogenous: bool | None = None

    has_storage_resolution: bool | None = None

    jules_commodity: str | None = None
    domain_commodity: str | None = None

    # sss = storage subsystem
    sss_id: str | None = None
    sss_market_commodity: str | None = None
    sss_members: set[str] | None = None
    sss_is_short_term: int | None = None
    sss_global_eneq_value: float | None = None
    sss_global_eneq_unit: str | None = None
    sss_initial_storage: float | None = None

    jules_balance_id: str | None = None
    jules_storage_id: str | None = None
    jules_global_eneq_id: str | None = None

    main_node_id: str | None = None
    num_arrows: int | None = None

    unit_price: str | None = None
    unit_stock: str | None = None
    unit_flow: str | None = None
    unit_flow_result: str | None = None
    unit_cost: str | None = None
    unit_coeffs: dict[str, str | None] | None = None
    unit_param_type: str | None = None
    unit_param_unit_flow: str | None = None
    unit_param_unit_stock: str | None = None

    agg_storage_node_id: str | None = None
    agg_market_node_id: str | None = None
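Since every field defaults to None and is filled in gradually during solve, a partially populated instance is the normal state. A hedged sketch (field names are from the dataclass above, values are hypothetical):

info = ComponentInfo(
    is_node=True,
    is_storage_node=True,
    jules_commodity="Hydro",  # hypothetical commodity name
    unit_stock="Mm3",         # hypothetical units
    unit_flow="m3/s",
)
assert info.is_flow is None  # fields not yet derived simply stay None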
DomainModels dataclass

Model instance for each term.

Source code in framjules/solve_handler/dataclasses.py
@dataclass
class DomainModels:
    """Model instance for each term."""

    clearing: Model
    short_term: Model
    medium_term: Model
    long_term: Model
GraphInfos dataclass

Hold all component info for all graphs.

Source code in framjules/solve_handler/dataclasses.py
@dataclass
class GraphInfos:
    """Hold all component info for all graphs."""

    clearing: dict[str, ComponentInfo]
    short_term: dict[str, ComponentInfo]
    medium_term: dict[str, ComponentInfo]
    long_term: dict[str, ComponentInfo]
NodeFlowGraphs dataclass

Node-Flow representation of domain model components via get_supported_components.

Source code in framjules/solve_handler/dataclasses.py
@dataclass
class NodeFlowGraphs:
    """Node-Flow representation of domain model components via get_supported_components."""

    clearing: dict[str, Node | Flow]
    short_term: dict[str, Node | Flow]
    medium_term: dict[str, Node | Flow]
    long_term: dict[str, Node | Flow]
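The three dataclasses line up one entry per term, with GraphInfos mirroring NodeFlowGraphs key for key. A hedged sketch (the *_graph variables are placeholder dict[str, Node | Flow] values, not real constructor calls):

graphs = NodeFlowGraphs(
    clearing=clearing_graph,
    short_term=short_term_graph,
    medium_term=medium_term_graph,
    long_term=long_term_graph,
)
graph_infos = GraphInfos(
    clearing={name: ComponentInfo() for name in graphs.clearing},
    short_term={name: ComponentInfo() for name in graphs.short_term},
    medium_term={name: ComponentInfo() for name in graphs.medium_term},
    long_term={name: ComponentInfo() for name in graphs.long_term},
)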

results_handler

SerialResultsHandler

Handling of results produced by running JulES in Serial mode.

SerialResultsHandler

Set serial simulation results.

Source code in framjules/solve_handler/results_handler/SerialResultsHandler.py
class SerialResultsHandler:
    """Set serial simulation results."""

    def __init__(
        self,
        folder: Path | str,
        config: JulESConfig,
        names: JulESNames,
        graphs: NodeFlowGraphs,
        graph_infos: GraphInfos,
    ) -> None:
        """Handle retrieval of results from a JulES Serial simulation.

        Args:
            folder (Path | str): Path to folder of the JulES simulation.
            config (JulESConfig): Simulation config.
            names (JulESNames): JulES namespace.
            graphs (NodeFlowGraphs): Graphs used in the simulation.
            graph_infos (GraphInfos): JulES-specific info for each Component in the graphs.

        """
        self._folder = Path(folder)
        self._config = config
        self._names = names

        self.graphs = graphs
        self.graph_infos = graph_infos

        self.units = self._set_units()

    def set_results(self) -> None:
        """Set JulES results of all Components in the clearing graph."""
        is_whole_years = self._config.is_simulation_mode_serial()

        loader = JulESH5TimeVectorLoader(
            source=self._folder,
            units=self.units,
            relative_loc=self._names.FILENAME_H5_OUTPUT,
            is_whole_years=is_whole_years,
        )

        supply_loader = SupplyJulESH5TimeVectorLoader(
            source=self._folder,
            units=self.units,
            relative_loc=self._names.FILENAME_H5_OUTPUT,
            is_whole_years=is_whole_years,
        )

        demand_loader = DemandJulESH5TimeVectorLoader(
            source=self._folder,
            units=self.units,
            relative_loc=self._names.FILENAME_H5_OUTPUT,
            is_whole_years=is_whole_years,
        )

        power_nodes = [
            name
            for name, c in self.graphs.clearing.items()
            if isinstance(c, Node) and c.get_commodity() == JulESNames.POWER
        ]
        for name, c in self.graphs.clearing.items():
            info: ComponentInfo = self.graph_infos.clearing[name]

            if isinstance(c, Node):
                if c.is_exogenous():
                    continue  
                self._set_node_results(c, name, loader)
            if isinstance(c, Flow):
                self._set_flow_results(
                    c,
                    info,
                    name,
                    loader,
                    power_nodes,
                    supply_loader=supply_loader,
                    demand_loader=demand_loader,
                )

    def _set_node_results(self, node: Node, name: str, loader: JulESH5TimeVectorLoader) -> None:
        info = self.graph_infos.clearing[name]

        if info.is_market_node:
            level, profile = self._get_decomposed_level_profile(name, loader, self.units[name])
            price = node.get_price()
            price.set_level(level)
            price.set_profile(profile)

        if info.is_storage_node:
            level, profile = self._get_decomposed_level_profile(
                info.jules_storage_id,
                loader,
                self.units[info.jules_storage_id],
                is_stock=True,
            )
            storage = node.get_storage()
            volume = storage.get_volume()
            volume.set_level(level)
            volume.set_profile(profile)

            level, profile = self._get_decomposed_level_profile(
                info.jules_storage_id + "_sv", loader, self.units[info.jules_storage_id + "_sv"]
            )
            price = node.get_price()
            price.set_level(level)
            price.set_profile(profile)

    def _set_flow_results(
        self,
        flow: Flow,
        info: ComponentInfo,
        name: str,
        loader: JulESH5TimeVectorLoader,
        power_nodes: list[str],
        supply_loader: SupplyJulESH5TimeVectorLoader,
        demand_loader: DemandJulESH5TimeVectorLoader,
    ) -> None:
        if info.is_exogenous and flow.get_volume().get_level():
            return
        if info.is_exogenous:
            max_capacity = flow.get_max_capacity()
            level = max_capacity.get_level()
            profile = max_capacity.get_profile()
        else:
            level, profile = self._get_decomposed_level_profile(
                name,
                loader,
                self.units[name],
                is_flow=True,
            )
        volume = flow.get_volume()
        volume.set_level(level)
        volume.set_profile(profile)

        # temporary workaround, will be fixed in future versions
        n = len(name)
        for arrow, volume in flow.get_arrow_volumes().items():
            arrow_node = arrow.get_node()
            if arrow_node not in power_nodes:
                continue

            is_supply = arrow.is_ingoing()
            loader = supply_loader if is_supply else demand_loader

            unit_flow = self.units[name]
            unit_coeff = info.unit_coeffs[arrow.get_node()]
            unit = f"({unit_flow})*({unit_coeff})" if unit_coeff is not None else unit_flow

            for i in range(n):
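                # note: the break at the end of this loop body means only the
                # first iteration runs, so subname is always the full name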
                subname = name[: n - i]
                level, profile = self._get_decomposed_level_profile(
                    subname,
                    loader,
                    unit,
                    is_flow=True,
                )
                volume.set_level(level)
                volume.set_profile(profile)
                break

    def _get_decomposed_level_profile(
        self,
        name: str,
        loader: JulESH5TimeVectorLoader,
        unit: str | None = None,
        is_flow: bool = False,
        is_stock: bool = False,
    ) -> tuple[Expr, Expr]:
        """ Decompose price vector into level and profile expressions.
        Note! Support for negative proices will come in the next version. """


        timevector = self._get_timevector(jules_id=name, loader=loader)


        mean_value = timevector.get_vector(self._config.is_float32()).mean()

        scale = float(1 / mean_value) if mean_value != 0 else 1.0
        mean_one_profile_timevector = LinearTransformTimeVector(
            timevector=timevector,
            scale=scale,
            shift=0.0,
            unit=None,
            is_zero_one_profile=False,
        )

        profile_expr = None
        reference_period = None
        if mean_value != 0:
            reference_period = self._get_reference_period()
            profile_expr = ensure_expr(mean_one_profile_timevector, is_profile=True)

        avg_level_timevector = ConstantTimeVector(
            scalar=mean_value,
            unit=unit,
            is_max_level=False,
            reference_period=reference_period,
        )

        level_expr = ensure_expr(
            avg_level_timevector,
            is_level=True,
            is_flow=is_flow,
            is_stock=is_stock,
            profile=profile_expr,
        )

        return level_expr, profile_expr

    def _set_units(self) -> dict[str, str | None]:
        units = {name: info.unit_price for name, info in self.graph_infos.clearing.items() if info.is_market_node}
        units.update(
            {
                info.jules_storage_id: info.unit_stock
                for info in self.graph_infos.clearing.values()
                if info.is_storage_node
            },
        )
        units.update(
            {
                info.jules_storage_id + "_sv": info.unit_cost
                for info in self.graph_infos.clearing.values()
                if info.is_storage_node and info.unit_cost is not None
            },
        )
        units.update(
            {name: info.unit_flow_result for name, info in self.graph_infos.clearing.items() if info.is_flow},
        )
        return units

    def _get_reference_period(self) -> ReferencePeriod:
        first_year, num_years = self._config.get_simulation_years()
        return ReferencePeriod(first_year, num_years)

    def _get_timevector(self, jules_id: str, loader: JulESH5TimeVectorLoader) -> LoadedTimeVector:
        try:
            return LoadedTimeVector(vector_id=jules_id, loader=loader)
        except Exception:
            raise AssertionError from None
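The level/profile decomposition in _get_decomposed_level_profile reduces to simple arithmetic: the level is the mean of the loaded vector, and the profile is the vector rescaled to mean one, so level * profile recovers the original series. A self-contained numpy sketch of just that step (stand-in data, none of the Expr machinery):

import numpy as np

vector = np.array([10.0, 20.0, 30.0])  # stand-in for a loaded time vector

mean_value = vector.mean()
scale = 1.0 / mean_value if mean_value != 0 else 1.0

profile = vector * scale  # mean-one profile
level = mean_value        # constant level carrying magnitude and unit

assert np.allclose(level * profile, vector)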
__init__(folder: Path | str, config: JulESConfig, names: JulESNames, graphs: NodeFlowGraphs, graph_infos: GraphInfos) -> None

Handle retrieval of results from a JulES Serial simulation.

Parameters:

- folder (Path | str): Path to the folder of the JulES simulation. Required.
- config (JulESConfig): Simulation config. Required.
- names (JulESNames): JulES namespace. Required.
- graphs (NodeFlowGraphs): Graphs used in the simulation. Required.
- graph_infos (GraphInfos): JulES-specific info for each Component in the graphs. Required.
Source code in framjules/solve_handler/results_handler/SerialResultsHandler.py
def __init__(
    self,
    folder: Path | str,
    config: JulESConfig,
    names: JulESNames,
    graphs: NodeFlowGraphs,
    graph_infos: GraphInfos,
) -> None:
    """Handle retrieval of results from a JulES Serial simulation.

    Args:
        folder (Path | str): Path to folder of the JulES simulation.
        config (JulESConfig): Simulation config.
        names (JulESNames): JulES namespace.
        graphs (NodeFlowGraphs): Graphs used in the simulation.
        graph_infos (GraphInfos): JulES-specific info for each Component in the graphs.

    """
    self._folder = Path(folder)
    self._config = config
    self._names = names

    self.graphs = graphs
    self.graph_infos = graph_infos

    self.units = self._set_units()
set_results() -> None

Set JulES results of all Components in the clearing graph.

Source code in framjules/solve_handler/results_handler/SerialResultsHandler.py
def set_results(self) -> None:
    """Set JulES results of all Components in the clearing graph."""
    is_whole_years = self._config.is_simulation_mode_serial()

    loader = JulESH5TimeVectorLoader(
        source=self._folder,
        units=self.units,
        relative_loc=self._names.FILENAME_H5_OUTPUT,
        is_whole_years=is_whole_years,
    )

    supply_loader = SupplyJulESH5TimeVectorLoader(
        source=self._folder,
        units=self.units,
        relative_loc=self._names.FILENAME_H5_OUTPUT,
        is_whole_years=is_whole_years,
    )

    demand_loader = DemandJulESH5TimeVectorLoader(
        source=self._folder,
        units=self.units,
        relative_loc=self._names.FILENAME_H5_OUTPUT,
        is_whole_years=is_whole_years,
    )

    power_nodes = [
        name
        for name, c in self.graphs.clearing.items()
        if isinstance(c, Node) and c.get_commodity() == JulESNames.POWER
    ]
    for name, c in self.graphs.clearing.items():
        info: ComponentInfo = self.graph_infos.clearing[name]

        if isinstance(c, Node):
            if c.is_exogenous():
                continue  
            self._set_node_results(c, name, loader)
        if isinstance(c, Flow):
            self._set_flow_results(
                c,
                info,
                name,
                loader,
                power_nodes,
                supply_loader=supply_loader,
                demand_loader=demand_loader,
            )

run_handler

SerialRunHandler

SerialRunHandler

Bases: JuliaModel

Handle running the JulES solver in serial simulation mode.

Source code in framjules/solve_handler/run_handler/SerialRunHandler.py
class SerialRunHandler(JuliaModel):
    """Handle running the JulES solver in serial simulation mode."""

    # ENV_NAME = "JulES_julia_env"

    def __init__(
        self,
        folder: Path,
        config: JulESConfig,
        names: JulESNames,
        dependencies: list[str | tuple[str, str | None]] | None = None,
    ) -> None:
        """Initialize JulES serial folder.

        The three parameters env_path, depot_path and julia_path set environment variables for the locations of your
        Julia environment, packages and language.
            - If the user has not specified locations, the default is to use the current python/conda environment.
            - If a system installation of Python is used, the default is set to the current user location.

        Args:
            folder (Path): Location of the JulES model dataset.
            config (JulESConfig): Simulation config.
            names (JulESNames): JulES namespace object.
            dependencies (list[str]): Julia package dependencies. List of str, either package names or urls.

        """
        self._folder = folder
        self._config = config
        self._names = names
        self.ENV_NAME = self._names.JULIA_ENV_NAME
        super().__init__(
            julia_path=self._config.get_julia_exe_path(),
            env_path=self._config.get_julia_env_path(),
            depot_path=self._config.get_julia_depot_path(),
            dependencies=dependencies,
            skip_install_dependencies=config.is_skip_install_dependencies(),
        )

    def run(self) -> None:
        """Run JulES in Series mode."""
        data_year = self._config.get_data_period().get_start_time().year
        weather_year = self._config.get_weather_years()[0]

        config_path = self._folder / self._names.JULES_CONFIG
        output_path = self._folder / self._names.FILENAME_H5_OUTPUT

        def get_all_attrs(obj) -> dict:  # noqa: ANN001
            result = {}
            result.update(
                {k: v for k, v in obj.__class__.__dict__.items() if not k.startswith("__") and not callable(v)},
            )
            result.update(obj.__dict__)
            return result

        names_dict = get_all_attrs(self._names)
        filename_clearing = f"{self._names.ROOT_FILENAME_DATAELEMENTS}_{self._names.CLEARING}.json"
        filename_aggregated = f"{self._names.ROOT_FILENAME_DATAELEMENTS}_{self._names.AGGREGATED}.json"

        self._jl.seval(f"""
        using Distributed, YAML, HDF5
        config = YAML.load_file(\"{config_path.as_posix()}\")
        println("Add cores")
        const numcores = config["main"]["numcores"]
        if nprocs() < numcores
            addprocs(numcores - nprocs())
        end
        @show nprocs()
        println("Load JulES")
        @time @everywhere using JulES
        using Pkg; Pkg.status()
        """)

        self._jl.JulES.run_jules(
            config_path.as_posix(),
            data_year,
            weather_year,
            output_path.as_posix(),
            names_dict,
            filename_clearing,
            filename_aggregated,
        )
__init__(folder: Path, config: JulESConfig, names: JulESNames, dependencies: list[str | tuple[str, str | None]] | None = None) -> None

Initialize JulES serial folder.

The three parameters env_path, depot_path and julia_path set environment variables for the locations of your Julia environment, packages and language. If the user has not specified locations, the default is to use the current python/conda environment. If a system installation of Python is used, the default is set to the current user location.

Parameters:

- folder (Path): Location of the JulES model dataset. Required.
- config (JulESConfig): Simulation config. Required.
- names (JulESNames): JulES namespace object. Required.
- dependencies (list[str]): Julia package dependencies, either package names or urls. Default: None.
Source code in framjules/solve_handler/run_handler/SerialRunHandler.py
def __init__(
    self,
    folder: Path,
    config: JulESConfig,
    names: JulESNames,
    dependencies: list[str | tuple[str, str | None]] | None = None,
) -> None:
    """Initialize JulES serial folder.

    The three parameters env_path, depot_path and julia_path set environment variables for the locations of your
    Julia environment, packages and language.
        - If the user has not specified locations, the default is to use the current python/conda environment.
        - If a system installation of Python is used, the default is set to the current user location.

    Args:
        folder (Path): Location of the JulES model dataset.
        config (JulESConfig): Simulation config.
        names (JulESNames): JulES namespace object.
        dependencies (list[str]): Julia package dependencies. List of str, either package names or urls.

    """
    self._folder = folder
    self._config = config
    self._names = names
    self.ENV_NAME = self._names.JULIA_ENV_NAME
    super().__init__(
        julia_path=self._config.get_julia_exe_path(),
        env_path=self._config.get_julia_env_path(),
        depot_path=self._config.get_julia_depot_path(),
        dependencies=dependencies,
        skip_install_dependencies=config.is_skip_install_dependencies(),
    )
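A hedged construction sketch (config and names are assumed to exist from earlier steps of the solve; the folder and dependency list are placeholders):

from pathlib import Path

run_handler = SerialRunHandler(
    folder=Path("path/to/jules_dataset"),  # hypothetical dataset location
    config=config,           # JulESConfig; julia/env/depot paths are read from it
    names=names,             # JulESNames
    dependencies=["JulES"],  # package names or urls
)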
run() -> None

Run JulES in serial mode.

Source code in framjules/solve_handler/run_handler/SerialRunHandler.py
def run(self) -> None:
    """Run JulES in Series mode."""
    data_year = self._config.get_data_period().get_start_time().year
    weather_year = self._config.get_weather_years()[0]

    config_path = self._folder / self._names.JULES_CONFIG
    output_path = self._folder / self._names.FILENAME_H5_OUTPUT

    def get_all_attrs(obj) -> dict:  # noqa: ANN001
        result = {}
        result.update(
            {k: v for k, v in obj.__class__.__dict__.items() if not k.startswith("__") and not callable(v)},
        )
        result.update(obj.__dict__)
        return result

    names_dict = get_all_attrs(self._names)
    filename_clearing = f"{self._names.ROOT_FILENAME_DATAELEMENTS}_{self._names.CLEARING}.json"
    filename_aggregated = f"{self._names.ROOT_FILENAME_DATAELEMENTS}_{self._names.AGGREGATED}.json"

    self._jl.seval(f"""
    using Distributed, YAML, HDF5
    config = YAML.load_file(\"{config_path.as_posix()}\")
    println("Add cores")
    const numcores = config["main"]["numcores"]
    if nprocs() < numcores
        addprocs(numcores - nprocs())
    end
    @show nprocs()
    println("Load JulES")
    @time @everywhere using JulES
    using Pkg; Pkg.status()
    """)

    self._jl.JulES.run_jules(
        config_path.as_posix(),
        data_year,
        weather_year,
        output_path.as_posix(),
        names_dict,
        filename_clearing,
        filename_aggregated,
    )
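The get_all_attrs helper above is plain attribute harvesting: class-level constants plus instance attributes, with dunder names and callables excluded. A toy demonstration (Names is a made-up class, not the real JulESNames):

class Names:
    CLEARING = "clearing"  # class attribute -> included

def get_all_attrs(obj) -> dict:
    result = {k: v for k, v in obj.__class__.__dict__.items() if not k.startswith("__") and not callable(v)}
    result.update(obj.__dict__)
    return result

names = Names()
names.extra = "value"  # instance attribute -> included

assert get_all_attrs(names) == {"CLEARING": "clearing", "extra": "value"}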