diff --git a/CHANGELOG.md b/CHANGELOG.md index 68d3d6b92..d53b1279a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -182,7 +182,17 @@ The following items are deprecated and will be removed in **v7.0.0**: - `Results` class → Use `flow_system.solution` and `flow_system.statistics` - `SegmentedResults` class → Use segment FlowSystems directly -**FlowSystem methods** (use `transform` or `topology` accessor instead): +Note: `topology.plot()` now renders a Sankey diagram. The old PyVis visualization is available via `topology.plot_legacy()`. + +### 🔥 Removed + +**Clustering classes removed** (deprecated in v5.0.0): + +- `ClusteredOptimization` class - Use `flow_system.transform.cluster()` then `optimize()` +- `ClusteringParameters` class - Parameters are now passed directly to `transform.cluster()` +- `flixopt/clustering.py` module - Restructured to `flixopt/clustering/` package with new classes + +**FlowSystem deprecated methods removed** (use `transform` or `topology` accessor instead): - `flow_system.sel()` → Use `flow_system.transform.sel()` - `flow_system.isel()` → Use `flow_system.transform.isel()` @@ -192,28 +202,9 @@ The following items are deprecated and will be removed in **v7.0.0**: - `flow_system.stop_network_app()` → Use `flow_system.topology.stop_app()` - `flow_system.network_infos()` → Use `flow_system.topology.infos()` -**Parameters:** - -- `normalize_weights` parameter in `create_model()`, `build_model()`, `optimize()` +**Parameters removed:** -**Topology method name simplifications** (old names still work with deprecation warnings, removal in v7.0.0): - -| Old (v5.x) | New (v6.0.0) | -|------------|--------------| -| `topology.plot_network()` | `topology.plot()` | -| `topology.start_network_app()` | `topology.start_app()` | -| `topology.stop_network_app()` | `topology.stop_app()` | -| `topology.network_infos()` | `topology.infos()` | - -Note: `topology.plot()` now renders a Sankey diagram. The old PyVis visualization is available via `topology.plot_legacy()`. - -### 🔥 Removed - -**Clustering classes removed** (deprecated in v5.0.0): - -- `ClusteredOptimization` class - Use `flow_system.transform.cluster()` then `optimize()` -- `ClusteringParameters` class - Parameters are now passed directly to `transform.cluster()` -- `flixopt/clustering.py` module - Restructured to `flixopt/clustering/` package with new classes +- `normalize_weights` parameter removed from `create_model()`, `build_model()`, `optimize()` (weights are always normalized) #### Migration from ClusteredOptimization diff --git a/flixopt/flow_system.py b/flixopt/flow_system.py index 7e12029ed..6db7823ff 100644 --- a/flixopt/flow_system.py +++ b/flixopt/flow_system.py @@ -9,7 +9,7 @@ import pathlib import warnings from itertools import chain -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING import numpy as np import pandas as pd @@ -36,8 +36,6 @@ if TYPE_CHECKING: from collections.abc import Collection - import pyvis - from .clustering import Clustering from .solvers import _Solver from .types import Effect_TPS, Numeric_S, Numeric_TPS, NumericOrBool @@ -1330,20 +1328,8 @@ def flow_carriers(self) -> dict[str, str]: return self._flow_carriers - def create_model(self, normalize_weights: bool | None = None) -> FlowSystemModel: - """ - Create a linopy model from the FlowSystem. - - Args: - normalize_weights: Deprecated. Scenario weights are now always normalized in FlowSystem. - """ - if normalize_weights is not None: - warnings.warn( - f'\n\nnormalize_weights parameter is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Scenario weights are now always normalized when set on FlowSystem.\n', - DeprecationWarning, - stacklevel=2, - ) + def create_model(self) -> FlowSystemModel: + """Create a linopy model from the FlowSystem.""" if not self.connected_and_transformed: raise RuntimeError( 'FlowSystem is not connected_and_transformed. Call FlowSystem.connect_and_transform() first.' @@ -1352,7 +1338,7 @@ def create_model(self, normalize_weights: bool | None = None) -> FlowSystemModel self.model = FlowSystemModel(self) return self.model - def build_model(self, normalize_weights: bool | None = None) -> FlowSystem: + def build_model(self) -> FlowSystem: """ Build the optimization model for this FlowSystem. @@ -1365,9 +1351,6 @@ def build_model(self, normalize_weights: bool | None = None) -> FlowSystem: After calling this method, `self.model` will be available for inspection before solving. - Args: - normalize_weights: Deprecated. Scenario weights are now always normalized in FlowSystem. - Returns: Self, for method chaining. @@ -1376,13 +1359,6 @@ def build_model(self, normalize_weights: bool | None = None) -> FlowSystem: >>> print(flow_system.model.variables) # Inspect variables before solving >>> flow_system.solve(solver) """ - if normalize_weights is not None: - warnings.warn( - f'\n\nnormalize_weights parameter is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Scenario weights are now always normalized when set on FlowSystem.\n', - DeprecationWarning, - stacklevel=2, - ) self.connect_and_transform() self.create_model() @@ -1681,70 +1657,6 @@ def topology(self) -> TopologyAccessor: self._topology = TopologyAccessor(self) return self._topology - def plot_network( - self, - path: bool | str | pathlib.Path = 'flow_system.html', - controls: bool - | list[ - Literal['nodes', 'edges', 'layout', 'interaction', 'manipulation', 'physics', 'selection', 'renderer'] - ] = True, - show: bool | None = None, - ) -> pyvis.network.Network | None: - """ - Deprecated: Use `flow_system.topology.plot()` instead. - - Visualizes the network structure of a FlowSystem using PyVis. - """ - warnings.warn( - f'plot_network() is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. ' - 'Use flow_system.topology.plot() instead.', - DeprecationWarning, - stacklevel=2, - ) - return self.topology.plot_legacy(path=path, controls=controls, show=show) - - def start_network_app(self) -> None: - """ - Deprecated: Use `flow_system.topology.start_app()` instead. - - Visualizes the network structure using Dash and Cytoscape. - """ - warnings.warn( - f'start_network_app() is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. ' - 'Use flow_system.topology.start_app() instead.', - DeprecationWarning, - stacklevel=2, - ) - self.topology.start_app() - - def stop_network_app(self) -> None: - """ - Deprecated: Use `flow_system.topology.stop_app()` instead. - - Stop the network visualization server. - """ - warnings.warn( - f'stop_network_app() is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. ' - 'Use flow_system.topology.stop_app() instead.', - DeprecationWarning, - stacklevel=2, - ) - self.topology.stop_app() - - def network_infos(self) -> tuple[dict[str, dict[str, str]], dict[str, dict[str, str]]]: - """ - Deprecated: Use `flow_system.topology.infos()` instead. - - Get network topology information as dictionaries. - """ - warnings.warn( - f'network_infos() is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. ' - 'Use flow_system.topology.infos() instead.', - DeprecationWarning, - stacklevel=2, - ) - return self.topology.infos() - def _check_if_element_is_unique(self, element: Element) -> None: """ checks if element or label of element already exists in list @@ -2224,270 +2136,6 @@ def scenario_independent_flow_rates(self, value: bool | list[str]) -> None: self._validate_scenario_parameter(value, 'scenario_independent_flow_rates', 'Flow.label_full') self._scenario_independent_flow_rates = value - @classmethod - def _dataset_sel( - cls, - dataset: xr.Dataset, - time: str | slice | list[str] | pd.Timestamp | pd.DatetimeIndex | None = None, - period: int | slice | list[int] | pd.Index | None = None, - scenario: str | slice | list[str] | pd.Index | None = None, - hours_of_last_timestep: int | float | None = None, - hours_of_previous_timesteps: int | float | np.ndarray | None = None, - ) -> xr.Dataset: - """ - Select subset of dataset by label (for power users to avoid conversion overhead). - - This method operates directly on xarray Datasets, allowing power users to chain - operations efficiently without repeated FlowSystem conversions: - - Example: - # Power user pattern (single conversion): - >>> ds = flow_system.to_dataset() - >>> ds = FlowSystem._dataset_sel(ds, time='2020-01') - >>> ds = FlowSystem._dataset_resample(ds, freq='2h', method='mean') - >>> result = FlowSystem.from_dataset(ds) - - # vs. simple pattern (multiple conversions): - >>> result = flow_system.sel(time='2020-01').resample('2h') - - Args: - dataset: xarray Dataset from FlowSystem.to_dataset() - time: Time selection (e.g., '2020-01', slice('2020-01-01', '2020-06-30')) - period: Period selection (e.g., 2020, slice(2020, 2022)) - scenario: Scenario selection (e.g., 'Base Case', ['Base Case', 'High Demand']) - hours_of_last_timestep: Duration of the last timestep. If None, computed from the selected time index. - hours_of_previous_timesteps: Duration of previous timesteps. If None, computed from the selected time index. - Can be a scalar or array. - - Returns: - xr.Dataset: Selected dataset - """ - warnings.warn( - f'\n_dataset_sel() is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Use TransformAccessor._dataset_sel() instead.', - DeprecationWarning, - stacklevel=2, - ) - from .transform_accessor import TransformAccessor - - return TransformAccessor._dataset_sel( - dataset, - time=time, - period=period, - scenario=scenario, - hours_of_last_timestep=hours_of_last_timestep, - hours_of_previous_timesteps=hours_of_previous_timesteps, - ) - - def sel( - self, - time: str | slice | list[str] | pd.Timestamp | pd.DatetimeIndex | None = None, - period: int | slice | list[int] | pd.Index | None = None, - scenario: str | slice | list[str] | pd.Index | None = None, - ) -> FlowSystem: - """ - Select a subset of the flowsystem by label. - - .. deprecated:: - Use ``flow_system.transform.sel()`` instead. Will be removed in v6.0.0. - - Args: - time: Time selection (e.g., slice('2023-01-01', '2023-12-31'), '2023-06-15') - period: Period selection (e.g., slice(2023, 2024), or list of periods) - scenario: Scenario selection (e.g., 'scenario1', or list of scenarios) - - Returns: - FlowSystem: New FlowSystem with selected data (no solution). - """ - warnings.warn( - f'\nsel() is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Use flow_system.transform.sel() instead.', - DeprecationWarning, - stacklevel=2, - ) - return self.transform.sel(time=time, period=period, scenario=scenario) - - @classmethod - def _dataset_isel( - cls, - dataset: xr.Dataset, - time: int | slice | list[int] | None = None, - period: int | slice | list[int] | None = None, - scenario: int | slice | list[int] | None = None, - hours_of_last_timestep: int | float | None = None, - hours_of_previous_timesteps: int | float | np.ndarray | None = None, - ) -> xr.Dataset: - """ - Select subset of dataset by integer index (for power users to avoid conversion overhead). - - See _dataset_sel() for usage pattern. - - Args: - dataset: xarray Dataset from FlowSystem.to_dataset() - time: Time selection by index (e.g., slice(0, 100), [0, 5, 10]) - period: Period selection by index - scenario: Scenario selection by index - hours_of_last_timestep: Duration of the last timestep. If None, computed from the selected time index. - hours_of_previous_timesteps: Duration of previous timesteps. If None, computed from the selected time index. - Can be a scalar or array. - - Returns: - xr.Dataset: Selected dataset - """ - warnings.warn( - f'\n_dataset_isel() is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Use TransformAccessor._dataset_isel() instead.', - DeprecationWarning, - stacklevel=2, - ) - from .transform_accessor import TransformAccessor - - return TransformAccessor._dataset_isel( - dataset, - time=time, - period=period, - scenario=scenario, - hours_of_last_timestep=hours_of_last_timestep, - hours_of_previous_timesteps=hours_of_previous_timesteps, - ) - - def isel( - self, - time: int | slice | list[int] | None = None, - period: int | slice | list[int] | None = None, - scenario: int | slice | list[int] | None = None, - ) -> FlowSystem: - """ - Select a subset of the flowsystem by integer indices. - - .. deprecated:: - Use ``flow_system.transform.isel()`` instead. Will be removed in v6.0.0. - - Args: - time: Time selection by integer index (e.g., slice(0, 100), 50, or [0, 5, 10]) - period: Period selection by integer index - scenario: Scenario selection by integer index - - Returns: - FlowSystem: New FlowSystem with selected data (no solution). - """ - warnings.warn( - f'\nisel() is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Use flow_system.transform.isel() instead.', - DeprecationWarning, - stacklevel=2, - ) - return self.transform.isel(time=time, period=period, scenario=scenario) - - @classmethod - def _dataset_resample( - cls, - dataset: xr.Dataset, - freq: str, - method: Literal['mean', 'sum', 'max', 'min', 'first', 'last', 'std', 'var', 'median', 'count'] = 'mean', - hours_of_last_timestep: int | float | None = None, - hours_of_previous_timesteps: int | float | np.ndarray | None = None, - **kwargs: Any, - ) -> xr.Dataset: - """ - Resample dataset along time dimension (for power users to avoid conversion overhead). - Preserves only the attrs of the Dataset. - - Uses optimized _resample_by_dimension_groups() to avoid broadcasting issues. - See _dataset_sel() for usage pattern. - - Args: - dataset: xarray Dataset from FlowSystem.to_dataset() - freq: Resampling frequency (e.g., '2h', '1D', '1M') - method: Resampling method (e.g., 'mean', 'sum', 'first') - hours_of_last_timestep: Duration of the last timestep after resampling. If None, computed from the last time interval. - hours_of_previous_timesteps: Duration of previous timesteps after resampling. If None, computed from the first time interval. - Can be a scalar or array. - **kwargs: Additional arguments passed to xarray.resample() - - Returns: - xr.Dataset: Resampled dataset - """ - warnings.warn( - f'\n_dataset_resample() is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Use TransformAccessor._dataset_resample() instead.', - DeprecationWarning, - stacklevel=2, - ) - from .transform_accessor import TransformAccessor - - return TransformAccessor._dataset_resample( - dataset, - freq=freq, - method=method, - hours_of_last_timestep=hours_of_last_timestep, - hours_of_previous_timesteps=hours_of_previous_timesteps, - **kwargs, - ) - - @classmethod - def _resample_by_dimension_groups( - cls, - time_dataset: xr.Dataset, - time: str, - method: str, - **kwargs: Any, - ) -> xr.Dataset: - """ - Resample variables grouped by their dimension structure to avoid broadcasting. - - .. deprecated:: - Use ``TransformAccessor._resample_by_dimension_groups()`` instead. - Will be removed in v6.0.0. - """ - warnings.warn( - f'\n_resample_by_dimension_groups() is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Use TransformAccessor._resample_by_dimension_groups() instead.', - DeprecationWarning, - stacklevel=2, - ) - from .transform_accessor import TransformAccessor - - return TransformAccessor._resample_by_dimension_groups(time_dataset, time, method, **kwargs) - - def resample( - self, - time: str, - method: Literal['mean', 'sum', 'max', 'min', 'first', 'last', 'std', 'var', 'median', 'count'] = 'mean', - hours_of_last_timestep: int | float | None = None, - hours_of_previous_timesteps: int | float | np.ndarray | None = None, - **kwargs: Any, - ) -> FlowSystem: - """ - Create a resampled FlowSystem by resampling data along the time dimension. - - .. deprecated:: - Use ``flow_system.transform.resample()`` instead. Will be removed in v6.0.0. - - Args: - time: Resampling frequency (e.g., '3h', '2D', '1M') - method: Resampling method. Recommended: 'mean', 'first', 'last', 'max', 'min' - hours_of_last_timestep: Duration of the last timestep after resampling. - hours_of_previous_timesteps: Duration of previous timesteps after resampling. - **kwargs: Additional arguments passed to xarray.resample() - - Returns: - FlowSystem: New resampled FlowSystem (no solution). - """ - warnings.warn( - f'\nresample() is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Use flow_system.transform.resample() instead.', - DeprecationWarning, - stacklevel=2, - ) - return self.transform.resample( - time=time, - method=method, - hours_of_last_timestep=hours_of_last_timestep, - hours_of_previous_timesteps=hours_of_previous_timesteps, - **kwargs, - ) - @property def connected_and_transformed(self) -> bool: return self._connected_and_transformed diff --git a/flixopt/optimization.py b/flixopt/optimization.py index 0b567387f..73361b0f1 100644 --- a/flixopt/optimization.py +++ b/flixopt/optimization.py @@ -82,7 +82,6 @@ def _initialize_optimization_common( name: str, flow_system: FlowSystem, folder: pathlib.Path | None = None, - normalize_weights: bool | None = None, ) -> None: """ Shared initialization logic for all optimization types. @@ -95,7 +94,6 @@ def _initialize_optimization_common( name: Name of the optimization flow_system: FlowSystem to optimize folder: Directory for saving results - normalize_weights: Deprecated. Scenario weights are now always normalized in FlowSystem. """ obj.name = name @@ -106,9 +104,6 @@ def _initialize_optimization_common( ) flow_system = flow_system.copy() - # normalize_weights is deprecated but kept for backwards compatibility - obj.normalize_weights = True # Always True now - flow_system._used_in_optimization = True obj.flow_system = flow_system @@ -138,7 +133,6 @@ class Optimization: name: name of optimization flow_system: flow_system which should be optimized folder: folder where results should be saved. If None, then the current working directory is used. - normalize_weights: Whether to automatically normalize the weights of scenarios to sum up to 1 when solving. Examples: Basic usage: @@ -159,14 +153,12 @@ class Optimization: results: Results | None durations: dict[str, float] model: FlowSystemModel | None - normalize_weights: bool def __init__( self, name: str, flow_system: FlowSystem, folder: pathlib.Path | None = None, - normalize_weights: bool = True, ): warnings.warn( f'Optimization is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. ' @@ -180,7 +172,6 @@ def __init__( name=name, flow_system=flow_system, folder=folder, - normalize_weights=normalize_weights, ) def do_modeling(self) -> Optimization: @@ -470,7 +461,6 @@ class SegmentedOptimization: results: SegmentedResults | None durations: dict[str, float] model: None # SegmentedOptimization doesn't use a single model - normalize_weights: bool def __init__( self, @@ -540,7 +530,9 @@ def _create_sub_optimizations(self): for i, (segment_name, timesteps_of_segment) in enumerate( zip(self.segment_names, self._timesteps_per_segment, strict=True) ): - calc = Optimization(f'{self.name}-{segment_name}', self.flow_system.sel(time=timesteps_of_segment)) + calc = Optimization( + f'{self.name}-{segment_name}', self.flow_system.transform.sel(time=timesteps_of_segment) + ) calc.flow_system._connect_network() # Connect to have Correct names of Flows! self.sub_optimizations.append(calc) diff --git a/flixopt/optimize_accessor.py b/flixopt/optimize_accessor.py index 7aee930a4..9f3c00d10 100644 --- a/flixopt/optimize_accessor.py +++ b/flixopt/optimize_accessor.py @@ -53,7 +53,7 @@ def __init__(self, flow_system: FlowSystem) -> None: """ self._fs = flow_system - def __call__(self, solver: _Solver, normalize_weights: bool | None = None) -> FlowSystem: + def __call__(self, solver: _Solver) -> FlowSystem: """ Build and solve the optimization model in one step. @@ -64,7 +64,6 @@ def __call__(self, solver: _Solver, normalize_weights: bool | None = None) -> Fl Args: solver: The solver to use (e.g., HighsSolver, GurobiSolver). - normalize_weights: Deprecated. Scenario weights are now always normalized in FlowSystem. Returns: The FlowSystem, for method chaining. @@ -85,17 +84,6 @@ def __call__(self, solver: _Solver, normalize_weights: bool | None = None) -> Fl >>> solution = flow_system.optimize(solver).solution """ - if normalize_weights is not None: - import warnings - - from .config import DEPRECATION_REMOVAL_VERSION - - warnings.warn( - f'\n\nnormalize_weights parameter is deprecated and will be removed in {DEPRECATION_REMOVAL_VERSION}. ' - 'Scenario weights are now always normalized when set on FlowSystem.\n', - DeprecationWarning, - stacklevel=2, - ) self._fs.build_model() self._fs.solve(solver) return self._fs diff --git a/tests/deprecated/examples/00_Minmal/minimal_example.py b/tests/deprecated/examples/00_Minmal/minimal_example.py deleted file mode 100644 index 207faa9a9..000000000 --- a/tests/deprecated/examples/00_Minmal/minimal_example.py +++ /dev/null @@ -1,36 +0,0 @@ -""" -This script shows how to use the flixopt framework to model a super minimalistic energy system in the most concise way possible. -THis can also be used to create proposals for new features, bug reports etc -""" - -import numpy as np -import pandas as pd - -import flixopt as fx - -if __name__ == '__main__': - fx.CONFIG.silent() - flow_system = fx.FlowSystem(pd.date_range('2020-01-01', periods=3, freq='h')) - - flow_system.add_elements( - fx.Bus('Heat'), - fx.Bus('Gas'), - fx.Effect('Costs', '€', 'Cost', is_standard=True, is_objective=True), - fx.linear_converters.Boiler( - 'Boiler', - thermal_efficiency=0.5, - thermal_flow=fx.Flow(label='Heat', bus='Heat', size=50), - fuel_flow=fx.Flow(label='Gas', bus='Gas'), - ), - fx.Sink( - 'Sink', - inputs=[fx.Flow(label='Demand', bus='Heat', size=1, fixed_relative_profile=np.array([30, 0, 20]))], - ), - fx.Source( - 'Source', - outputs=[fx.Flow(label='Gas', bus='Gas', size=1000, effects_per_flow_hour=0.04)], - ), - ) - - flow_system.optimize(fx.solvers.HighsSolver(0.01, 60)) - flow_system.statistics.plot.balance('Heat') diff --git a/tests/deprecated/examples/01_Simple/simple_example.py b/tests/deprecated/examples/01_Simple/simple_example.py deleted file mode 100644 index b63260ece..000000000 --- a/tests/deprecated/examples/01_Simple/simple_example.py +++ /dev/null @@ -1,126 +0,0 @@ -""" -This script shows how to use the flixopt framework to model a simple energy system. -""" - -import numpy as np -import pandas as pd - -import flixopt as fx - -if __name__ == '__main__': - fx.CONFIG.exploring() - - # --- Create Time Series Data --- - # Heat demand profile (e.g., kW) over time and corresponding power prices - heat_demand_per_h = np.array([30, 0, 90, 110, 110, 20, 20, 20, 20]) - power_prices = 1 / 1000 * np.array([80, 80, 80, 80, 80, 80, 80, 80, 80]) - - # Create datetime array starting from '2020-01-01' for the given time period - timesteps = pd.date_range('2020-01-01', periods=len(heat_demand_per_h), freq='h') - flow_system = fx.FlowSystem(timesteps=timesteps) - - # --- Define Energy Buses --- - # These represent nodes, where the used medias are balanced (electricity, heat, and gas) - # Carriers provide automatic color assignment in plots (yellow for electricity, red for heat, etc.) - flow_system.add_elements( - fx.Bus(label='Strom', carrier='electricity'), - fx.Bus(label='Fernwärme', carrier='heat'), - fx.Bus(label='Gas', carrier='gas'), - ) - - # --- Define Effects (Objective and CO2 Emissions) --- - # Cost effect: used as the optimization objective --> minimizing costs - costs = fx.Effect( - label='costs', - unit='€', - description='Kosten', - is_standard=True, # standard effect: no explicit value needed for costs - is_objective=True, # Minimizing costs as the optimization objective - share_from_temporal={'CO2': 0.2}, - ) - - # CO2 emissions effect with an associated cost impact - CO2 = fx.Effect( - label='CO2', - unit='kg', - description='CO2_e-Emissionen', - maximum_per_hour=1000, # Max CO2 emissions per hour - ) - - # --- Define Flow System Components --- - # Boiler: Converts fuel (gas) into thermal energy (heat) - boiler = fx.linear_converters.Boiler( - label='Boiler', - thermal_efficiency=0.5, - thermal_flow=fx.Flow(label='Q_th', bus='Fernwärme', size=50, relative_minimum=0.1, relative_maximum=1), - fuel_flow=fx.Flow(label='Q_fu', bus='Gas'), - ) - - # Combined Heat and Power (CHP): Generates both electricity and heat from fuel - chp = fx.linear_converters.CHP( - label='CHP', - thermal_efficiency=0.5, - electrical_efficiency=0.4, - electrical_flow=fx.Flow('P_el', bus='Strom', size=60, relative_minimum=5 / 60), - thermal_flow=fx.Flow('Q_th', bus='Fernwärme'), - fuel_flow=fx.Flow('Q_fu', bus='Gas'), - ) - - # Storage: Energy storage system with charging and discharging capabilities - storage = fx.Storage( - label='Storage', - charging=fx.Flow('Q_th_load', bus='Fernwärme', size=1000), - discharging=fx.Flow('Q_th_unload', bus='Fernwärme', size=1000), - capacity_in_flow_hours=fx.InvestParameters(effects_of_investment=20, fixed_size=30, mandatory=True), - initial_charge_state=0, # Initial storage state: empty - relative_maximum_charge_state=1 / 100 * np.array([80, 70, 80, 80, 80, 80, 80, 80, 80]), - relative_maximum_final_charge_state=0.8, - eta_charge=0.9, - eta_discharge=1, # Efficiency factors for charging/discharging - relative_loss_per_hour=0.08, # 8% loss per hour. Absolute loss depends on current charge state - prevent_simultaneous_charge_and_discharge=True, # Prevent charging and discharging at the same time - ) - - # Heat Demand Sink: Represents a fixed heat demand profile - heat_sink = fx.Sink( - label='Heat Demand', - inputs=[fx.Flow(label='Q_th_Last', bus='Fernwärme', size=1, fixed_relative_profile=heat_demand_per_h)], - ) - - # Gas Source: Gas tariff source with associated costs and CO2 emissions - gas_source = fx.Source( - label='Gastarif', - outputs=[ - fx.Flow(label='Q_Gas', bus='Gas', size=1000, effects_per_flow_hour={costs.label: 0.04, CO2.label: 0.3}) - ], - ) - - # Power Sink: Represents the export of electricity to the grid - power_sink = fx.Sink( - label='Einspeisung', inputs=[fx.Flow(label='P_el', bus='Strom', effects_per_flow_hour=-1 * power_prices)] - ) - - # --- Build the Flow System --- - # Add all defined components and effects to the flow system - flow_system.add_elements(costs, CO2, boiler, storage, chp, heat_sink, gas_source, power_sink) - - # Visualize the flow system for validation purposes - flow_system.topology.plot() - - # --- Define and Solve Optimization --- - flow_system.optimize(fx.solvers.HighsSolver(mip_gap=0, time_limit_seconds=30)) - - # --- Analyze Results --- - # Plotting through statistics accessor - returns PlotResult with .data and .figure - flow_system.statistics.plot.balance('Fernwärme') - flow_system.statistics.plot.balance('Storage') - flow_system.statistics.plot.heatmap('CHP(Q_th)') - flow_system.statistics.plot.heatmap('Storage') - - # Access data as xarray Datasets - print(flow_system.statistics.flow_rates) - print(flow_system.statistics.charge_states) - - # Duration curve and effects analysis - flow_system.statistics.plot.duration_curve('Boiler(Q_th)') - print(flow_system.statistics.temporal_effects) diff --git a/tests/deprecated/examples/02_Complex/complex_example.py b/tests/deprecated/examples/02_Complex/complex_example.py deleted file mode 100644 index f21fd0533..000000000 --- a/tests/deprecated/examples/02_Complex/complex_example.py +++ /dev/null @@ -1,207 +0,0 @@ -""" -This script shows how to use the flixopt framework to model a more complex energy system. -""" - -import numpy as np -import pandas as pd - -import flixopt as fx - -if __name__ == '__main__': - fx.CONFIG.exploring() - - # --- Experiment Options --- - # Configure options for testing various parameters and behaviors - check_penalty = False - imbalance_penalty = 1e5 - use_chp_with_piecewise_conversion = True - - # --- Define Demand and Price Profiles --- - # Input data for electricity and heat demands, as well as electricity price - electricity_demand = np.array([70, 80, 90, 90, 90, 90, 90, 90, 90]) - heat_demand = ( - np.array([30, 0, 90, 110, 2000, 20, 20, 20, 20]) - if check_penalty - else np.array([30, 0, 90, 110, 110, 20, 20, 20, 20]) - ) - electricity_price = np.array([40, 40, 40, 40, 40, 40, 40, 40, 40]) - - # --- Define the Flow System, that will hold all elements, and the time steps you want to model --- - timesteps = pd.date_range('2020-01-01', periods=len(heat_demand), freq='h') - flow_system = fx.FlowSystem(timesteps) # Create FlowSystem - - # --- Define Energy Buses --- - # Represent node balances (inputs=outputs) for the different energy carriers (electricity, heat, gas) in the system - # Carriers provide automatic color assignment in plots (yellow for electricity, red for heat, blue for gas) - flow_system.add_elements( - fx.Bus('Strom', carrier='electricity', imbalance_penalty_per_flow_hour=imbalance_penalty), - fx.Bus('Fernwärme', carrier='heat', imbalance_penalty_per_flow_hour=imbalance_penalty), - fx.Bus('Gas', carrier='gas', imbalance_penalty_per_flow_hour=imbalance_penalty), - ) - - # --- Define Effects --- - # Specify effects related to costs, CO2 emissions, and primary energy consumption - Costs = fx.Effect('costs', '€', 'Kosten', is_standard=True, is_objective=True, share_from_temporal={'CO2': 0.2}) - CO2 = fx.Effect('CO2', 'kg', 'CO2_e-Emissionen') - PE = fx.Effect('PE', 'kWh_PE', 'Primärenergie', maximum_total=3.5e3) - - # --- Define Components --- - # 1. Define Boiler Component - # A gas boiler that converts fuel into thermal output, with investment and on-inactive parameters - Gaskessel = fx.linear_converters.Boiler( - 'Kessel', - thermal_efficiency=0.5, # Efficiency ratio - status_parameters=fx.StatusParameters( - effects_per_active_hour={Costs.label: 0, CO2.label: 1000} - ), # CO2 emissions per hour - thermal_flow=fx.Flow( - label='Q_th', # Thermal output - bus='Fernwärme', # Linked bus - size=fx.InvestParameters( - effects_of_investment=1000, # Fixed investment costs - fixed_size=50, # Fixed size - mandatory=True, # Forced investment - effects_of_investment_per_size={Costs.label: 10, PE.label: 2}, # Specific costs - ), - load_factor_max=1.0, # Maximum load factor (50 kW) - load_factor_min=0.1, # Minimum load factor (5 kW) - relative_minimum=5 / 50, # Minimum part load - relative_maximum=1, # Maximum part load - previous_flow_rate=50, # Previous flow rate - flow_hours_max=1e6, # Total energy flow limit - status_parameters=fx.StatusParameters( - active_hours_min=0, # Minimum operating hours - active_hours_max=1000, # Maximum operating hours - max_uptime=10, # Max consecutive operating hours - min_uptime=np.array([1, 1, 1, 1, 1, 2, 2, 2, 2]), # min consecutive operation hours - max_downtime=10, # Max consecutive inactive hours - effects_per_startup={Costs.label: 0.01}, # Cost per startup - startup_limit=1000, # Max number of starts - ), - ), - fuel_flow=fx.Flow(label='Q_fu', bus='Gas', size=200), - ) - - # 2. Define CHP Unit - # Combined Heat and Power unit that generates both electricity and heat from fuel - bhkw = fx.linear_converters.CHP( - 'BHKW2', - thermal_efficiency=0.5, - electrical_efficiency=0.4, - status_parameters=fx.StatusParameters(effects_per_startup={Costs.label: 0.01}), - electrical_flow=fx.Flow('P_el', bus='Strom', size=60, relative_minimum=5 / 60), - thermal_flow=fx.Flow('Q_th', bus='Fernwärme', size=1e3), - fuel_flow=fx.Flow('Q_fu', bus='Gas', size=1e3, previous_flow_rate=20), # The CHP was ON previously - ) - - # 3. Define CHP with Piecewise Conversion - # This CHP unit uses piecewise conversion for more dynamic behavior over time - P_el = fx.Flow('P_el', bus='Strom', size=60, previous_flow_rate=20) - Q_th = fx.Flow('Q_th', bus='Fernwärme', size=100) # Size required for status_parameters - Q_fu = fx.Flow('Q_fu', bus='Gas', size=200) # Size required for status_parameters - piecewise_conversion = fx.PiecewiseConversion( - { - P_el.label: fx.Piecewise([fx.Piece(5, 30), fx.Piece(40, 60)]), - Q_th.label: fx.Piecewise([fx.Piece(6, 35), fx.Piece(45, 100)]), - Q_fu.label: fx.Piecewise([fx.Piece(12, 70), fx.Piece(90, 200)]), - } - ) - - bhkw_2 = fx.LinearConverter( - 'BHKW2', - inputs=[Q_fu], - outputs=[P_el, Q_th], - piecewise_conversion=piecewise_conversion, - status_parameters=fx.StatusParameters(effects_per_startup={Costs.label: 0.01}), - ) - - # 4. Define Storage Component - # Storage with variable size and piecewise investment effects - segmented_investment_effects = fx.PiecewiseEffects( - piecewise_origin=fx.Piecewise([fx.Piece(5, 25), fx.Piece(25, 100)]), - piecewise_shares={ - Costs.label: fx.Piecewise([fx.Piece(50, 250), fx.Piece(250, 800)]), - PE.label: fx.Piecewise([fx.Piece(5, 25), fx.Piece(25, 100)]), - }, - ) - - speicher = fx.Storage( - 'Speicher', - charging=fx.Flow('Q_th_load', bus='Fernwärme', size=1e4), - discharging=fx.Flow('Q_th_unload', bus='Fernwärme', size=1e4), - capacity_in_flow_hours=fx.InvestParameters( - piecewise_effects_of_investment=segmented_investment_effects, # Investment effects - mandatory=True, # Forced investment - minimum_size=0, - maximum_size=1000, # Optimizing between 0 and 1000 kWh - ), - initial_charge_state=0, # Initial charge state - maximal_final_charge_state=10, # Maximum final charge state - eta_charge=0.9, - eta_discharge=1, # Charge/discharge efficiency - relative_loss_per_hour=0.08, # Energy loss per hour, relative to current charge state - prevent_simultaneous_charge_and_discharge=True, # Prevent simultaneous charge/discharge - ) - - # 5. Define Sinks and Sources - # 5.a) Heat demand profile - Waermelast = fx.Sink( - 'Wärmelast', - inputs=[ - fx.Flow( - 'Q_th_Last', # Heat sink - bus='Fernwärme', # Linked bus - size=1, - fixed_relative_profile=heat_demand, # Fixed demand profile - ) - ], - ) - - # 5.b) Gas tariff - Gasbezug = fx.Source( - 'Gastarif', - outputs=[ - fx.Flow( - 'Q_Gas', - bus='Gas', # Gas source - size=1000, # Nominal size - effects_per_flow_hour={Costs.label: 0.04, CO2.label: 0.3}, - ) - ], - ) - - # 5.c) Feed-in of electricity - Stromverkauf = fx.Sink( - 'Einspeisung', - inputs=[ - fx.Flow( - 'P_el', - bus='Strom', # Feed-in tariff for electricity - effects_per_flow_hour=-1 * electricity_price, # Negative price for feed-in - ) - ], - ) - - # --- Build FlowSystem --- - # Select components to be included in the flow system - flow_system.add_elements(Costs, CO2, PE, Gaskessel, Waermelast, Gasbezug, Stromverkauf, speicher) - flow_system.add_elements(bhkw_2) if use_chp_with_piecewise_conversion else flow_system.add_elements(bhkw) - - print(flow_system) # Get a string representation of the FlowSystem - try: - flow_system.topology.start_app() # Start the network app - except ImportError as e: - print(f'Network app requires extra dependencies: {e}') - - # --- Solve FlowSystem --- - flow_system.optimize(fx.solvers.HighsSolver(0.01, 60)) - - # --- Results --- - # Save the flow system with solution to file for later analysis - flow_system.to_netcdf('results/complex_example.nc') - - # Plot results using the statistics accessor - flow_system.statistics.plot.heatmap('BHKW2(Q_th)') # Flow label - auto-resolves to flow_rate - flow_system.statistics.plot.balance('BHKW2') - flow_system.statistics.plot.heatmap('Speicher') # Storage label - auto-resolves to charge_state - flow_system.statistics.plot.balance('Fernwärme') diff --git a/tests/deprecated/examples/02_Complex/complex_example_results.py b/tests/deprecated/examples/02_Complex/complex_example_results.py deleted file mode 100644 index 6978caff1..000000000 --- a/tests/deprecated/examples/02_Complex/complex_example_results.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -This script shows how to load results of a prior optimization and how to analyze them. -""" - -import flixopt as fx - -if __name__ == '__main__': - fx.CONFIG.exploring() - - # --- Load FlowSystem with Solution --- - try: - flow_system = fx.FlowSystem.from_netcdf('results/complex_example.nc') - except FileNotFoundError as e: - raise FileNotFoundError( - f"Results file not found ('results/complex_example.nc'). " - f"Please ensure that the file is generated by running 'complex_example.py'. " - f'Original error: {e}' - ) from e - - # --- Basic overview --- - flow_system.topology.plot() - flow_system.statistics.plot.balance('Fernwärme') - - # --- Detailed Plots --- - # In-depth plot for individual flow rates - flow_system.statistics.plot.heatmap('Wärmelast(Q_th_Last)|flow_rate') - - # Plot balances for all buses - for bus in flow_system.buses.values(): - flow_system.statistics.plot.balance(bus.label).to_html(f'results/{bus.label}--balance.html') - - # --- Plotting internal variables manually --- - flow_system.statistics.plot.heatmap('BHKW2(Q_th)|status') - flow_system.statistics.plot.heatmap('Kessel(Q_th)|status') - - # Access data as DataFrames: - print(flow_system.statistics.flow_rates.to_dataframe()) - print(flow_system.solution.to_dataframe()) diff --git a/tests/deprecated/examples/03_Optimization_modes/example_optimization_modes.py b/tests/deprecated/examples/03_Optimization_modes/example_optimization_modes.py deleted file mode 100644 index 02e167c40..000000000 --- a/tests/deprecated/examples/03_Optimization_modes/example_optimization_modes.py +++ /dev/null @@ -1,259 +0,0 @@ -""" -This script demonstrates how to use the different calculation types in the flixopt framework -to model the same energy system. The results will be compared to each other. -""" - -import pathlib - -import pandas as pd -import xarray as xr - -import flixopt as fx - - -# Get solutions for plotting for different optimizations -def get_solutions(optimizations: list, variable: str) -> xr.Dataset: - dataarrays = [] - for optimization in optimizations: - if optimization.name == 'Segmented': - # SegmentedOptimization requires special handling to remove overlaps - dataarrays.append(optimization.results.solution_without_overlap(variable).rename(optimization.name)) - else: - # For Full and Clustered, access solution from the flow_system - dataarrays.append(optimization.flow_system.solution[variable].rename(optimization.name)) - return xr.merge(dataarrays, join='outer') - - -if __name__ == '__main__': - fx.CONFIG.exploring() - - # Calculation Types - full, segmented, aggregated = True, True, True - - # Segmented Properties - segment_length, overlap_length = 96, 1 - - # Clustering Properties - n_clusters = 4 - cluster_duration = '6h' - include_storage = False - keep_extreme_periods = True - imbalance_penalty = 1e5 # or set to None if not needed - - # Data Import - data_import = pd.read_csv( - pathlib.Path(__file__).parents[4] / 'docs' / 'notebooks' / 'data' / 'Zeitreihen2020.csv', index_col=0 - ).sort_index() - filtered_data = data_import['2020-01-01':'2020-01-07 23:45:00'] - # filtered_data = data_import[0:500] # Alternatively filter by index - - filtered_data.index = pd.to_datetime(filtered_data.index) - timesteps = filtered_data.index - - # Access specific columns and convert to 1D-numpy array - electricity_demand = filtered_data['P_Netz/MW'].to_numpy() - heat_demand = filtered_data['Q_Netz/MW'].to_numpy() - electricity_price = filtered_data['Strompr.€/MWh'].to_numpy() - gas_price = filtered_data['Gaspr.€/MWh'].to_numpy() - - # TimeSeriesData objects - TS_heat_demand = fx.TimeSeriesData(heat_demand) - TS_electricity_demand = fx.TimeSeriesData(electricity_demand, clustering_weight=0.7) - TS_electricity_price_sell = fx.TimeSeriesData(-(electricity_price - 0.5), clustering_group='p_el') - TS_electricity_price_buy = fx.TimeSeriesData(electricity_price + 0.5, clustering_group='p_el') - - flow_system = fx.FlowSystem(timesteps) - flow_system.add_elements( - fx.Bus('Strom', carrier='electricity', imbalance_penalty_per_flow_hour=imbalance_penalty), - fx.Bus('Fernwärme', carrier='heat', imbalance_penalty_per_flow_hour=imbalance_penalty), - fx.Bus('Gas', carrier='gas', imbalance_penalty_per_flow_hour=imbalance_penalty), - fx.Bus('Kohle', carrier='fuel', imbalance_penalty_per_flow_hour=imbalance_penalty), - ) - - # Effects - costs = fx.Effect('costs', '€', 'Kosten', is_standard=True, is_objective=True) - CO2 = fx.Effect('CO2', 'kg', 'CO2_e-Emissionen') - PE = fx.Effect('PE', 'kWh_PE', 'Primärenergie') - - # Component Definitions - - # 1. Boiler - a_gaskessel = fx.linear_converters.Boiler( - 'Kessel', - thermal_efficiency=0.85, - thermal_flow=fx.Flow(label='Q_th', bus='Fernwärme'), - fuel_flow=fx.Flow( - label='Q_fu', - bus='Gas', - size=95, - relative_minimum=12 / 95, - previous_flow_rate=20, - status_parameters=fx.StatusParameters(effects_per_startup=1000), - ), - ) - - # 2. CHP - a_kwk = fx.linear_converters.CHP( - 'BHKW2', - thermal_efficiency=0.58, - electrical_efficiency=0.22, - status_parameters=fx.StatusParameters(effects_per_startup=24000), - electrical_flow=fx.Flow('P_el', bus='Strom', size=200), - thermal_flow=fx.Flow('Q_th', bus='Fernwärme', size=200), - fuel_flow=fx.Flow('Q_fu', bus='Kohle', size=288, relative_minimum=87 / 288, previous_flow_rate=100), - ) - - # 3. Storage - a_speicher = fx.Storage( - 'Speicher', - capacity_in_flow_hours=684, - initial_charge_state=137, - minimal_final_charge_state=137, - maximal_final_charge_state=158, - eta_charge=1, - eta_discharge=1, - relative_loss_per_hour=0.001, - prevent_simultaneous_charge_and_discharge=True, - charging=fx.Flow('Q_th_load', size=137, bus='Fernwärme'), - discharging=fx.Flow('Q_th_unload', size=158, bus='Fernwärme'), - ) - - # 4. Sinks and Sources - # Heat Load Profile - a_waermelast = fx.Sink( - 'Wärmelast', inputs=[fx.Flow('Q_th_Last', bus='Fernwärme', size=1, fixed_relative_profile=TS_heat_demand)] - ) - - # Electricity Feed-in - a_strom_last = fx.Sink( - 'Stromlast', inputs=[fx.Flow('P_el_Last', bus='Strom', size=1, fixed_relative_profile=TS_electricity_demand)] - ) - - # Gas Tariff - a_gas_tarif = fx.Source( - 'Gastarif', - outputs=[ - fx.Flow('Q_Gas', bus='Gas', size=1000, effects_per_flow_hour={costs.label: gas_price, CO2.label: 0.3}) - ], - ) - - # Coal Tariff - a_kohle_tarif = fx.Source( - 'Kohletarif', - outputs=[fx.Flow('Q_Kohle', bus='Kohle', size=1000, effects_per_flow_hour={costs.label: 4.6, CO2.label: 0.3})], - ) - - # Electricity Tariff and Feed-in - a_strom_einspeisung = fx.Sink( - 'Einspeisung', inputs=[fx.Flow('P_el', bus='Strom', size=1000, effects_per_flow_hour=TS_electricity_price_sell)] - ) - - a_strom_tarif = fx.Source( - 'Stromtarif', - outputs=[ - fx.Flow( - 'P_el', - bus='Strom', - size=1000, - effects_per_flow_hour={costs.label: TS_electricity_price_buy, CO2.label: 0.3}, - ) - ], - ) - - # Flow System Setup - flow_system.add_elements(costs, CO2, PE) - flow_system.add_elements( - a_gaskessel, - a_waermelast, - a_strom_last, - a_gas_tarif, - a_kohle_tarif, - a_strom_einspeisung, - a_strom_tarif, - a_kwk, - a_speicher, - ) - flow_system.topology.plot() - - # Optimizations - optimizations: list[fx.Optimization | fx.ClusteredOptimization | fx.SegmentedOptimization] = [] - - if full: - optimization = fx.Optimization('Full', flow_system.copy()) - optimization.do_modeling() - optimization.solve(fx.solvers.HighsSolver(0.01 / 100, 60)) - optimizations.append(optimization) - - if segmented: - optimization = fx.SegmentedOptimization('Segmented', flow_system.copy(), segment_length, overlap_length) - optimization.do_modeling_and_solve(fx.solvers.HighsSolver(0.01 / 100, 60)) - optimizations.append(optimization) - - if aggregated: - # Use the new transform.cluster() API - # Note: time_series_for_high_peaks/low_peaks expect string labels matching dataset variables - time_series_for_high_peaks = ['Wärmelast(Q_th_Last)|fixed_relative_profile'] if keep_extreme_periods else None - time_series_for_low_peaks = ( - ['Stromlast(P_el_Last)|fixed_relative_profile', 'Wärmelast(Q_th_Last)|fixed_relative_profile'] - if keep_extreme_periods - else None - ) - - clustered_fs = flow_system.copy().transform.cluster( - n_clusters=n_clusters, - cluster_duration=cluster_duration, - time_series_for_high_peaks=time_series_for_high_peaks, - time_series_for_low_peaks=time_series_for_low_peaks, - ) - clustered_fs.optimize(fx.solvers.HighsSolver(0.01 / 100, 60)) - - # Wrap in a simple object for compatibility with comparison code - class ClusteredResult: - def __init__(self, name, fs): - self.name = name - self.flow_system = fs - self.durations = {'total': 0} # Placeholder - - optimization = ClusteredResult('Clustered', clustered_fs) - optimizations.append(optimization) - - # --- Plotting for comparison --- - fx.plotting.with_plotly( - get_solutions(optimizations, 'Speicher|charge_state'), - mode='line', - title='Charge State Comparison', - ylabel='Charge state', - xlabel='Time in h', - ).write_html('results/Charge State.html') - - fx.plotting.with_plotly( - get_solutions(optimizations, 'BHKW2(Q_th)|flow_rate'), - mode='line', - title='BHKW2(Q_th) Flow Rate Comparison', - ylabel='Flow rate', - xlabel='Time in h', - ).write_html('results/BHKW2 Thermal Power.html') - - fx.plotting.with_plotly( - get_solutions(optimizations, 'costs(temporal)|per_timestep'), - mode='line', - title='Operation Cost Comparison', - ylabel='Costs [€]', - xlabel='Time in h', - ).write_html('results/Operation Costs.html') - - fx.plotting.with_plotly( - get_solutions(optimizations, 'costs(temporal)|per_timestep').sum('time'), - mode='stacked_bar', - title='Total Cost Comparison', - ylabel='Costs [€]', - ).update_layout(barmode='group').write_html('results/Total Costs.html') - - fx.plotting.with_plotly( - pd.DataFrame( - [calc.durations for calc in optimizations], index=[calc.name for calc in optimizations] - ).to_xarray(), - mode='stacked_bar', - ).update_layout(title='Duration Comparison', xaxis_title='Optimization type', yaxis_title='Time (s)').write_html( - 'results/Speed Comparison.html' - ) diff --git a/tests/deprecated/examples/04_Scenarios/scenario_example.py b/tests/deprecated/examples/04_Scenarios/scenario_example.py deleted file mode 100644 index 820336e93..000000000 --- a/tests/deprecated/examples/04_Scenarios/scenario_example.py +++ /dev/null @@ -1,214 +0,0 @@ -""" -This script shows how to use the flixopt framework to model a simple energy system. -""" - -import numpy as np -import pandas as pd - -import flixopt as fx - -if __name__ == '__main__': - fx.CONFIG.exploring() - - # Create datetime array starting from '2020-01-01' for one week - timesteps = pd.date_range('2020-01-01', periods=24 * 7, freq='h') - scenarios = pd.Index(['Base Case', 'High Demand']) - periods = pd.Index([2020, 2021, 2022]) - - # --- Create Time Series Data --- - # Realistic daily patterns: morning/evening peaks, night/midday lows - np.random.seed(42) - n_hours = len(timesteps) - - # Heat demand: 24-hour patterns (kW) for Base Case and High Demand scenarios - base_daily_pattern = np.array( - [22, 20, 18, 18, 20, 25, 40, 70, 95, 110, 85, 65, 60, 58, 62, 68, 75, 88, 105, 125, 130, 122, 95, 35] - ) - high_daily_pattern = np.array( - [28, 25, 22, 22, 24, 30, 52, 88, 118, 135, 105, 80, 75, 72, 75, 82, 92, 108, 128, 148, 155, 145, 115, 48] - ) - - # Tile and add variation - base_demand = np.tile(base_daily_pattern, n_hours // 24 + 1)[:n_hours] * ( - 1 + np.random.uniform(-0.05, 0.05, n_hours) - ) - high_demand = np.tile(high_daily_pattern, n_hours // 24 + 1)[:n_hours] * ( - 1 + np.random.uniform(-0.07, 0.07, n_hours) - ) - - heat_demand_per_h = pd.DataFrame({'Base Case': base_demand, 'High Demand': high_demand}, index=timesteps) - - # Power prices: hourly factors (night low, peak high) and period escalation (2020-2022) - hourly_price_factors = np.array( - [ - 0.70, - 0.65, - 0.62, - 0.60, - 0.62, - 0.70, - 0.95, - 1.15, - 1.30, - 1.25, - 1.10, - 1.00, - 0.95, - 0.90, - 0.88, - 0.92, - 1.00, - 1.10, - 1.25, - 1.40, - 1.35, - 1.20, - 0.95, - 0.80, - ] - ) - period_base_prices = np.array([0.075, 0.095, 0.135]) # €/kWh for 2020, 2021, 2022 - - price_series = np.zeros((n_hours, 3)) - for period_idx, base_price in enumerate(period_base_prices): - price_series[:, period_idx] = ( - np.tile(hourly_price_factors, n_hours // 24 + 1)[:n_hours] - * base_price - * (1 + np.random.uniform(-0.03, 0.03, n_hours)) - ) - - power_prices = price_series.mean(axis=0) - - # Scenario weights: probability of each scenario occurring - # Base Case: 60% probability, High Demand: 40% probability - scenario_weights = np.array([0.6, 0.4]) - - flow_system = fx.FlowSystem( - timesteps=timesteps, periods=periods, scenarios=scenarios, scenario_weights=scenario_weights - ) - - # --- Define Energy Buses --- - # These represent nodes, where the used medias are balanced (electricity, heat, and gas) - # Carriers provide automatic color assignment in plots (yellow for electricity, red for heat, blue for gas) - flow_system.add_elements( - fx.Bus(label='Strom', carrier='electricity'), - fx.Bus(label='Fernwärme', carrier='heat'), - fx.Bus(label='Gas', carrier='gas'), - ) - - # --- Define Effects (Objective and CO2 Emissions) --- - # Cost effect: used as the optimization objective --> minimizing costs - costs = fx.Effect( - label='costs', - unit='€', - description='Kosten', - is_standard=True, # standard effect: no explicit value needed for costs - is_objective=True, # Minimizing costs as the optimization objective - share_from_temporal={'CO2': 0.2}, # Carbon price: 0.2 €/kg CO2 (e.g., carbon tax) - ) - - # CO2 emissions effect with constraint - # Maximum of 1000 kg CO2/hour represents a regulatory or voluntary emissions limit - CO2 = fx.Effect( - label='CO2', - unit='kg', - description='CO2_e-Emissionen', - maximum_per_hour=1000, # Regulatory emissions limit: 1000 kg CO2/hour - ) - - # --- Define Flow System Components --- - # Boiler: Converts fuel (gas) into thermal energy (heat) - # Modern condensing gas boiler with realistic efficiency - boiler = fx.linear_converters.Boiler( - label='Boiler', - thermal_efficiency=0.92, # Realistic efficiency for modern condensing gas boiler (92%) - thermal_flow=fx.Flow( - label='Q_th', - bus='Fernwärme', - size=100, - relative_minimum=0.1, - relative_maximum=1, - status_parameters=fx.StatusParameters(), - ), - fuel_flow=fx.Flow(label='Q_fu', bus='Gas'), - ) - - # Combined Heat and Power (CHP): Generates both electricity and heat from fuel - # Modern CHP unit with realistic efficiencies (total efficiency ~88%) - chp = fx.linear_converters.CHP( - label='CHP', - thermal_efficiency=0.48, # Realistic thermal efficiency (48%) - electrical_efficiency=0.40, # Realistic electrical efficiency (40%) - electrical_flow=fx.Flow( - 'P_el', bus='Strom', size=80, relative_minimum=5 / 80, status_parameters=fx.StatusParameters() - ), - thermal_flow=fx.Flow('Q_th', bus='Fernwärme'), - fuel_flow=fx.Flow('Q_fu', bus='Gas'), - ) - - # Storage: Thermal energy storage system with charging and discharging capabilities - # Realistic thermal storage parameters (e.g., insulated hot water tank) - storage = fx.Storage( - label='Storage', - charging=fx.Flow('Q_th_load', bus='Fernwärme', size=1000), - discharging=fx.Flow('Q_th_unload', bus='Fernwärme', size=1000), - capacity_in_flow_hours=fx.InvestParameters(effects_of_investment=20, fixed_size=30, mandatory=True), - initial_charge_state=0, # Initial storage state: empty - relative_maximum_final_charge_state=np.array([0.8, 0.5, 0.1]), - eta_charge=0.95, # Realistic charging efficiency (~95%) - eta_discharge=0.98, # Realistic discharging efficiency (~98%) - relative_loss_per_hour=np.array([0.008, 0.015]), # Realistic thermal losses: 0.8-1.5% per hour - prevent_simultaneous_charge_and_discharge=True, # Prevent charging and discharging at the same time - ) - - # Heat Demand Sink: Represents a fixed heat demand profile - heat_sink = fx.Sink( - label='Heat Demand', - inputs=[fx.Flow(label='Q_th_Last', bus='Fernwärme', size=1, fixed_relative_profile=heat_demand_per_h)], - ) - - # Gas Source: Gas tariff source with associated costs and CO2 emissions - # Realistic gas prices varying by period (reflecting 2020-2022 energy crisis) - # 2020: 0.04 €/kWh, 2021: 0.06 €/kWh, 2022: 0.11 €/kWh - gas_prices_per_period = np.array([0.04, 0.06, 0.11]) - - # CO2 emissions factor for natural gas: ~0.202 kg CO2/kWh (realistic value) - gas_co2_emissions = 0.202 - - gas_source = fx.Source( - label='Gastarif', - outputs=[ - fx.Flow( - label='Q_Gas', - bus='Gas', - size=1000, - effects_per_flow_hour={costs.label: gas_prices_per_period, CO2.label: gas_co2_emissions}, - ) - ], - ) - - # Power Sink: Represents the export of electricity to the grid - power_sink = fx.Sink( - label='Einspeisung', inputs=[fx.Flow(label='P_el', bus='Strom', effects_per_flow_hour=-1 * power_prices)] - ) - - # --- Build the Flow System --- - # Add all defined components and effects to the flow system - flow_system.add_elements(costs, CO2, boiler, storage, chp, heat_sink, gas_source, power_sink) - - # Visualize the flow system for validation purposes - flow_system.topology.plot() - - # --- Define and Solve Optimization --- - flow_system.optimize(fx.solvers.HighsSolver(mip_gap=0, time_limit_seconds=30)) - - # --- Analyze Results --- - # Plotting through statistics accessor - returns PlotResult with .data and .figure - flow_system.statistics.plot.heatmap('CHP(Q_th)') # Flow label - auto-resolves to flow_rate - flow_system.statistics.plot.balance('Fernwärme') - flow_system.statistics.plot.balance('Storage') - flow_system.statistics.plot.heatmap('Storage') # Storage label - auto-resolves to charge_state - - # Access data as xarray Datasets - print(flow_system.statistics.flow_rates) - print(flow_system.statistics.charge_states) diff --git a/tests/deprecated/examples/05_Two-stage-optimization/two_stage_optimization.py b/tests/deprecated/examples/05_Two-stage-optimization/two_stage_optimization.py deleted file mode 100644 index 155c6303f..000000000 --- a/tests/deprecated/examples/05_Two-stage-optimization/two_stage_optimization.py +++ /dev/null @@ -1,192 +0,0 @@ -""" -This script demonstrates how to use downsampling of a FlowSystem to effectively reduce the size of a model. -This can be very useful when working with large models or during development, -as it can drastically reduce the computational time. -This leads to faster results and easier debugging. -A common use case is to optimize the investments of a model with a downsampled version of the original model, and then fix the computed sizes when calculating the actual dispatch. -While the final optimum might differ from the global optimum, the solving will be much faster. -""" - -import logging -import pathlib -import timeit - -import numpy as np -import pandas as pd -import xarray as xr - -import flixopt as fx - -logger = logging.getLogger('flixopt') - -if __name__ == '__main__': - fx.CONFIG.exploring() - - # Data Import - data_import = pd.read_csv( - pathlib.Path(__file__).parents[4] / 'docs' / 'notebooks' / 'data' / 'Zeitreihen2020.csv', index_col=0 - ).sort_index() - filtered_data = data_import[:500] - - filtered_data.index = pd.to_datetime(filtered_data.index) - timesteps = filtered_data.index - - # Access specific columns and convert to 1D-numpy array - electricity_demand = filtered_data['P_Netz/MW'].to_numpy() - heat_demand = filtered_data['Q_Netz/MW'].to_numpy() - electricity_price = filtered_data['Strompr.€/MWh'].to_numpy() - gas_price = filtered_data['Gaspr.€/MWh'].to_numpy() - - flow_system = fx.FlowSystem(timesteps) - # Carriers provide automatic color assignment in plots - # Bus imbalance penalties allow slack when two-stage sizing doesn't meet peak demand - imbalance_penalty = 1e5 - flow_system.add_elements( - fx.Bus('Strom', carrier='electricity', imbalance_penalty_per_flow_hour=imbalance_penalty), - fx.Bus('Fernwärme', carrier='heat', imbalance_penalty_per_flow_hour=imbalance_penalty), - fx.Bus('Gas', carrier='gas', imbalance_penalty_per_flow_hour=imbalance_penalty), - fx.Bus('Kohle', carrier='fuel', imbalance_penalty_per_flow_hour=imbalance_penalty), - fx.Effect('costs', '€', 'Kosten', is_standard=True, is_objective=True), - fx.Effect('CO2', 'kg', 'CO2_e-Emissionen'), - fx.Effect('PE', 'kWh_PE', 'Primärenergie'), - fx.linear_converters.Boiler( - 'Kessel', - thermal_efficiency=0.85, - thermal_flow=fx.Flow(label='Q_th', bus='Fernwärme'), - fuel_flow=fx.Flow( - label='Q_fu', - bus='Gas', - size=fx.InvestParameters( - effects_of_investment_per_size={'costs': 1_000}, minimum_size=10, maximum_size=600 - ), - relative_minimum=0.2, - previous_flow_rate=20, - status_parameters=fx.StatusParameters(effects_per_startup=300), - ), - ), - fx.linear_converters.CHP( - 'BHKW2', - thermal_efficiency=0.58, - electrical_efficiency=0.22, - status_parameters=fx.StatusParameters(effects_per_startup=1_000, min_uptime=10, min_downtime=10), - electrical_flow=fx.Flow( - 'P_el', bus='Strom', size=1000 - ), # Large size for big-M (won't constrain optimization) - thermal_flow=fx.Flow( - 'Q_th', bus='Fernwärme', size=1000 - ), # Large size for big-M (won't constrain optimization) - fuel_flow=fx.Flow( - 'Q_fu', - bus='Kohle', - size=fx.InvestParameters( - effects_of_investment_per_size={'costs': 3_000}, minimum_size=10, maximum_size=500 - ), - relative_minimum=0.3, - previous_flow_rate=100, - ), - ), - fx.Storage( - 'Speicher', - capacity_in_flow_hours=fx.InvestParameters( - minimum_size=10, maximum_size=1000, effects_of_investment_per_size={'costs': 60} - ), - initial_charge_state='equals_final', - eta_charge=1, - eta_discharge=1, - relative_loss_per_hour=0.001, - prevent_simultaneous_charge_and_discharge=True, - charging=fx.Flow('Q_th_load', size=200, bus='Fernwärme'), - discharging=fx.Flow('Q_th_unload', size=200, bus='Fernwärme'), - ), - fx.Sink( - 'Wärmelast', inputs=[fx.Flow('Q_th_Last', bus='Fernwärme', size=1, fixed_relative_profile=heat_demand)] - ), - fx.Source( - 'Gastarif', - outputs=[fx.Flow('Q_Gas', bus='Gas', size=1000, effects_per_flow_hour={'costs': gas_price, 'CO2': 0.3})], - ), - fx.Source( - 'Kohletarif', - outputs=[fx.Flow('Q_Kohle', bus='Kohle', size=1000, effects_per_flow_hour={'costs': 4.6, 'CO2': 0.3})], - ), - fx.Source( - 'Einspeisung', - outputs=[ - fx.Flow( - 'P_el', bus='Strom', size=1000, effects_per_flow_hour={'costs': electricity_price + 0.5, 'CO2': 0.3} - ) - ], - ), - fx.Sink( - 'Stromlast', - inputs=[fx.Flow('P_el_Last', bus='Strom', size=1, fixed_relative_profile=electricity_demand)], - ), - fx.Source( - 'Stromtarif', - outputs=[ - fx.Flow('P_el', bus='Strom', size=1000, effects_per_flow_hour={'costs': electricity_price, 'CO2': 0.3}) - ], - ), - ) - - # Separate optimization of flow sizes and dispatch - # Stage 1: Optimize sizes using downsampled (2h) data - start = timeit.default_timer() - calculation_sizing = fx.Optimization('Sizing', flow_system.resample('2h')) - calculation_sizing.do_modeling() - calculation_sizing.solve(fx.solvers.HighsSolver(0.1 / 100, 60)) - timer_sizing = timeit.default_timer() - start - - # Stage 2: Optimize dispatch with fixed sizes from Stage 1 - start = timeit.default_timer() - calculation_dispatch = fx.Optimization('Dispatch', flow_system) - calculation_dispatch.do_modeling() - calculation_dispatch.fix_sizes(calculation_sizing.flow_system.solution) - calculation_dispatch.solve(fx.solvers.HighsSolver(0.1 / 100, 60)) - timer_dispatch = timeit.default_timer() - start - - # Verify sizes were correctly fixed - dispatch_sizes = calculation_dispatch.flow_system.statistics.sizes - sizing_sizes = calculation_sizing.flow_system.statistics.sizes - if np.allclose(dispatch_sizes.to_dataarray(), sizing_sizes.to_dataarray(), rtol=1e-5): - logger.info('Sizes were correctly equalized') - else: - raise RuntimeError('Sizes were not correctly equalized') - - # Combined optimization: optimize both sizes and dispatch together - start = timeit.default_timer() - calculation_combined = fx.Optimization('Combined', flow_system) - calculation_combined.do_modeling() - calculation_combined.solve(fx.solvers.HighsSolver(0.1 / 100, 600)) - timer_combined = timeit.default_timer() - start - - # Comparison of results - access solutions from flow_system - comparison = xr.concat( - [calculation_combined.flow_system.solution, calculation_dispatch.flow_system.solution], dim='mode' - ).assign_coords(mode=['Combined', 'Two-stage']) - comparison['Duration [s]'] = xr.DataArray([timer_combined, timer_sizing + timer_dispatch], dims='mode') - - comparison_main = comparison[ - [ - 'Duration [s]', - 'costs', - 'costs(periodic)', - 'costs(temporal)', - 'BHKW2(Q_fu)|size', - 'Kessel(Q_fu)|size', - 'Speicher|size', - ] - ] - comparison_main = xr.concat( - [ - comparison_main, - ( - (comparison_main.sel(mode='Two-stage') - comparison_main.sel(mode='Combined')) - / comparison_main.sel(mode='Combined') - * 100 - ).assign_coords(mode='Diff [%]'), - ], - dim='mode', - ) - - print(comparison_main.to_pandas().T.round(2)) diff --git a/tests/deprecated/test_flow_system_resample.py b/tests/deprecated/test_flow_system_resample.py deleted file mode 100644 index 549f05208..000000000 --- a/tests/deprecated/test_flow_system_resample.py +++ /dev/null @@ -1,313 +0,0 @@ -"""Integration tests for FlowSystem.resample() - verifies correct data resampling and structure preservation.""" - -import numpy as np -import pandas as pd -import pytest -from numpy.testing import assert_allclose - -import flixopt as fx - - -@pytest.fixture -def simple_fs(): - """Simple FlowSystem with basic components.""" - timesteps = pd.date_range('2023-01-01', periods=24, freq='h') - fs = fx.FlowSystem(timesteps) - fs.add_elements( - fx.Bus('heat'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True) - ) - fs.add_elements( - fx.Sink( - label='demand', - inputs=[fx.Flow(label='in', bus='heat', fixed_relative_profile=np.linspace(10, 20, 24), size=1)], - ), - fx.Source( - label='source', outputs=[fx.Flow(label='out', bus='heat', size=50, effects_per_flow_hour={'costs': 0.05})] - ), - ) - return fs - - -@pytest.fixture -def complex_fs(): - """FlowSystem with complex elements (storage, piecewise, invest).""" - timesteps = pd.date_range('2023-01-01', periods=48, freq='h') - fs = fx.FlowSystem(timesteps) - - fs.add_elements( - fx.Bus('heat'), - fx.Bus('elec'), - fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True), - ) - - # Storage - fs.add_elements( - fx.Storage( - label='battery', - charging=fx.Flow('charge', bus='elec', size=10), - discharging=fx.Flow('discharge', bus='elec', size=10), - capacity_in_flow_hours=fx.InvestParameters(fixed_size=100), - ) - ) - - # Piecewise converter - converter = fx.linear_converters.Boiler( - 'boiler', thermal_efficiency=0.9, fuel_flow=fx.Flow('gas', bus='elec'), thermal_flow=fx.Flow('heat', bus='heat') - ) - converter.thermal_flow.size = 100 - fs.add_elements(converter) - - # Component with investment - fs.add_elements( - fx.Source( - label='pv', - outputs=[ - fx.Flow( - 'gen', - bus='elec', - size=fx.InvestParameters(maximum_size=1000, effects_of_investment_per_size={'costs': 100}), - ) - ], - ) - ) - - return fs - - -# === Basic Functionality === - - -@pytest.mark.parametrize('freq,method', [('2h', 'mean'), ('4h', 'sum'), ('6h', 'first')]) -def test_basic_resample(simple_fs, freq, method): - """Test basic resampling preserves structure.""" - fs_r = simple_fs.resample(freq, method=method) - assert len(fs_r.components) == len(simple_fs.components) - assert len(fs_r.buses) == len(simple_fs.buses) - assert len(fs_r.timesteps) < len(simple_fs.timesteps) - - -@pytest.mark.parametrize( - 'method,expected', - [ - ('mean', [15.0, 35.0]), - ('sum', [30.0, 70.0]), - ('first', [10.0, 30.0]), - ('last', [20.0, 40.0]), - ], -) -def test_resample_methods(method, expected): - """Test different resampling methods.""" - ts = pd.date_range('2023-01-01', periods=4, freq='h') - fs = fx.FlowSystem(ts) - fs.add_elements(fx.Bus('b'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) - fs.add_elements( - fx.Sink( - label='s', - inputs=[fx.Flow(label='in', bus='b', fixed_relative_profile=np.array([10.0, 20.0, 30.0, 40.0]), size=1)], - ) - ) - - fs_r = fs.resample('2h', method=method) - assert_allclose(fs_r.flows['s(in)'].fixed_relative_profile.values, expected, rtol=1e-10) - - -def test_structure_preserved(simple_fs): - """Test all structural elements preserved.""" - fs_r = simple_fs.resample('2h', method='mean') - assert set(simple_fs.components.keys()) == set(fs_r.components.keys()) - assert set(simple_fs.buses.keys()) == set(fs_r.buses.keys()) - assert set(simple_fs.effects.keys()) == set(fs_r.effects.keys()) - - # Flow connections preserved - for label in simple_fs.flows.keys(): - assert simple_fs.flows[label].bus == fs_r.flows[label].bus - assert simple_fs.flows[label].component == fs_r.flows[label].component - - -def test_time_metadata_updated(simple_fs): - """Test time metadata correctly updated.""" - fs_r = simple_fs.resample('3h', method='mean') - assert len(fs_r.timesteps) == 8 - assert_allclose(fs_r.timestep_duration.values, 3.0) - assert fs_r.hours_of_last_timestep == 3.0 - - -# === Advanced Dimensions === - - -@pytest.mark.parametrize( - 'dim_name,dim_value', - [ - ('periods', pd.Index([2023, 2024], name='period')), - ('scenarios', pd.Index(['base', 'high'], name='scenario')), - ], -) -def test_with_dimensions(simple_fs, dim_name, dim_value): - """Test resampling preserves period/scenario dimensions.""" - fs = fx.FlowSystem(simple_fs.timesteps, **{dim_name: dim_value}) - fs.add_elements(fx.Bus('h'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) - fs.add_elements( - fx.Sink(label='d', inputs=[fx.Flow(label='in', bus='h', fixed_relative_profile=np.ones(24), size=1)]) - ) - - fs_r = fs.resample('2h', method='mean') - assert getattr(fs_r, dim_name) is not None - pd.testing.assert_index_equal(getattr(fs_r, dim_name), dim_value) - - -# === Complex Elements === - - -def test_storage_resample(complex_fs): - """Test storage component resampling.""" - fs_r = complex_fs.resample('4h', method='mean') - assert 'battery' in fs_r.components - storage = fs_r.components['battery'] - assert storage.charging.label == 'charge' - assert storage.discharging.label == 'discharge' - - -def test_converter_resample(complex_fs): - """Test converter component resampling.""" - fs_r = complex_fs.resample('4h', method='mean') - assert 'boiler' in fs_r.components - boiler = fs_r.components['boiler'] - assert hasattr(boiler, 'thermal_efficiency') - - -def test_invest_resample(complex_fs): - """Test investment parameters preserved.""" - fs_r = complex_fs.resample('4h', method='mean') - pv_flow = fs_r.flows['pv(gen)'] - assert isinstance(pv_flow.size, fx.InvestParameters) - assert pv_flow.size.maximum_size == 1000 - - -# === Modeling Integration === - - -@pytest.mark.filterwarnings('ignore::DeprecationWarning') -@pytest.mark.parametrize('with_dim', [None, 'periods', 'scenarios']) -def test_modeling(with_dim): - """Test resampled FlowSystem can be modeled.""" - ts = pd.date_range('2023-01-01', periods=48, freq='h') - kwargs = {} - if with_dim == 'periods': - kwargs['periods'] = pd.Index([2023, 2024], name='period') - elif with_dim == 'scenarios': - kwargs['scenarios'] = pd.Index(['base', 'high'], name='scenario') - - fs = fx.FlowSystem(ts, **kwargs) - fs.add_elements(fx.Bus('h'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) - fs.add_elements( - fx.Sink( - label='d', inputs=[fx.Flow(label='in', bus='h', fixed_relative_profile=np.linspace(10, 30, 48), size=1)] - ), - fx.Source(label='s', outputs=[fx.Flow(label='out', bus='h', size=100, effects_per_flow_hour={'costs': 0.05})]), - ) - - fs_r = fs.resample('4h', method='mean') - calc = fx.Optimization('test', fs_r) - calc.do_modeling() - - assert calc.model is not None - assert len(calc.model.variables) > 0 - - -@pytest.mark.filterwarnings('ignore::DeprecationWarning') -def test_model_structure_preserved(): - """Test model structure (var/constraint types) preserved.""" - ts = pd.date_range('2023-01-01', periods=48, freq='h') - fs = fx.FlowSystem(ts) - fs.add_elements(fx.Bus('h'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) - fs.add_elements( - fx.Sink( - label='d', inputs=[fx.Flow(label='in', bus='h', fixed_relative_profile=np.linspace(10, 30, 48), size=1)] - ), - fx.Source(label='s', outputs=[fx.Flow(label='out', bus='h', size=100, effects_per_flow_hour={'costs': 0.05})]), - ) - - calc_orig = fx.Optimization('orig', fs) - calc_orig.do_modeling() - - fs_r = fs.resample('4h', method='mean') - calc_r = fx.Optimization('resamp', fs_r) - calc_r.do_modeling() - - # Same number of variable/constraint types - assert len(calc_orig.model.variables) == len(calc_r.model.variables) - assert len(calc_orig.model.constraints) == len(calc_r.model.constraints) - - # Same names - assert set(calc_orig.model.variables.labels.data_vars.keys()) == set(calc_r.model.variables.labels.data_vars.keys()) - assert set(calc_orig.model.constraints.labels.data_vars.keys()) == set( - calc_r.model.constraints.labels.data_vars.keys() - ) - - -# === Advanced Features === - - -def test_dataset_roundtrip(simple_fs): - """Test dataset serialization.""" - fs_r = simple_fs.resample('2h', method='mean') - assert fx.FlowSystem.from_dataset(fs_r.to_dataset()) == fs_r - - -def test_dataset_chaining(simple_fs): - """Test power user pattern.""" - ds = simple_fs.to_dataset() - ds = fx.FlowSystem._dataset_sel(ds, time='2023-01-01') - ds = fx.FlowSystem._dataset_resample(ds, freq='2h', method='mean') - fs_result = fx.FlowSystem.from_dataset(ds) - - fs_simple = simple_fs.sel(time='2023-01-01').resample('2h', method='mean') - assert fs_result == fs_simple - - -@pytest.mark.parametrize('freq,exp_len', [('2h', 84), ('6h', 28), ('1D', 7)]) -def test_frequencies(freq, exp_len): - """Test various frequencies.""" - ts = pd.date_range('2023-01-01', periods=168, freq='h') - fs = fx.FlowSystem(ts) - fs.add_elements(fx.Bus('b'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) - fs.add_elements( - fx.Sink(label='s', inputs=[fx.Flow(label='in', bus='b', fixed_relative_profile=np.ones(168), size=1)]) - ) - - assert len(fs.resample(freq, method='mean').timesteps) == exp_len - - -def test_irregular_timesteps_error(): - """Test that resampling irregular timesteps to finer resolution raises error without fill_gaps.""" - ts = pd.DatetimeIndex(['2023-01-01 00:00', '2023-01-01 01:00', '2023-01-01 03:00'], name='time') - fs = fx.FlowSystem(ts) - fs.add_elements(fx.Bus('b'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) - fs.add_elements( - fx.Sink(label='s', inputs=[fx.Flow(label='in', bus='b', fixed_relative_profile=np.ones(3), size=1)]) - ) - - with pytest.raises(ValueError, match='Resampling created gaps'): - fs.resample('1h', method='mean') - - -def test_irregular_timesteps_with_fill_gaps(): - """Test that resampling irregular timesteps works with explicit fill_gaps strategy.""" - ts = pd.DatetimeIndex(['2023-01-01 00:00', '2023-01-01 01:00', '2023-01-01 03:00'], name='time') - fs = fx.FlowSystem(ts) - fs.add_elements(fx.Bus('b'), fx.Effect('costs', unit='€', description='costs', is_objective=True, is_standard=True)) - fs.add_elements( - fx.Sink( - label='s', inputs=[fx.Flow(label='in', bus='b', fixed_relative_profile=np.array([1.0, 2.0, 4.0]), size=1)] - ) - ) - - # Test with ffill (using deprecated method) - fs_r = fs.resample('1h', method='mean', fill_gaps='ffill') - assert len(fs_r.timesteps) == 4 - # Gap at 02:00 should be filled with previous value (2.0) - assert_allclose(fs_r.flows['s(in)'].fixed_relative_profile.values, [1.0, 2.0, 2.0, 4.0]) - - -if __name__ == '__main__': - pytest.main(['-v', __file__]) diff --git a/tests/deprecated/test_network_app.py b/tests/deprecated/test_network_app.py index f3f250797..95d8fd2c1 100644 --- a/tests/deprecated/test_network_app.py +++ b/tests/deprecated/test_network_app.py @@ -19,6 +19,6 @@ def flow_system(request): def test_network_app(flow_system): - """Test that flow model constraints are correctly generated.""" - flow_system.start_network_app() - flow_system.stop_network_app() + """Test that network app can be started and stopped.""" + flow_system.topology.start_app() + flow_system.topology.stop_app() diff --git a/tests/deprecated/test_resample_equivalence.py b/tests/deprecated/test_resample_equivalence.py deleted file mode 100644 index 19144b6a1..000000000 --- a/tests/deprecated/test_resample_equivalence.py +++ /dev/null @@ -1,310 +0,0 @@ -""" -Tests to ensure the dimension grouping optimization in _resample_by_dimension_groups -is equivalent to naive Dataset resampling. - -These tests verify that the optimization (grouping variables by dimensions before -resampling) produces identical results to simply calling Dataset.resample() directly. -""" - -import numpy as np -import pandas as pd -import pytest -import xarray as xr - -import flixopt as fx - - -def naive_dataset_resample(dataset: xr.Dataset, freq: str, method: str) -> xr.Dataset: - """ - Naive resampling: simply call Dataset.resample().method() directly. - - This is the straightforward approach without dimension grouping optimization. - """ - return getattr(dataset.resample(time=freq), method)() - - -def create_dataset_with_mixed_dimensions(n_timesteps=48, seed=42): - """ - Create a dataset with variables having different dimension structures. - - This mimics realistic data with: - - Variables with only time dimension - - Variables with time + one other dimension - - Variables with time + multiple dimensions - """ - np.random.seed(seed) - timesteps = pd.date_range('2020-01-01', periods=n_timesteps, freq='h') - - ds = xr.Dataset( - coords={ - 'time': timesteps, - 'component': ['comp1', 'comp2'], - 'bus': ['bus1', 'bus2'], - 'scenario': ['base', 'alt'], - } - ) - - # Variable with only time dimension - ds['total_demand'] = xr.DataArray( - np.random.randn(n_timesteps), - dims=['time'], - coords={'time': ds.time}, - ) - - # Variable with time + component - ds['component_flow'] = xr.DataArray( - np.random.randn(n_timesteps, 2), - dims=['time', 'component'], - coords={'time': ds.time, 'component': ds.component}, - ) - - # Variable with time + bus - ds['bus_balance'] = xr.DataArray( - np.random.randn(n_timesteps, 2), - dims=['time', 'bus'], - coords={'time': ds.time, 'bus': ds.bus}, - ) - - # Variable with time + component + bus - ds['flow_on_bus'] = xr.DataArray( - np.random.randn(n_timesteps, 2, 2), - dims=['time', 'component', 'bus'], - coords={'time': ds.time, 'component': ds.component, 'bus': ds.bus}, - ) - - # Variable with time + scenario - ds['scenario_demand'] = xr.DataArray( - np.random.randn(n_timesteps, 2), - dims=['time', 'scenario'], - coords={'time': ds.time, 'scenario': ds.scenario}, - ) - - # Variable with time + component + scenario - ds['component_scenario_flow'] = xr.DataArray( - np.random.randn(n_timesteps, 2, 2), - dims=['time', 'component', 'scenario'], - coords={'time': ds.time, 'component': ds.component, 'scenario': ds.scenario}, - ) - - return ds - - -@pytest.mark.parametrize('method', ['mean', 'sum', 'max', 'min', 'first', 'last']) -@pytest.mark.parametrize('freq', ['2h', '4h', '1D']) -def test_resample_equivalence_mixed_dimensions(method, freq): - """ - Test that _resample_by_dimension_groups produces same results as naive resampling. - - Uses a dataset with variables having different dimension structures. - """ - ds = create_dataset_with_mixed_dimensions(n_timesteps=100) - - # Method 1: Optimized approach (with dimension grouping) - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, freq, method) - - # Method 2: Naive approach (direct Dataset resampling) - result_naive = naive_dataset_resample(ds, freq, method) - - # Compare results - xr.testing.assert_allclose(result_optimized, result_naive) - - -@pytest.mark.parametrize('method', ['mean', 'sum', 'max', 'min', 'first', 'last', 'std', 'var', 'median']) -def test_resample_equivalence_single_dimension(method): - """ - Test with variables having only time dimension. - """ - timesteps = pd.date_range('2020-01-01', periods=48, freq='h') - - ds = xr.Dataset(coords={'time': timesteps}) - ds['var1'] = xr.DataArray(np.random.randn(48), dims=['time'], coords={'time': ds.time}) - ds['var2'] = xr.DataArray(np.random.randn(48) * 10, dims=['time'], coords={'time': ds.time}) - ds['var3'] = xr.DataArray(np.random.randn(48) / 5, dims=['time'], coords={'time': ds.time}) - - # Optimized approach - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) - - # Naive approach - result_naive = naive_dataset_resample(ds, '2h', method) - - # Compare results - xr.testing.assert_allclose(result_optimized, result_naive) - - -def test_resample_equivalence_empty_dataset(): - """ - Test with an empty dataset (edge case). - """ - timesteps = pd.date_range('2020-01-01', periods=48, freq='h') - ds = xr.Dataset(coords={'time': timesteps}) - - # Both should handle empty dataset gracefully - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', 'mean') - result_naive = naive_dataset_resample(ds, '2h', 'mean') - - xr.testing.assert_allclose(result_optimized, result_naive) - - -def test_resample_equivalence_single_variable(): - """ - Test with a single variable. - """ - timesteps = pd.date_range('2020-01-01', periods=48, freq='h') - ds = xr.Dataset(coords={'time': timesteps}) - ds['single_var'] = xr.DataArray(np.random.randn(48), dims=['time'], coords={'time': ds.time}) - - # Test multiple methods - for method in ['mean', 'sum', 'max', 'min']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '3h', method) - result_naive = naive_dataset_resample(ds, '3h', method) - - xr.testing.assert_allclose(result_optimized, result_naive) - - -def test_resample_equivalence_with_nans(): - """ - Test with NaN values to ensure they're handled consistently. - """ - timesteps = pd.date_range('2020-01-01', periods=48, freq='h') - - ds = xr.Dataset(coords={'time': timesteps, 'component': ['a', 'b']}) - - # Create variable with some NaN values - data = np.random.randn(48, 2) - data[5:10, 0] = np.nan - data[20:25, 1] = np.nan - - ds['var_with_nans'] = xr.DataArray( - data, dims=['time', 'component'], coords={'time': ds.time, 'component': ds.component} - ) - - # Test with methods that handle NaNs - for method in ['mean', 'sum', 'max', 'min', 'first', 'last']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) - result_naive = naive_dataset_resample(ds, '2h', method) - - xr.testing.assert_allclose(result_optimized, result_naive) - - -def test_resample_equivalence_different_dimension_orders(): - """ - Test that dimension order doesn't affect the equivalence. - """ - timesteps = pd.date_range('2020-01-01', periods=48, freq='h') - - ds = xr.Dataset( - coords={ - 'time': timesteps, - 'x': ['x1', 'x2'], - 'y': ['y1', 'y2'], - } - ) - - # Variable with time first - ds['var_time_first'] = xr.DataArray( - np.random.randn(48, 2, 2), - dims=['time', 'x', 'y'], - coords={'time': ds.time, 'x': ds.x, 'y': ds.y}, - ) - - # Variable with time in middle - ds['var_time_middle'] = xr.DataArray( - np.random.randn(2, 48, 2), - dims=['x', 'time', 'y'], - coords={'x': ds.x, 'time': ds.time, 'y': ds.y}, - ) - - # Variable with time last - ds['var_time_last'] = xr.DataArray( - np.random.randn(2, 2, 48), - dims=['x', 'y', 'time'], - coords={'x': ds.x, 'y': ds.y, 'time': ds.time}, - ) - - for method in ['mean', 'sum', 'max', 'min']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) - result_naive = naive_dataset_resample(ds, '2h', method) - - xr.testing.assert_allclose(result_optimized, result_naive) - - -def test_resample_equivalence_multiple_variables_same_dims(): - """ - Test with multiple variables sharing the same dimensions. - - This is the key optimization case - variables with same dims should be - grouped and resampled together. - """ - timesteps = pd.date_range('2020-01-01', periods=48, freq='h') - - ds = xr.Dataset(coords={'time': timesteps, 'location': ['A', 'B', 'C']}) - - # Multiple variables with same dimensions (time, location) - for i in range(3): - ds[f'var_{i}'] = xr.DataArray( - np.random.randn(48, 3), - dims=['time', 'location'], - coords={'time': ds.time, 'location': ds.location}, - ) - - for method in ['mean', 'sum', 'max', 'min']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) - result_naive = naive_dataset_resample(ds, '2h', method) - - xr.testing.assert_allclose(result_optimized, result_naive) - - -def test_resample_equivalence_large_dataset(): - """ - Test with a larger, more realistic dataset. - """ - timesteps = pd.date_range('2020-01-01', periods=168, freq='h') # One week - - ds = xr.Dataset( - coords={ - 'time': timesteps, - 'component': [f'comp_{i}' for i in range(5)], - 'bus': [f'bus_{i}' for i in range(3)], - } - ) - - # Various variable types - ds['simple_var'] = xr.DataArray(np.random.randn(168), dims=['time'], coords={'time': ds.time}) - ds['component_var'] = xr.DataArray( - np.random.randn(168, 5), dims=['time', 'component'], coords={'time': ds.time, 'component': ds.component} - ) - ds['bus_var'] = xr.DataArray(np.random.randn(168, 3), dims=['time', 'bus'], coords={'time': ds.time, 'bus': ds.bus}) - ds['complex_var'] = xr.DataArray( - np.random.randn(168, 5, 3), - dims=['time', 'component', 'bus'], - coords={'time': ds.time, 'component': ds.component, 'bus': ds.bus}, - ) - - # Test with a subset of methods (to keep test time reasonable) - for method in ['mean', 'sum', 'first']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '1D', method) - result_naive = naive_dataset_resample(ds, '1D', method) - - xr.testing.assert_allclose(result_optimized, result_naive) - - -def test_resample_equivalence_with_kwargs(): - """ - Test that kwargs are properly forwarded to resample(). - - Verifies that additional arguments like label and closed are correctly - passed through the optimization path. - """ - timesteps = pd.date_range('2020-01-01', periods=48, freq='h') - ds = xr.Dataset(coords={'time': timesteps}) - ds['var'] = xr.DataArray(np.random.randn(48), dims=['time'], coords={'time': ds.time}) - - kwargs = {'label': 'right', 'closed': 'right'} - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', 'mean', **kwargs) - result_naive = ds.resample(time='2h', **kwargs).mean() - - xr.testing.assert_allclose(result_optimized, result_naive) - - -if __name__ == '__main__': - pytest.main(['-v', __file__]) diff --git a/tests/deprecated/test_results_io.py b/tests/deprecated/test_results_io.py index a42ca542b..ff611abf2 100644 --- a/tests/deprecated/test_results_io.py +++ b/tests/deprecated/test_results_io.py @@ -50,7 +50,7 @@ def test_flow_system_file_io(flow_system, highs_solver, request): calculation_0 = fx.Optimization(f'IO-{test_id}', flow_system=flow_system) calculation_0.do_modeling() calculation_0.solve(highs_solver) - calculation_0.flow_system.plot_network() + calculation_0.flow_system.topology.plot() calculation_0.results.to_file() paths = ResultsPaths(calculation_0.folder, calculation_0.name) @@ -59,7 +59,7 @@ def test_flow_system_file_io(flow_system, highs_solver, request): calculation_1 = fx.Optimization(f'Loaded_IO-{test_id}', flow_system=flow_system_1) calculation_1.do_modeling() calculation_1.solve(highs_solver) - calculation_1.flow_system.plot_network() + calculation_1.flow_system.topology.plot() assert_almost_equal_numeric( calculation_0.results.model.objective.value, diff --git a/tests/deprecated/test_scenarios.py b/tests/deprecated/test_scenarios.py index 2699647ad..dd46d3c80 100644 --- a/tests/deprecated/test_scenarios.py +++ b/tests/deprecated/test_scenarios.py @@ -337,7 +337,7 @@ def test_scenarios_selection(flow_system_piecewise_conversion_scenarios): scenarios = flow_system_full.scenarios scenario_weights = np.linspace(0.5, 1, len(scenarios)) / np.sum(np.linspace(0.5, 1, len(scenarios))) flow_system_full.scenario_weights = scenario_weights - flow_system = flow_system_full.sel(scenario=scenarios[0:2]) + flow_system = flow_system_full.transform.sel(scenario=scenarios[0:2]) assert flow_system.scenarios.equals(flow_system_full.scenarios[0:2]) @@ -740,7 +740,7 @@ def test_weights_io_persistence(): def test_weights_selection(): - """Test that weights are correctly sliced when using FlowSystem.sel().""" + """Test that weights are correctly sliced when using flow_system.transform.sel().""" timesteps = pd.date_range('2023-01-01', periods=24, freq='h') scenarios = pd.Index(['base', 'mid', 'high'], name='scenario') custom_scenario_weights = np.array([0.3, 0.5, 0.2]) @@ -767,7 +767,7 @@ def test_weights_selection(): fs_full.add_elements(bus, source, fx.Effect('cost', 'Total cost', '€', is_objective=True)) # Select a subset of scenarios - fs_subset = fs_full.sel(scenario=['base', 'high']) + fs_subset = fs_full.transform.sel(scenario=['base', 'high']) # Verify weights are correctly sliced assert fs_subset.scenarios.equals(pd.Index(['base', 'high'], name='scenario')) diff --git a/tests/test_flow_system_resample.py b/tests/test_flow_system_resample.py index dd5e19176..7f5e1b26e 100644 --- a/tests/test_flow_system_resample.py +++ b/tests/test_flow_system_resample.py @@ -1,4 +1,4 @@ -"""Integration tests for FlowSystem.resample() - verifies correct data resampling and structure preservation.""" +"""Integration tests for flow_system.transform.resample() - verifies correct data resampling and structure preservation.""" import numpy as np import pandas as pd @@ -80,7 +80,7 @@ def complex_fs(): @pytest.mark.parametrize('freq,method', [('2h', 'mean'), ('4h', 'sum'), ('6h', 'first')]) def test_basic_resample(simple_fs, freq, method): """Test basic resampling preserves structure.""" - fs_r = simple_fs.resample(freq, method=method) + fs_r = simple_fs.transform.resample(freq, method=method) assert len(fs_r.components) == len(simple_fs.components) assert len(fs_r.buses) == len(simple_fs.buses) assert len(fs_r.timesteps) < len(simple_fs.timesteps) @@ -107,13 +107,13 @@ def test_resample_methods(method, expected): ) ) - fs_r = fs.resample('2h', method=method) + fs_r = fs.transform.resample('2h', method=method) assert_allclose(fs_r.flows['s(in)'].fixed_relative_profile.values, expected, rtol=1e-10) def test_structure_preserved(simple_fs): """Test all structural elements preserved.""" - fs_r = simple_fs.resample('2h', method='mean') + fs_r = simple_fs.transform.resample('2h', method='mean') assert set(simple_fs.components.keys()) == set(fs_r.components.keys()) assert set(simple_fs.buses.keys()) == set(fs_r.buses.keys()) assert set(simple_fs.effects.keys()) == set(fs_r.effects.keys()) @@ -126,7 +126,7 @@ def test_structure_preserved(simple_fs): def test_time_metadata_updated(simple_fs): """Test time metadata correctly updated.""" - fs_r = simple_fs.resample('3h', method='mean') + fs_r = simple_fs.transform.resample('3h', method='mean') assert len(fs_r.timesteps) == 8 assert_allclose(fs_r.timestep_duration.values, 3.0) assert fs_r.hours_of_last_timestep == 3.0 @@ -150,7 +150,7 @@ def test_with_dimensions(simple_fs, dim_name, dim_value): fx.Sink(label='d', inputs=[fx.Flow(label='in', bus='h', fixed_relative_profile=np.ones(24), size=1)]) ) - fs_r = fs.resample('2h', method='mean') + fs_r = fs.transform.resample('2h', method='mean') assert getattr(fs_r, dim_name) is not None pd.testing.assert_index_equal(getattr(fs_r, dim_name), dim_value) @@ -160,7 +160,7 @@ def test_with_dimensions(simple_fs, dim_name, dim_value): def test_storage_resample(complex_fs): """Test storage component resampling.""" - fs_r = complex_fs.resample('4h', method='mean') + fs_r = complex_fs.transform.resample('4h', method='mean') assert 'battery' in fs_r.components storage = fs_r.components['battery'] assert storage.charging.label == 'charge' @@ -169,7 +169,7 @@ def test_storage_resample(complex_fs): def test_converter_resample(complex_fs): """Test converter component resampling.""" - fs_r = complex_fs.resample('4h', method='mean') + fs_r = complex_fs.transform.resample('4h', method='mean') assert 'boiler' in fs_r.components boiler = fs_r.components['boiler'] assert hasattr(boiler, 'thermal_efficiency') @@ -177,7 +177,7 @@ def test_converter_resample(complex_fs): def test_invest_resample(complex_fs): """Test investment parameters preserved.""" - fs_r = complex_fs.resample('4h', method='mean') + fs_r = complex_fs.transform.resample('4h', method='mean') pv_flow = fs_r.flows['pv(gen)'] assert isinstance(pv_flow.size, fx.InvestParameters) assert pv_flow.size.maximum_size == 1000 @@ -205,7 +205,7 @@ def test_modeling(with_dim): fx.Source(label='s', outputs=[fx.Flow(label='out', bus='h', size=100, effects_per_flow_hour={'costs': 0.05})]), ) - fs_r = fs.resample('4h', method='mean') + fs_r = fs.transform.resample('4h', method='mean') fs_r.build_model() assert fs_r.model is not None @@ -226,7 +226,7 @@ def test_model_structure_preserved(): fs.build_model() - fs_r = fs.resample('4h', method='mean') + fs_r = fs.transform.resample('4h', method='mean') fs_r.build_model() # Same number of variable/constraint types @@ -243,18 +243,20 @@ def test_model_structure_preserved(): def test_dataset_roundtrip(simple_fs): """Test dataset serialization.""" - fs_r = simple_fs.resample('2h', method='mean') + fs_r = simple_fs.transform.resample('2h', method='mean') assert fx.FlowSystem.from_dataset(fs_r.to_dataset()) == fs_r def test_dataset_chaining(simple_fs): - """Test power user pattern.""" + """Test power user pattern with TransformAccessor class methods.""" + from flixopt.transform_accessor import TransformAccessor + ds = simple_fs.to_dataset() - ds = fx.FlowSystem._dataset_sel(ds, time='2023-01-01') - ds = fx.FlowSystem._dataset_resample(ds, freq='2h', method='mean') + ds = TransformAccessor._dataset_sel(ds, time='2023-01-01') + ds = TransformAccessor._dataset_resample(ds, freq='2h', method='mean') fs_result = fx.FlowSystem.from_dataset(ds) - fs_simple = simple_fs.sel(time='2023-01-01').resample('2h', method='mean') + fs_simple = simple_fs.transform.sel(time='2023-01-01').transform.resample('2h', method='mean') assert fs_result == fs_simple @@ -268,7 +270,7 @@ def test_frequencies(freq, exp_len): fx.Sink(label='s', inputs=[fx.Flow(label='in', bus='b', fixed_relative_profile=np.ones(168), size=1)]) ) - assert len(fs.resample(freq, method='mean').timesteps) == exp_len + assert len(fs.transform.resample(freq, method='mean').timesteps) == exp_len def test_irregular_timesteps_error(): diff --git a/tests/test_network_app.py b/tests/test_network_app.py index f3f250797..95d8fd2c1 100644 --- a/tests/test_network_app.py +++ b/tests/test_network_app.py @@ -19,6 +19,6 @@ def flow_system(request): def test_network_app(flow_system): - """Test that flow model constraints are correctly generated.""" - flow_system.start_network_app() - flow_system.stop_network_app() + """Test that network app can be started and stopped.""" + flow_system.topology.start_app() + flow_system.topology.stop_app() diff --git a/tests/test_resample_equivalence.py b/tests/test_resample_equivalence.py index 19144b6a1..7520b5407 100644 --- a/tests/test_resample_equivalence.py +++ b/tests/test_resample_equivalence.py @@ -11,7 +11,7 @@ import pytest import xarray as xr -import flixopt as fx +from flixopt.transform_accessor import TransformAccessor def naive_dataset_resample(dataset: xr.Dataset, freq: str, method: str) -> xr.Dataset: @@ -100,7 +100,7 @@ def test_resample_equivalence_mixed_dimensions(method, freq): ds = create_dataset_with_mixed_dimensions(n_timesteps=100) # Method 1: Optimized approach (with dimension grouping) - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, freq, method) + result_optimized = TransformAccessor._resample_by_dimension_groups(ds, freq, method) # Method 2: Naive approach (direct Dataset resampling) result_naive = naive_dataset_resample(ds, freq, method) @@ -122,7 +122,7 @@ def test_resample_equivalence_single_dimension(method): ds['var3'] = xr.DataArray(np.random.randn(48) / 5, dims=['time'], coords={'time': ds.time}) # Optimized approach - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) + result_optimized = TransformAccessor._resample_by_dimension_groups(ds, '2h', method) # Naive approach result_naive = naive_dataset_resample(ds, '2h', method) @@ -139,7 +139,7 @@ def test_resample_equivalence_empty_dataset(): ds = xr.Dataset(coords={'time': timesteps}) # Both should handle empty dataset gracefully - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', 'mean') + result_optimized = TransformAccessor._resample_by_dimension_groups(ds, '2h', 'mean') result_naive = naive_dataset_resample(ds, '2h', 'mean') xr.testing.assert_allclose(result_optimized, result_naive) @@ -155,7 +155,7 @@ def test_resample_equivalence_single_variable(): # Test multiple methods for method in ['mean', 'sum', 'max', 'min']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '3h', method) + result_optimized = TransformAccessor._resample_by_dimension_groups(ds, '3h', method) result_naive = naive_dataset_resample(ds, '3h', method) xr.testing.assert_allclose(result_optimized, result_naive) @@ -180,7 +180,7 @@ def test_resample_equivalence_with_nans(): # Test with methods that handle NaNs for method in ['mean', 'sum', 'max', 'min', 'first', 'last']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) + result_optimized = TransformAccessor._resample_by_dimension_groups(ds, '2h', method) result_naive = naive_dataset_resample(ds, '2h', method) xr.testing.assert_allclose(result_optimized, result_naive) @@ -222,7 +222,7 @@ def test_resample_equivalence_different_dimension_orders(): ) for method in ['mean', 'sum', 'max', 'min']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) + result_optimized = TransformAccessor._resample_by_dimension_groups(ds, '2h', method) result_naive = naive_dataset_resample(ds, '2h', method) xr.testing.assert_allclose(result_optimized, result_naive) @@ -248,7 +248,7 @@ def test_resample_equivalence_multiple_variables_same_dims(): ) for method in ['mean', 'sum', 'max', 'min']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', method) + result_optimized = TransformAccessor._resample_by_dimension_groups(ds, '2h', method) result_naive = naive_dataset_resample(ds, '2h', method) xr.testing.assert_allclose(result_optimized, result_naive) @@ -282,7 +282,7 @@ def test_resample_equivalence_large_dataset(): # Test with a subset of methods (to keep test time reasonable) for method in ['mean', 'sum', 'first']: - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '1D', method) + result_optimized = TransformAccessor._resample_by_dimension_groups(ds, '1D', method) result_naive = naive_dataset_resample(ds, '1D', method) xr.testing.assert_allclose(result_optimized, result_naive) @@ -300,7 +300,7 @@ def test_resample_equivalence_with_kwargs(): ds['var'] = xr.DataArray(np.random.randn(48), dims=['time'], coords={'time': ds.time}) kwargs = {'label': 'right', 'closed': 'right'} - result_optimized = fx.FlowSystem._resample_by_dimension_groups(ds, '2h', 'mean', **kwargs) + result_optimized = TransformAccessor._resample_by_dimension_groups(ds, '2h', 'mean', **kwargs) result_naive = ds.resample(time='2h', **kwargs).mean() xr.testing.assert_allclose(result_optimized, result_naive) diff --git a/tests/test_scenarios.py b/tests/test_scenarios.py index 2699647ad..dd46d3c80 100644 --- a/tests/test_scenarios.py +++ b/tests/test_scenarios.py @@ -337,7 +337,7 @@ def test_scenarios_selection(flow_system_piecewise_conversion_scenarios): scenarios = flow_system_full.scenarios scenario_weights = np.linspace(0.5, 1, len(scenarios)) / np.sum(np.linspace(0.5, 1, len(scenarios))) flow_system_full.scenario_weights = scenario_weights - flow_system = flow_system_full.sel(scenario=scenarios[0:2]) + flow_system = flow_system_full.transform.sel(scenario=scenarios[0:2]) assert flow_system.scenarios.equals(flow_system_full.scenarios[0:2]) @@ -740,7 +740,7 @@ def test_weights_io_persistence(): def test_weights_selection(): - """Test that weights are correctly sliced when using FlowSystem.sel().""" + """Test that weights are correctly sliced when using flow_system.transform.sel().""" timesteps = pd.date_range('2023-01-01', periods=24, freq='h') scenarios = pd.Index(['base', 'mid', 'high'], name='scenario') custom_scenario_weights = np.array([0.3, 0.5, 0.2]) @@ -767,7 +767,7 @@ def test_weights_selection(): fs_full.add_elements(bus, source, fx.Effect('cost', 'Total cost', '€', is_objective=True)) # Select a subset of scenarios - fs_subset = fs_full.sel(scenario=['base', 'high']) + fs_subset = fs_full.transform.sel(scenario=['base', 'high']) # Verify weights are correctly sliced assert fs_subset.scenarios.equals(pd.Index(['base', 'high'], name='scenario')) diff --git a/tests/test_solution_persistence.py b/tests/test_solution_persistence.py index f825f64a8..e38786ff2 100644 --- a/tests/test_solution_persistence.py +++ b/tests/test_solution_persistence.py @@ -382,13 +382,6 @@ def test_build_model_creates_model(self, simple_flow_system): # Model should have variables assert len(simple_flow_system.model.variables) > 0 - def test_build_model_with_normalize_weights_false(self, simple_flow_system): - """build_model() should respect normalize_weights parameter.""" - simple_flow_system.build_model(normalize_weights=False) - - # Model should be created - assert simple_flow_system.model is not None - def test_solve_without_build_model_raises(self, simple_flow_system, highs_solver): """solve() should raise if model not built.""" with pytest.raises(RuntimeError, match='Model has not been built'): @@ -437,12 +430,6 @@ def test_optimize_method_chaining(self, simple_flow_system, highs_solver): assert isinstance(solution, xr.Dataset) assert len(solution.data_vars) > 0 - def test_optimize_with_normalize_weights_false(self, simple_flow_system, highs_solver): - """optimize() should respect normalize_weights parameter.""" - simple_flow_system.optimize(highs_solver, normalize_weights=False) - - assert simple_flow_system.solution is not None - def test_model_accessible_after_build(self, simple_flow_system): """Model should be inspectable after build_model().""" simple_flow_system.build_model() diff --git a/tests/test_topology_accessor.py b/tests/test_topology_accessor.py index 09f789b2b..3a1fc2a3d 100644 --- a/tests/test_topology_accessor.py +++ b/tests/test_topology_accessor.py @@ -152,32 +152,3 @@ def test_plot_legacy_with_controls_list(self, flow_system): # Should not raise flow_system.topology.plot_legacy(path=False, controls=['nodes', 'layout'], show=False) - - -class TestDeprecatedMethods: - """Tests for deprecated FlowSystem methods that delegate to topology.""" - - def test_network_infos_deprecation_warning(self, flow_system): - """Test that network_infos() raises a DeprecationWarning.""" - with pytest.warns(DeprecationWarning, match='topology.infos'): - flow_system.network_infos() - - def test_plot_network_deprecation_warning(self, flow_system): - """Test that plot_network() raises a DeprecationWarning.""" - with pytest.warns(DeprecationWarning, match='topology.plot'): - flow_system.plot_network(path=False, show=False) - - def test_deprecated_methods_return_same_results(self, flow_system): - """Test that deprecated methods return the same results as topology accessor.""" - import warnings - - # Get results from new API - new_nodes, new_edges = flow_system.topology.infos() - - # Get results from deprecated API (suppress warning) - with warnings.catch_warnings(): - warnings.simplefilter('ignore', DeprecationWarning) - old_nodes, old_edges = flow_system.network_infos() - - assert new_nodes == old_nodes - assert new_edges == old_edges