diff --git a/flixopt/components.py b/flixopt/components.py
index e25eaf757..9dd837ace 100644
--- a/flixopt/components.py
+++ b/flixopt/components.py
@@ -917,25 +917,59 @@ def add_effect_contributions(self, effects_model) -> None:
         if inv.effects_per_size is not None:
             factors = inv.effects_per_size
             size = self.size.sel({dim: factors.coords[dim].values})
-            effects_model.add_periodic_contribution(size * factors, contributor_dim=dim)
+            for eid in factors.coords['effect'].values:
+                f_single = factors.sel(effect=eid, drop=True)
+                if (f_single == 0).all():
+                    continue
+                effects_model.add_periodic_contribution(size * f_single, contributor_dim=dim, effect=str(eid))

         # Investment/retirement effects
         invested = self.invested
         if invested is not None:
-            if (f := inv.effects_of_investment) is not None:
-                effects_model.add_periodic_contribution(
-                    invested.sel({dim: f.coords[dim].values}) * f, contributor_dim=dim
-                )
-            if (f := inv.effects_of_retirement) is not None:
-                effects_model.add_periodic_contribution(
-                    invested.sel({dim: f.coords[dim].values}) * (-f), contributor_dim=dim
-                )
+            if (ff := inv.effects_of_investment) is not None:
+                for eid in ff.coords['effect'].values:
+                    f_single = ff.sel(effect=eid, drop=True)
+                    if (f_single == 0).all():
+                        continue
+                    effects_model.add_periodic_contribution(
+                        invested.sel({dim: f_single.coords[dim].values}) * f_single,
+                        contributor_dim=dim,
+                        effect=str(eid),
+                    )
+            if (ff := inv.effects_of_retirement) is not None:
+                for eid in ff.coords['effect'].values:
+                    f_single = ff.sel(effect=eid, drop=True)
+                    if (f_single == 0).all():
+                        continue
+                    effects_model.add_periodic_contribution(
+                        invested.sel({dim: f_single.coords[dim].values}) * (-f_single),
+                        contributor_dim=dim,
+                        effect=str(eid),
+                    )

         # === Constants: mandatory fixed + retirement ===
         if inv.effects_of_investment_mandatory is not None:
-            effects_model.add_periodic_contribution(inv.effects_of_investment_mandatory, contributor_dim=dim)
+            mandatory = inv.effects_of_investment_mandatory
+            if 'effect' in mandatory.dims:
+                for eid in mandatory.coords['effect'].values:
+                    effects_model.add_periodic_contribution(
+                        mandatory.sel(effect=eid, drop=True),
+                        contributor_dim=dim,
+                        effect=str(eid),
+                    )
+            else:
+                effects_model.add_periodic_contribution(mandatory, contributor_dim=dim)
         if inv.effects_of_retirement_constant is not None:
-            effects_model.add_periodic_contribution(inv.effects_of_retirement_constant, contributor_dim=dim)
+            ret_const = inv.effects_of_retirement_constant
+            if 'effect' in ret_const.dims:
+                for eid in ret_const.coords['effect'].values:
+                    effects_model.add_periodic_contribution(
+                        ret_const.sel(effect=eid, drop=True),
+                        contributor_dim=dim,
+                        effect=str(eid),
+                    )
+            else:
+                effects_model.add_periodic_contribution(ret_const, contributor_dim=dim)

     # --- Investment Cached Properties ---
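The pattern introduced above recurs throughout this patch: a factor table with an `effect` dimension is split into one contribution per effect, and all-zero slices are skipped before they ever reach the effects model. A minimal sketch of that splitting step, using a made-up `(storage, effect)` DataArray rather than the real `StoragesModel` data:

```python
import xarray as xr

# Hypothetical factor table; an all-zero column means "no contribution to that effect".
factors = xr.DataArray(
    [[2.0, 0.0], [0.5, 0.0]],
    dims=['storage', 'effect'],
    coords={'storage': ['s1', 's2'], 'effect': ['costs', 'CO2']},
)

for eid in factors.coords['effect'].values:
    f_single = factors.sel(effect=eid, drop=True)  # plain (storage,) slice, no effect dim
    if (f_single == 0).all():
        continue  # the all-zero 'CO2' column is skipped entirely
    print(f'register contribution for effect {eid!r}: {f_single.values}')
```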
diff --git a/flixopt/effects.py b/flixopt/effects.py
index b9f446b4e..87ed65776 100644
--- a/flixopt/effects.py
+++ b/flixopt/effects.py
@@ -345,9 +345,9 @@ def __init__(self, model: FlowSystemModel, data):
         self.share_periodic: linopy.Variable | None = None

         # Registered contributions from type models (FlowsModel, StoragesModel, etc.)
-        # Each entry: a defining_expr with 'contributor' dim
-        self._temporal_share_defs: list[linopy.LinearExpression] = []
-        self._periodic_share_defs: list[linopy.LinearExpression] = []
+        # Per-effect accumulation: effect_id -> [expr (no effect dim), ...]
+        self._temporal_shares: dict[str, list[linopy.LinearExpression]] = {}
+        self._periodic_shares: dict[str, list[linopy.LinearExpression]] = {}
         # Constant (xr.DataArray) contributions with 'contributor' + 'effect' dims
         self._temporal_constant_defs: list[xr.DataArray] = []
         self._periodic_constant_defs: list[xr.DataArray] = []
@@ -361,35 +361,76 @@ def effect_index(self):
         """Public access to the effect index for type models."""
         return self.data.effect_index

-    def add_temporal_contribution(self, defining_expr, contributor_dim: str = 'contributor') -> None:
+    def add_temporal_contribution(
+        self,
+        defining_expr,
+        contributor_dim: str = 'contributor',
+        effect: str | None = None,
+    ) -> None:
         """Register contributors for the share|temporal variable.

         Args:
-            defining_expr: Expression with a contributor dimension.
-                Accepts linopy LinearExpression/Variable or plain xr.DataArray (constants).
+            defining_expr: Expression with a contributor dimension (no effect dim if effect is given).
             contributor_dim: Name of the element dimension to rename to 'contributor'.
+            effect: If provided, the expression is for this specific effect (no effect dim needed).
         """
         if contributor_dim != 'contributor':
            defining_expr = defining_expr.rename({contributor_dim: 'contributor'})
         if isinstance(defining_expr, xr.DataArray):
+            if effect is not None:
+                defining_expr = defining_expr.expand_dims(effect=[effect])
+            elif 'effect' not in defining_expr.dims:
+                raise ValueError(
+                    "DataArray contribution must have an 'effect' dimension or an explicit effect= argument."
+                )
             self._temporal_constant_defs.append(defining_expr)
         else:
-            self._temporal_share_defs.append(defining_expr)
+            self._accumulate_shares(self._temporal_shares, self._as_expression(defining_expr), effect)

-    def add_periodic_contribution(self, defining_expr, contributor_dim: str = 'contributor') -> None:
+    def add_periodic_contribution(
+        self,
+        defining_expr,
+        contributor_dim: str = 'contributor',
+        effect: str | None = None,
+    ) -> None:
         """Register contributors for the share|periodic variable.

         Args:
-            defining_expr: Expression with a contributor dimension.
-                Accepts linopy LinearExpression/Variable or plain xr.DataArray (constants).
+            defining_expr: Expression with a contributor dimension (no effect dim if effect is given).
             contributor_dim: Name of the element dimension to rename to 'contributor'.
+            effect: If provided, the expression is for this specific effect (no effect dim needed).
         """
         if contributor_dim != 'contributor':
             defining_expr = defining_expr.rename({contributor_dim: 'contributor'})
         if isinstance(defining_expr, xr.DataArray):
+            if effect is not None:
+                defining_expr = defining_expr.expand_dims(effect=[effect])
+            elif 'effect' not in defining_expr.dims:
+                raise ValueError(
+                    "DataArray contribution must have an 'effect' dimension or an explicit effect= argument."
+                )
             self._periodic_constant_defs.append(defining_expr)
         else:
-            self._periodic_share_defs.append(defining_expr)
+            self._accumulate_shares(self._periodic_shares, self._as_expression(defining_expr), effect)
+
+    @staticmethod
+    def _accumulate_shares(
+        accum: dict[str, list],
+        expr: linopy.LinearExpression,
+        effect: str | None = None,
+    ) -> None:
+        """Append expression to per-effect list."""
+        # accum structure: {effect_id: [expr, ...]}
+        if effect is not None:
+            # Expression has no effect dim — tagged with specific effect
+            accum.setdefault(effect, []).append(expr)
+        elif 'effect' in expr.dims:
+            # Expression has effect dim — split per effect (DataArray sel is cheap)
+            for eid in expr.data.coords['effect'].values:
+                eid_str = str(eid)
+                accum.setdefault(eid_str, []).append(expr.sel(effect=eid, drop=True))
+        else:
+            raise ValueError('Expression must have effect dim or effect parameter must be given')

     def create_variables(self) -> None:
         """Create batched effect variables with 'effect' dimension."""
@@ -542,19 +583,19 @@ def finalize_shares(self) -> None:
         if (sm := self.model._storages_model) is not None:
             sm.add_effect_contributions(self)

-        # === Create share|temporal variable ===
-        if self._temporal_share_defs:
-            self.share_temporal = self._create_share_var(self._temporal_share_defs, 'share|temporal', temporal=True)
+        # === Create share|temporal variable (one combined with contributor × effect dims) ===
+        if self._temporal_shares:
+            self.share_temporal = self._create_share_var(self._temporal_shares, 'share|temporal', temporal=True)
             self._eq_per_timestep.lhs -= self.share_temporal.sum('contributor')

         # === Apply temporal constants directly ===
         for const in self._temporal_constant_defs:
             self._eq_per_timestep.lhs -= const.sum('contributor').reindex({'effect': self.data.effect_index})

-        # === Create share|periodic variable ===
-        if self._periodic_share_defs:
-            self.share_periodic = self._create_share_var(self._periodic_share_defs, 'share|periodic', temporal=False)
-            self._eq_periodic.lhs -= self.share_periodic.sum('contributor').reindex({'effect': self.data.effect_index})
+        # === Create share|periodic variable (one combined with contributor × effect dims) ===
+        if self._periodic_shares:
+            self.share_periodic = self._create_share_var(self._periodic_shares, 'share|periodic', temporal=False)
+            self._eq_periodic.lhs -= self.share_periodic.sum('contributor')

         # === Apply periodic constants directly ===
         for const in self._periodic_constant_defs:
@@ -573,39 +614,67 @@ def _share_coords(self, element_dim: str, element_index, temporal: bool = True):

     def _create_share_var(
         self,
-        share_defs: list[linopy.LinearExpression],
+        accum: dict[str, list[linopy.LinearExpression]],
         name: str,
         temporal: bool,
     ) -> linopy.Variable:
-        """Create a share variable from registered contributor definitions.
+        """Create one share variable with (contributor, effect, ...) dims.
+
+        accum structure: {effect_id: [expr1, expr2, ...]} where each expr has
+        (contributor, ...other_dims) dims — no effect dim.
+
+        Constraints are added per-effect: var.sel(effect=eid) == merged_for_eid,
+        which avoids cross-effect alignment.

-        Aligns all contributor expressions (outer join on contributor dimension),
-        then sums them to produce a single expression with the full contributor dimension.
+        Returns:
+            linopy.Variable with dims (contributor, effect, time/period).
         """
         import pandas as pd

-        # Ensure all share defs have canonical effect order before alignment.
-        # linopy merge uses join="override" when shapes match, which aligns by
-        # position not label — mismatched effect order silently shuffles coefficients.
+        if not accum:
+            return None
+
+        # Collect all contributor IDs across all effects
+        all_contributor_ids: set[str] = set()
+        for expr_list in accum.values():
+            for expr in expr_list:
+                all_contributor_ids.update(str(c) for c in expr.data.coords['contributor'].values)
+
+        contributor_index = pd.Index(sorted(all_contributor_ids), name='contributor')
         effect_index = self.data.effect_index
-        normalized = []
-        for expr in share_defs:
-            if 'effect' in expr.dims:
-                expr_effects = list(expr.data.coords['effect'].values)
-                if expr_effects != list(effect_index):
-                    expr = linopy.LinearExpression(expr.data.reindex(effect=effect_index), expr.model)
-            normalized.append(expr)
-
-        aligned = linopy.align(*normalized, join='outer', fill_value=0)
-        combined_expr = sum(aligned[1:], start=aligned[0])
-
-        # Extract contributor IDs from the combined expression
-        all_ids = [str(cid) for cid in combined_expr.data.coords['contributor'].values]
-        contributor_index = pd.Index(all_ids, name='contributor')
         coords = self._share_coords('contributor', contributor_index, temporal=temporal)
-        var = self.model.add_variables(lower=-np.inf, upper=np.inf, coords=coords, name=name)
-        self.model.add_constraints(var == combined_expr, name=name)
+        # Build mask: only create variables for (effect, contributor) combos that have expressions
+        mask = xr.DataArray(
+            np.zeros((len(contributor_index), len(effect_index)), dtype=bool),
+            dims=['contributor', 'effect'],
+            coords={'contributor': contributor_index, 'effect': effect_index},
+        )
+        covered_map: dict[str, list[str]] = {}
+        for eid, expr_list in accum.items():
+            cids = set()
+            for expr in expr_list:
+                cids.update(str(c) for c in expr.data.coords['contributor'].values)
+            covered_map[eid] = sorted(cids)
+            mask.loc[dict(effect=eid, contributor=covered_map[eid])] = True
+
+        var = self.model.add_variables(lower=-np.inf, upper=np.inf, coords=coords, name=name, mask=mask)
+
+        # Add per-effect constraints (only for covered combos)
+        for eid, expr_list in accum.items():
+            contributors = covered_map[eid]
+            if len(expr_list) == 1:
+                merged = expr_list[0].reindex(contributor=contributors)
+            else:
+                # Reindex all to common contributor set, then sum via linopy.merge (_term addition)
+                aligned = [e.reindex(contributor=contributors) for e in expr_list]
+                merged = aligned[0]
+                for a in aligned[1:]:
+                    merged = merged + a
+            var_slice = var.sel(effect=eid, contributor=contributors)
+            self.model.add_constraints(var_slice == merged, name=f'{name}({eid})')
+
+        accum.clear()
         return var

     def get_periodic(self, effect_id: str) -> linopy.Variable:
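The accumulation contract of `_accumulate_shares` can be exercised standalone. A sketch using plain DataArrays in place of linopy expressions (the real method reads coordinates via `expr.data`; here they are read directly, and `accumulate` is a stand-in name):

```python
import xarray as xr

def accumulate(accum: dict[str, list], expr: xr.DataArray, effect: str | None = None) -> None:
    # Mirrors EffectsModel._accumulate_shares, minus the linopy wrapper.
    if effect is not None:
        accum.setdefault(effect, []).append(expr)
    elif 'effect' in expr.dims:
        for eid in expr.coords['effect'].values:
            accum.setdefault(str(eid), []).append(expr.sel(effect=eid, drop=True))
    else:
        raise ValueError('Expression must have effect dim or effect parameter must be given')

accum: dict[str, list] = {}
expr = xr.DataArray([1.0, 2.0], dims=['contributor'], coords={'contributor': ['a', 'b']})
accumulate(accum, expr, effect='costs')              # pre-tagged, stored as-is
accumulate(accum, expr.expand_dims(effect=['CO2']))  # effect dim present, split per effect
assert sorted(accum) == ['CO2', 'costs'] and len(accum['costs']) == 1
```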
diff --git a/flixopt/elements.py b/flixopt/elements.py
index cbdd23be1..5ad9be90e 100644
--- a/flixopt/elements.py
+++ b/flixopt/elements.py
@@ -5,6 +5,7 @@
 from __future__ import annotations

 import logging
+from collections import defaultdict
 from functools import cached_property
 from typing import TYPE_CHECKING

@@ -15,7 +16,14 @@
 from . import io as fx_io
 from .config import CONFIG
 from .core import PlausibilityError
-from .features import MaskHelpers, StatusBuilder, fast_notnull, sparse_weighted_sum, stack_along_dim
+from .features import (
+    MaskHelpers,
+    StatusBuilder,
+    fast_notnull,
+    sparse_multiply_sum,
+    sparse_weighted_sum,
+    stack_along_dim,
+)
 from .interface import InvestParameters, StatusParameters
 from .modeling import ModelingUtilitiesAbstract
 from .structure import (
@@ -1213,7 +1221,17 @@ def add_effect_contributions(self, effects_model) -> None:
         factors = self.data.effects_per_flow_hour
         if factors is not None:
             rate = self.rate.sel({dim: factors.coords[dim].values})
-            effects_model.add_temporal_contribution(rate * (factors * dt), contributor_dim=dim)
+            for eid in factors.coords['effect'].values:
+                f_single = factors.sel(effect=eid, drop=True)  # (flow,) or (flow, time) — pure DataArray, cheap
+                # Only include flows with nonzero factor
+                nonzero = f_single != 0
+                if not nonzero.any():
+                    continue
+                effects_model.add_temporal_contribution(
+                    rate * (f_single * dt),
+                    contributor_dim=dim,
+                    effect=str(eid),
+                )

         # === Temporal: status effects ===
         if self.status is not None:
@@ -1221,38 +1239,94 @@ def add_effect_contributions(self, effects_model) -> None:
             if factor is not None:
                 flow_ids = factor.coords[dim].values
                 status_subset = self.status.sel({dim: flow_ids})
-                effects_model.add_temporal_contribution(status_subset * (factor * dt), contributor_dim=dim)
+                for eid in factor.coords['effect'].values:
+                    f_single = factor.sel(effect=eid, drop=True)
+                    nonzero = f_single != 0
+                    if not nonzero.any():
+                        continue
+                    effects_model.add_temporal_contribution(
+                        status_subset * (f_single * dt),
+                        contributor_dim=dim,
+                        effect=str(eid),
+                    )

         factor = self.data.effects_per_startup
         if self.startup is not None and factor is not None:
             flow_ids = factor.coords[dim].values
             startup_subset = self.startup.sel({dim: flow_ids})
-            effects_model.add_temporal_contribution(startup_subset * factor, contributor_dim=dim)
+            for eid in factor.coords['effect'].values:
+                f_single = factor.sel(effect=eid, drop=True)
+                nonzero = f_single != 0
+                if not nonzero.any():
+                    continue
+                effects_model.add_temporal_contribution(
+                    startup_subset * f_single,
+                    contributor_dim=dim,
+                    effect=str(eid),
+                )

         # === Periodic: size * effects_per_size ===
         inv = self.data._investment_data
         if inv is not None and inv.effects_per_size is not None:
             factors = inv.effects_per_size
             size = self.size.sel({dim: factors.coords[dim].values})
-            effects_model.add_periodic_contribution(size * factors, contributor_dim=dim)
+            for eid in factors.coords['effect'].values:
+                f_single = factors.sel(effect=eid, drop=True)
+                nonzero = f_single != 0
+                if not nonzero.any():
+                    continue
+                effects_model.add_periodic_contribution(size * f_single, contributor_dim=dim, effect=str(eid))

         # Investment/retirement effects
         if self.invested is not None:
-            if (f := inv.effects_of_investment) is not None:
-                effects_model.add_periodic_contribution(
-                    self.invested.sel({dim: f.coords[dim].values}) * f, contributor_dim=dim
-                )
-            if (f := inv.effects_of_retirement) is not None:
-                effects_model.add_periodic_contribution(
-                    self.invested.sel({dim: f.coords[dim].values}) * (-f), contributor_dim=dim
-                )
+            if (ff := inv.effects_of_investment) is not None:
+                for eid in ff.coords['effect'].values:
+                    f_single = ff.sel(effect=eid, drop=True)
+                    nonzero = f_single != 0
+                    if not nonzero.any():
+                        continue
+                    effects_model.add_periodic_contribution(
+                        self.invested.sel({dim: f_single.coords[dim].values}) * f_single,
+                        contributor_dim=dim,
+                        effect=str(eid),
+                    )
+            if (ff := inv.effects_of_retirement) is not None:
+                for eid in ff.coords['effect'].values:
+                    f_single = ff.sel(effect=eid, drop=True)
+                    nonzero = f_single != 0
+                    if not nonzero.any():
+                        continue
+                    effects_model.add_periodic_contribution(
+                        self.invested.sel({dim: f_single.coords[dim].values}) * (-f_single),
+                        contributor_dim=dim,
+                        effect=str(eid),
+                    )

         # === Constants: mandatory fixed + retirement ===
         if inv is not None:
             if inv.effects_of_investment_mandatory is not None:
-                effects_model.add_periodic_contribution(inv.effects_of_investment_mandatory, contributor_dim=dim)
+                # These already have effect dim — split per effect
+                mandatory = inv.effects_of_investment_mandatory
+                if 'effect' in mandatory.dims:
+                    for eid in mandatory.coords['effect'].values:
+                        effects_model.add_periodic_contribution(
+                            mandatory.sel(effect=eid, drop=True),
+                            contributor_dim=dim,
+                            effect=str(eid),
+                        )
+                else:
+                    effects_model.add_periodic_contribution(mandatory, contributor_dim=dim)
             if inv.effects_of_retirement_constant is not None:
-                effects_model.add_periodic_contribution(inv.effects_of_retirement_constant, contributor_dim=dim)
+                ret_const = inv.effects_of_retirement_constant
+                if 'effect' in ret_const.dims:
+                    for eid in ret_const.coords['effect'].values:
+                        effects_model.add_periodic_contribution(
+                            ret_const.sel(effect=eid, drop=True),
+                            contributor_dim=dim,
+                            effect=str(eid),
+                        )
+                else:
+                    effects_model.add_periodic_contribution(ret_const, contributor_dim=dim)

     # === Status Variables (cached_property) ===
@@ -1633,26 +1707,20 @@ def create_constraints(self) -> None:
         flow_dim = self._flows_model.dim_name  # 'flow'
         bus_dim = self.dim_name  # 'bus'

-        # Get ordered lists for coefficient matrix
         bus_ids = list(self.elements.keys())
-        flow_ids = list(flow_rate.coords[flow_dim].values)
-
-        if not bus_ids or not flow_ids:
-            logger.debug('BusesModel: no buses or flows, skipping balance constraints')
+        if not bus_ids:
+            logger.debug('BusesModel: no buses, skipping balance constraints')
             return

-        # Build coefficient matrix: +1 for inputs, -1 for outputs, 0 otherwise
-        coeffs = np.zeros((len(bus_ids), len(flow_ids)), dtype=np.float64)
-        for i, bus in enumerate(self.elements.values()):
+        # Build sparse coefficients: +1 for inputs, -1 for outputs
+        coefficients: dict[tuple[str, str], float] = {}
+        for bus in self.elements.values():
             for f in bus.inputs:
-                coeffs[i, flow_ids.index(f.label_full)] = 1.0
+                coefficients[(bus.label_full, f.label_full)] = 1.0
             for f in bus.outputs:
-                coeffs[i, flow_ids.index(f.label_full)] = -1.0
+                coefficients[(bus.label_full, f.label_full)] = -1.0

-        coeffs_da = xr.DataArray(coeffs, dims=[bus_dim, flow_dim], coords={bus_dim: bus_ids, flow_dim: flow_ids})
-
-        # Balance = sum(inputs) - sum(outputs)
-        balance = sparse_weighted_sum(flow_rate, coeffs_da, sum_dim=flow_dim, group_dim=bus_dim)
+        balance = sparse_multiply_sum(flow_rate, coefficients, sum_dim=flow_dim, group_dim=bus_dim)

         if self.buses_with_imbalance:
             imbalance_ids = [b.label_full for b in self.buses_with_imbalance]
@@ -2278,29 +2346,6 @@ def _max_equations(self) -> int:
             return 0
         return max(len(c.conversion_factors) for c in self.converters_with_factors)

-    @cached_property
-    def _flow_sign(self) -> xr.DataArray:
-        """(converter, flow) sign: +1 for inputs, -1 for outputs, 0 if not involved."""
-        all_flow_ids = self._flows_model.element_ids
-
-        # Build sign array
-        sign_data = np.zeros((len(self._factor_element_ids), len(all_flow_ids)))
-        for i, conv in enumerate(self.converters_with_factors):
-            for flow in conv.inputs:
-                if flow.label_full in all_flow_ids:
-                    j = all_flow_ids.index(flow.label_full)
-                    sign_data[i, j] = 1.0  # inputs are positive
-            for flow in conv.outputs:
-                if flow.label_full in all_flow_ids:
-                    j = all_flow_ids.index(flow.label_full)
-                    sign_data[i, j] = -1.0  # outputs are negative
-
-        return xr.DataArray(
-            sign_data,
-            dims=['converter', 'flow'],
-            coords={'converter': self._factor_element_ids, 'flow': all_flow_ids},
-        )
-
     @cached_property
     def _equation_mask(self) -> xr.DataArray:
         """(converter, equation_idx) mask: 1 if equation exists, 0 otherwise."""
@@ -2318,95 +2363,45 @@ def _equation_mask(self) -> xr.DataArray:
         )

     @cached_property
-    def _coefficients(self) -> xr.DataArray:
-        """(converter, equation_idx, flow, [time, ...]) conversion coefficients.
+    def _signed_coefficients(self) -> dict[tuple[str, str], float | xr.DataArray]:
+        """Sparse (converter_id, flow_id) -> signed coefficient mapping.

-        Returns DataArray with dims (converter, equation_idx, flow) for constant coefficients,
-        or (converter, equation_idx, flow, time, ...) for time-varying coefficients.
-        Values are 0 where flow is not involved in equation.
+        Returns a dict where keys are (converter_id, flow_id) tuples and values
+        are the signed coefficients (positive for inputs, negative for outputs),
+        stacked into a DataArray along an equation_idx dimension (0.0 where an
+        equation does not use the flow).
         """
         max_eq = self._max_equations
-        all_flow_ids = self._flows_model.element_ids
-        n_conv = len(self._factor_element_ids)
-        n_flows = len(all_flow_ids)
+        all_flow_ids_set = set(self._flows_model.element_ids)
+
+        # Collect signed coefficients per (converter, flow) across equations
+        intermediate: dict[tuple[str, str], list[tuple[int, float | xr.DataArray]]] = defaultdict(list)

-        # Build flow_label -> flow_id mapping for each converter
-        conv_flow_maps = []
         for conv in self.converters_with_factors:
             flow_map = {fl.label: fl.label_full for fl in conv.flows.values()}
-            conv_flow_maps.append(flow_map)
-
-        # First pass: collect all coefficients and check for time-varying
-        coeff_values = {}  # (i, eq_idx, j) -> value
-        has_dataarray = False
-        extra_coords = {}
+            # +1 for inputs, -1 for outputs
+            flow_signs = {f.label_full: 1.0 for f in conv.inputs if f.label_full in all_flow_ids_set}
+            flow_signs.update({f.label_full: -1.0 for f in conv.outputs if f.label_full in all_flow_ids_set})

-        flow_id_to_idx = {fid: j for j, fid in enumerate(all_flow_ids)}
-
-        for i, (conv, flow_map) in enumerate(zip(self.converters_with_factors, conv_flow_maps, strict=False)):
             for eq_idx, conv_factors in enumerate(conv.conversion_factors):
                 for flow_label, coeff in conv_factors.items():
                     flow_id = flow_map.get(flow_label)
-                    if flow_id and flow_id in flow_id_to_idx:
-                        j = flow_id_to_idx[flow_id]
-                        coeff_values[(i, eq_idx, j)] = coeff
-                        if isinstance(coeff, xr.DataArray) and coeff.ndim > 0:
-                            has_dataarray = True
-                            for d in coeff.dims:
-                                if d not in extra_coords:
-                                    extra_coords[d] = coeff.coords[d].values
-
-        # Build the coefficient array
-        if not has_dataarray:
-            # Fast path: all scalars - use simple numpy array
-            data = np.zeros((n_conv, max_eq, n_flows), dtype=np.float64)
-            for (i, eq_idx, j), val in coeff_values.items():
-                if isinstance(val, xr.DataArray):
-                    data[i, eq_idx, j] = float(val.values)
-                else:
-                    data[i, eq_idx, j] = float(val)
-
-            return xr.DataArray(
-                data,
-                dims=['converter', 'equation_idx', 'flow'],
-                coords={
-                    'converter': self._factor_element_ids,
-                    'equation_idx': list(range(max_eq)),
-                    'flow': all_flow_ids,
-                },
-            )
-        else:
-            # Slow path: some time-varying coefficients - broadcast all to common shape
-            extra_dims = list(extra_coords.keys())
-            extra_shape = [len(c) for c in extra_coords.values()]
-            full_shape = [n_conv, max_eq, n_flows] + extra_shape
-            full_dims = ['converter', 'equation_idx', 'flow'] + extra_dims
-
-            data = np.zeros(full_shape, dtype=np.float64)
-
-            # Create template for broadcasting
-            template = xr.DataArray(coords=extra_coords, dims=extra_dims) if extra_coords else None
-
-            for (i, eq_idx, j), val in coeff_values.items():
-                if isinstance(val, xr.DataArray):
-                    if val.ndim == 0:
-                        data[i, eq_idx, j, ...] = float(val.values)
-                    elif template is not None:
-                        broadcasted = val.broadcast_like(template)
-                        data[i, eq_idx, j, ...] = broadcasted.values
-                    else:
-                        data[i, eq_idx, j, ...] = val.values
-                else:
-                    data[i, eq_idx, j, ...] = float(val)
+                    sign = flow_signs.get(flow_id, 0.0) if flow_id else 0.0
+                    if sign != 0.0:
+                        intermediate[(conv.label, flow_id)].append((eq_idx, coeff * sign))
+
+        # Stack each (converter, flow) pair's per-equation values into a DataArray
+        result: dict[tuple[str, str], float | xr.DataArray] = {}
+        eq_coords = list(range(max_eq))

-            full_coords = {
-                'converter': self._factor_element_ids,
-                'equation_idx': list(range(max_eq)),
-                'flow': all_flow_ids,
-            }
-            full_coords.update(extra_coords)
+        for key, entries in intermediate.items():
+            # Build a list indexed by equation_idx (0.0 where equation doesn't use this flow)
+            per_eq: list[float | xr.DataArray] = [0.0] * max_eq
+            for eq_idx, val in entries:
+                per_eq[eq_idx] = val
+            result[key] = stack_along_dim(per_eq, dim='equation_idx', coords=eq_coords)

-            return xr.DataArray(data, dims=full_dims, coords=full_coords)
+        return result

     def create_linear_constraints(self) -> None:
         """Create batched linear conversion factor constraints.
@@ -2414,17 +2409,16 @@ def create_linear_constraints(self) -> None:

         For each converter c with equation i:
             sum_f(flow_rate[f] * coefficient[c,i,f] * sign[c,f]) == 0

-        Uses sparse_weighted_sum: each converter only touches its own 2-3 flows
-        instead of broadcasting across all flows in the system.
+        Uses sparse_multiply_sum: each converter only touches its own 2-3 flows
+        instead of allocating a dense coefficient array across all flows.
         """
         if not self.converters_with_factors:
             return

         flow_rate = self._flows_model[FlowVarName.RATE]
-        signed_coeffs = self._coefficients * self._flow_sign

         # Sparse sum: only multiplies non-zero (converter, flow) pairs
-        flow_sum = sparse_weighted_sum(flow_rate, signed_coeffs, sum_dim='flow', group_dim='converter')
+        flow_sum = sparse_multiply_sum(flow_rate, self._signed_coefficients, sum_dim='flow', group_dim='converter')

         # Build valid mask: True where converter HAS that equation
         n_equations_per_converter = xr.DataArray(
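The bus-balance rewrite above swaps a dense `(bus, flow)` coefficient matrix for a dict keyed by `(bus_id, flow_id)`, with absent pairs implicitly zero. A toy illustration of the dict being built (bus and flow labels are invented):

```python
# Signed sparse coefficients: +1 for bus inputs, -1 for bus outputs.
coefficients: dict[tuple[str, str], float] = {}
topology = {'heat_bus': (['boiler(out)'], ['demand(in)', 'losses(in)'])}

for bus_id, (inputs, outputs) in topology.items():
    for fid in inputs:
        coefficients[(bus_id, fid)] = 1.0
    for fid in outputs:
        coefficients[(bus_id, fid)] = -1.0

# Three entries instead of a len(buses) x len(flows) array of mostly zeros.
assert coefficients[('heat_bus', 'boiler(out)')] == 1.0
```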
diff --git a/flixopt/features.py b/flixopt/features.py
index b816bd25f..5b2ba139c 100644
--- a/flixopt/features.py
+++ b/flixopt/features.py
@@ -24,6 +24,9 @@
 # =============================================================================


+Numeric = int | float | xr.DataArray
+
+
 def sparse_weighted_sum(var, coeffs: xr.DataArray, sum_dim: str, group_dim: str):
     """Compute (var * coeffs).sum(sum_dim) efficiently using sparse groupby.
@@ -100,6 +103,63 @@ def sparse_weighted_sum(var, coeffs: xr.DataArray, sum_dim: str, group_dim: str)
     return result.drop_vars(sum_dim, errors='ignore')


+def sparse_multiply_sum(
+    var,
+    coefficients: dict[tuple[str, str], Numeric],
+    sum_dim: str,
+    group_dim: str,
+):
+    """Compute weighted sum of var over sum_dim, grouped by group_dim, from sparse coefficients.
+
+    Unlike sparse_weighted_sum (which takes a dense DataArray and finds nonzeros),
+    this function takes an already-sparse dict of coefficients, avoiding the need
+    to ever allocate a dense array.
+
+    Args:
+        var: linopy Variable with sum_dim as a dimension.
+        coefficients: dict mapping (group_id, sum_id) to scalar or DataArray coefficient.
+            Only non-zero entries should be included.
+        sum_dim: Dimension of var to select from and sum over (e.g. 'flow').
+        group_dim: Output dimension name (e.g. 'converter').
+
+    Returns:
+        linopy expression with sum_dim removed, group_dim present.
+    """
+    if not coefficients:
+        raise ValueError('coefficients dict is empty')
+
+    # Unzip the sparse dict into parallel lists
+    group_ids_seen: dict[str, None] = {}
+    pair_group_ids: list[str] = []
+    pair_sum_ids: list[str] = []
+    pair_coeffs_list: list[Numeric] = []
+
+    for (gid, sid), coeff in coefficients.items():
+        group_ids_seen[gid] = None
+        pair_group_ids.append(gid)
+        pair_sum_ids.append(sid)
+        pair_coeffs_list.append(coeff)
+
+    group_ids = list(group_ids_seen)
+
+    # Stack mixed scalar/DataArray coefficients into a single DataArray
+    pair_coords = list(range(len(pair_group_ids)))
+    pair_coeffs = stack_along_dim(pair_coeffs_list, dim='pair', coords=pair_coords)
+
+    # Select var for active pairs, multiply by coefficients, group-sum
+    selected = var.sel({sum_dim: xr.DataArray(pair_sum_ids, dims=['pair'])})
+    weighted = selected * pair_coeffs
+
+    mapping = xr.DataArray(pair_group_ids, dims=['pair'], name=group_dim)
+    result = weighted.groupby(mapping).sum()
+
+    # Restore original group order (groupby sorts alphabetically)
+    result = result.sel({group_dim: group_ids})
+
+    # Drop sum_dim coord left by vectorized sel
+    return result.drop_vars(sum_dim, errors='ignore')
+
+
 def fast_notnull(arr: xr.DataArray) -> xr.DataArray:
     """Fast notnull check using numpy (~55x faster than xr.DataArray.notnull()).
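A worked example of the `sparse_multiply_sum` semantics. The function is written against linopy variables, but `.sel` with a DataArray indexer, multiplication, and `groupby(...).sum()` behave the same way on a plain `xr.DataArray`, so the arithmetic can be checked without building a model (assuming the import path below):

```python
import xarray as xr
from flixopt.features import sparse_multiply_sum

rate = xr.DataArray([10.0, 20.0, 30.0], dims=['flow'], coords={'flow': ['f1', 'f2', 'f3']})
coefficients = {('bus_a', 'f1'): 1.0, ('bus_a', 'f2'): -1.0, ('bus_b', 'f3'): 1.0}

balance = sparse_multiply_sum(rate, coefficients, sum_dim='flow', group_dim='bus')
# bus_a: 1*10 - 1*20 = -10.0; bus_b: 1*30 = 30.0
print(balance.sel(bus='bus_a').item(), balance.sel(bus='bus_b').item())
```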
diff --git a/flixopt/results.py b/flixopt/results.py
index 66f738465..3d95357eb 100644
--- a/flixopt/results.py
+++ b/flixopt/results.py
@@ -797,7 +797,6 @@ def get_effect_shares(
         share_var_name = f'share|{mode}'
         if share_var_name in self.solution:
             share_var = self.solution[share_var_name]
-            # Find the contributor dimension
             contributor_dim = None
             for dim in ['contributor', 'flow', 'storage', 'component', 'source']:
                 if dim in share_var.dims:
diff --git a/flixopt/statistics_accessor.py b/flixopt/statistics_accessor.py
index 50471e2d8..9b088bad8 100644
--- a/flixopt/statistics_accessor.py
+++ b/flixopt/statistics_accessor.py
@@ -805,22 +805,23 @@ def _create_effects_dataset(self, mode: Literal['temporal', 'periodic', 'total']
         # Determine modes to process
         modes_to_process = ['temporal', 'periodic'] if mode == 'total' else [mode]

-        share_var_map = {'temporal': 'share|temporal', 'periodic': 'share|periodic'}
-
-        # Detect contributors from batched share variables
+        # Detect contributors from combined share variables (share|temporal, share|periodic)
         detected_contributors: set[str] = set()
         for current_mode in modes_to_process:
-            share_name = share_var_map[current_mode]
-            if share_name in solution:
-                share_da = solution[share_name]
-                for c in share_da.coords['contributor'].values:
-                    # Exclude effect-to-effect shares
-                    base_name = str(c).split('(')[0] if '(' in str(c) else str(c)
-                    if base_name not in effect_labels:
-                        detected_contributors.add(str(c))
+            share_name = f'share|{current_mode}'
+            if share_name not in solution:
+                continue
+            share_da = solution[share_name]
+            for c in share_da.coords['contributor'].values:
+                base_name = str(c).split('(')[0] if '(' in str(c) else str(c)
+                if base_name not in effect_labels:
+                    detected_contributors.add(str(c))

         contributors = sorted(detected_contributors)

+        if not contributors:
+            return xr.Dataset()
+
         # Build metadata for each contributor
         def get_parent_component(contributor: str) -> str:
             if contributor in self._fs.flows:
@@ -851,15 +852,6 @@ def get_contributor_type(contributor: str) -> str:
         share_total: xr.DataArray | None = None

         for current_mode in modes_to_process:
-            share_name = share_var_map[current_mode]
-            if share_name not in solution:
-                continue
-            share_da = solution[share_name]
-
-            # Check if this contributor exists in the share variable
-            if contributor not in share_da.coords['contributor'].values:
-                continue
-
             # Get conversion factors: which source effects contribute to this target effect
             conversion_factors = {
                 key[0]: value
@@ -869,9 +861,15 @@ def get_contributor_type(contributor: str) -> str:
                 conversion_factors[effect] = 1  # Direct contribution

             for source_effect, factor in conversion_factors.items():
+                share_name = f'share|{current_mode}'
+                if share_name not in solution:
+                    continue
+                share_da = solution[share_name]
                 if source_effect not in share_da.coords['effect'].values:
                     continue
-                da = share_da.sel(contributor=contributor, effect=source_effect) * factor
+                if contributor not in share_da.coords['contributor'].values:
+                    continue
+                da = share_da.sel(effect=source_effect, contributor=contributor, drop=True) * factor
                 # For total mode, sum temporal over time (apply cluster_weight for proper weighting)
                 if mode == 'total' and current_mode == 'temporal' and 'time' in da.dims:
                     weighted = da * self._fs.weights.get('cluster', 1.0)
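With the combined share variables, each `(effect, contributor)` slice comes out of the solution dataset via a labeled `sel`, guarded by membership checks as in the loop above. A sketch against a fabricated solution dataset:

```python
import xarray as xr

# Fabricated stand-in for a solved model's solution dataset.
solution = xr.Dataset(
    {
        'share|temporal': xr.DataArray(
            [[1.0, 2.0]],
            dims=['contributor', 'effect'],
            coords={'contributor': ['Gastarif(Gas)'], 'effect': ['costs', 'CO2']},
        )
    }
)

share_da = solution['share|temporal']
if 'costs' in share_da.coords['effect'].values and 'Gastarif(Gas)' in share_da.coords['contributor'].values:
    da = share_da.sel(effect='costs', contributor='Gastarif(Gas)', drop=True)
    print(float(da))  # 1.0
```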
diff --git a/flixopt/transform_accessor.py b/flixopt/transform_accessor.py
index b8951bf56..1123b9f66 100644
--- a/flixopt/transform_accessor.py
+++ b/flixopt/transform_accessor.py
@@ -469,11 +469,8 @@ def _cluster_indices_per_timestep(self) -> xr.DataArray:

     @staticmethod
     def _get_mode(var_name: str) -> ExpansionMode:
-        """Look up expansion mode for a variable name via suffix matching."""
-        for suffix, mode in NAME_TO_EXPANSION.items():
-            if var_name == suffix or var_name.endswith(suffix):
-                return mode
-        return ExpansionMode.REPEAT
+        """Look up expansion mode for a variable name."""
+        return NAME_TO_EXPANSION.get(var_name, ExpansionMode.REPEAT)

     def _append_final_state(self, expanded: xr.DataArray, da: xr.DataArray) -> xr.DataArray:
         """Append final state value from original data to expanded data."""
diff --git a/tests/test_flow.py b/tests/test_flow.py
index 64b9bf84d..2feabf39e 100644
--- a/tests/test_flow.py
+++ b/tests/test_flow.py
@@ -102,7 +102,9 @@ def test_effects_per_flow_hour(self, basic_flow_system_linopy_coords, coords_con
         model = create_linopy_model(flow_system)

         # Batched temporal shares are managed by the EffectsModel
-        assert 'share|temporal' in model.constraints, 'Batched temporal share constraint should exist'
+        assert any(c.startswith('share|temporal') for c in model.constraints), (
+            'Temporal share constraint(s) should exist'
+        )

         # Check batched effect variables exist
         assert 'effect|per_timestep' in model.variables, 'Batched effect per_timestep should exist'
diff --git a/tests/test_functional.py b/tests/test_functional.py
index 509be5d04..f309b52de 100644
--- a/tests/test_functional.py
+++ b/tests/test_functional.py
@@ -129,7 +129,7 @@ def test_minimal_model(solver_fixture, time_steps_fixture):
     )

     assert_allclose(
-        flow_system.solution['share|temporal'].sel(contributor='Gastarif(Gas)', effect='costs').values[:-1],
+        flow_system.solution['share|temporal'].sel(effect='costs', contributor='Gastarif(Gas)').values[:-1],
         [-0.0, 20.0, 40.0, -0.0, 20.0],
         rtol=1e-5,
         atol=1e-10,
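Because the share constraints are now created per effect (e.g. `share|temporal(costs)`), code that previously looked up a single `share|temporal` constraint by exact name should match by prefix, as the test change above does. A sketch, with `model` standing for any linopy model built by flixopt:

```python
temporal_share_constraints = [
    name for name in model.constraints if name.startswith('share|temporal')
]
assert temporal_share_constraints, 'expected at least one per-effect temporal share constraint'
```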