Source code for energia.modeling.constraints.bind

"""Bind constraint"""

from __future__ import annotations

import logging
from functools import cached_property
from typing import TYPE_CHECKING

from gana import V
from gana.sets.function import F

from ...utils.decorators import timer
from ...utils.math import normalize

logger = logging.getLogger("energia")


if TYPE_CHECKING:
    from gana import P as Param
    from gana.sets.constraint import C

    from ..._core._component import _Component
    from ..._core._x import _X
    from ..indices.sample import Sample


[docs] class Bind: """Bind constraint :param sample: The sample variable to bind :type sample: Sample :param parameter: The parameter bound :type parameter: float | list[float] | dict[float, float] | tuple[float, float] | list[tuple[float, float]] :param leq: If True, the sample is constrained to be less than or equal to the bound :type leq: bool :param geq: If True, the sample is constrained to be greater than or equal to the bound :type geq: bool :param eq: If True, the sample is constrained to be equal to the bound :type eq: bool :param forall: If provided, the constraint is applied for all elements in this list :type forall: list[_X | _Component] | None :ivar model: The model to which the component belongs. :vartype model: Model :ivar nominal: The nominal value of the sample variable. :vartype nominal: float | None :ivar norm: If True, the sample variable is normalized. :vartype norm: bool :ivar domain: The domain of the sample. :vartype domain: Domain :ivar aspect: The aspect of the sample. :vartype aspect: Aspect :ivar report: If True, the sample variable is reported. :vartype report: bool :ivar program: The program to which the sample belongs. :vartype program: Prg """ def __init__( self, sample: Sample, parameter: ( float | list[float] | dict[float, float] | tuple[float, float] | list[tuple[float, float]] | list[dict[float, float]] ), leq: bool = False, geq: bool = False, eq: bool = False, forall: list[_X | _Component] | None = None, parameter_name: str = "", ): self.sample = sample self._parameter, self.parameter_name = parameter, parameter_name self.leq, self.geq, self.eq = leq, geq, eq self.forall = forall self._handshake() if self.forall: # if as set is passed # write the constraint 'for all' elements in it self._write_forall() return if isinstance(self._parameter, dict): # if a dict is passed, it is assumed to be mode bounds if self.iscalc: self._calc_w_modes() else: self._write_w_modes() return self.write() # try: # self.write() # except TypeError: # # TODO: not yet implemented # # TODO: this is essentially mode of a mode # # TODO: modes will need to made tuple maybe # if any(isinstance(x, dict) for x in self._parameter): # self._write_w_modes_of_modes()
[docs] @timer(logger, kind="bind") def write(self): """Writes the bind constraint""" # the lhs comes from the sample # calling the lhs here, updates the _ = self.lhs if self._check_existing(): return False if self.leq: self.cons: C = self.lhs <= self.rhs elif self.eq: self.cons: C = self.lhs == self.rhs elif self.geq: self.cons: C = self.lhs >= self.rhs else: return False self._categorize() self._inform() # set the constraint setattr( self.program, self.cons_name, self.cons, ) # returned for @timer return self.sample, self.rel
def _listed(self): """Gets the parameter as a list if needed""" if not isinstance(self._parameter, list) and self.forall: return [self._parameter] * len(self.forall) return self._parameter def _normalized(self, _parameter): """Gets the normalized parameter if needed""" if self.norm: return normalize(_parameter) return _parameter def _nominalized(self, _parameter): """Gets the nominalized parameter if needed""" if self.nominal: # if a nominal value for the self.parameter is passed # this is essentially the expectation # skipping an instance check here # if a non iterable is passed, let an error be raised # if the sample needs to be normalized _parameter = [ ( (self.nominal * i[0], self.nominal * i[1]) if isinstance(i, tuple) else self.nominal * i ) for i in _parameter ] return _parameter return _parameter
[docs] @cached_property def parameter( self, ) -> ( list[float] | float | dict[float, float] | tuple[float, float] | list[tuple[float, float]] ): """Parameter bound of the bind constraint""" return self._nominalized(self._normalized(self._listed()))
[docs] @cached_property def lhs(self): """Left hand side of the bind constraint""" # ------Get LHS # lhs needs to be determined here # because V will be spaced and timed if not passed by user # .X(), .Vb() need time and space return self.sample.V(self.parameter)
def _rhs_w_bound(self) -> V | F: """When the rhs is bound by a variable""" _bound = self.sample.X(self.parameter) if self.report else self.sample.Vb() return self.parameter * _bound def _rhs_w_binary(self) -> V | F: """When the rhs is bound by a binary variable""" _bound = self.parameter * self.sample.X(self.parameter) self.aspect.update(self.domain, reporting=True) return _bound def _handle_multiplier(self): """Gets the parameter with multiplier if needed""" if isinstance(self.parameter, list): return [p * self.domain.space.multiplier for p in self.parameter] return self.parameter * self.domain.space.multiplier def _rhs_w_multiplier(self) -> V | F | Param: """When the rhs is bound by a multiplier""" if self.aspect.use_multiplier: _bound = self._handle_multiplier() else: _bound = self.parameter return _bound * self.of(*self.domain.index_spatiotemporal).V(self.parameter) @property def rhs(self) -> V | F | Param: """Right hand side of the bind constraint""" if self.of: return self._rhs_w_multiplier() if self.aspect.bound: return self._rhs_w_bound() if self.report or self.domain.modes is not None: return self._rhs_w_binary() return self.parameter
[docs] @cached_property def rel(self): """Constraint name suffix""" if self.leq: return "ub" elif self.geq: return "lb" # equality if self.iscalc: return "inc_calc" if self.report else "calc" return "eq"
[docs] @cached_property def cons_name(self): """Constraint name""" return rf"{self.aspect.name}{self.domain.idxname}_{self.rel}"
@property def iscalc(self) -> bool: """Is this a calculation bind constraint?""" return self.of is not None def _write_forall(self): """Writes the bind constraint for all elements in the set""" for n, idx in enumerate(self.forall): lhs, rhs = self.sample(idx), self.parameter[n] if self.leq: _ = lhs <= rhs if self.geq: _ = lhs >= rhs if self.eq: _ = lhs == rhs def _calc_w_modes(self): """Write with modes""" # if this is a dict, piece wise linear functions are being passed # Take the example of expenditure and capacity being modeled # sat the input is {(0, 200): 5000, (200, 300): 4000, (300, 400): 3000 # what this implies is that for the capacity between 0 and 200, the expenditure is 5000 # for the capacity between 200 and 300, the expenditure is 4000 # for the capacity between 300 and 400, the expenditure is 3000 # Modes objects index the bin. Three in this case : 1 - (0,200), 2 - (200,300), 3 - (300,400) # We need the following equations: # *1. capacity = capacity(bin0) + capacity(bin1) + capacity(bin2) # *2. x_capacity = 0*capacity(bin0) + 200*capacity(bin1) + 300*capacity(bin2), where is a reporting binary # *3-5. spend(bin0) = 5000*capacity(bin0); spend(bin1) = 4000*capacity(bin1); spend(bin2) = 3000*capacity(bin2) # *6. spend = spend(bin0) + spend(bin1) + spend(bin2) # this takes care of *1 and *2 _ = self.of == dict(enumerate(self.parameter)) # this takes care of *3-*6 # the new modes object would have just been added to the model modes = self.model.modes[-1] _ = self.of(modes)[self.sample(modes)] == list(self.parameter.values()) def _write_w_modes(self): """Writes the bind constraint with modes""" # create modes self.modes = self.model.Modes(size=len(self.parameter), sample=self.sample) mode_bounds = [ ( (self.parameter[i - 1], self.parameter[i]) if i - 1 in self.parameter else (0, self.parameter[i]) ) for i in self.parameter ] _ = self.sample(self.modes) >= [b[0] for b in mode_bounds] _ = self.sample(self.modes) <= [b[1] for b in mode_bounds] # def _write_w_modes_of_modes(self): # """Writes the bind constraint with modes of modes""" # # Consider something like this: # # m.USD.spend(m.PV.capacity, m.PV.construction.modes) == [ # # {100: 1000, 500: 900, 1000: 800}, # # {100: 2000, 500: 1800, 1000: 1600}, # # {100: 3000, 500: 2700, 1000: 2400}, # # ] # # I don't want to run a check for this every time, so just catch the error # if self.domain.modes is not None: # for n, p in enumerate(self._parameter): # s = self.sample.aspect( # *self.domain.edit({'modes': self.domain.modes[n]}) # ) # if self.leq: # _ = s <= p # elif self.geq: # _ = s >= p # elif self.eq: # _ = s == p def _check_existing(self) -> bool: """Checks if aspect already has been bound in that space""" if not self.iscalc and not self.domain.modes: try: if self.model.scenario[self.aspect][self.domain.primary][ self.domain.space ][self.domain.time][self.rel]: return True except KeyError: pass return False def _categorize(self): """Categorizes the constraint""" # categorize the constraint if self.iscalc: self.cons.categorize("Calculations") elif self.domain.modes: self.cons.categorize("Piecewise Linear") else: self.cons.categorize("Binds") def _inform(self): """Informs the aspect and domain about the bind constraint""" # let the aspect know about the new constraint self.aspect.constraints.add(self.cons_name) # let all objects in the domain know that # a constraint with this name contains it self.domain.inform_components_of_cons(self.cons_name) self.model.scenario.update(self.sample, self.rel, self.P) def _handshake(self): """Borrow attributes from sample""" # borrowed from Sample self.model = self.sample.model self.nominal = self.sample.nominal self.norm = self.sample.norm self.domain = self.sample.domain self.aspect = self.sample.aspect self.report = self.sample.report self.program = self.sample.program self.of = self.sample.of @property def P(self): """Gets the parameter set""" if isinstance(self.cons.two, F): return self.cons.two.one if isinstance(self.cons.two, V): return 1.0 return self.cons.two