Source code for energia.modeling.constraints.vmap

"""Map"""

from __future__ import annotations

import logging
from functools import cached_property
from operator import is_
from typing import TYPE_CHECKING

from gana import sigma

from ...utils.decorators import timer

logger = logging.getLogger("energia")

if TYPE_CHECKING:
    from gana.sets.constraint import C

    from ..._core._x import _X
    from ...components.temporal.periods import Periods
    from ..indices.domain import Domain
    from ..variables.aspect import Aspect


[docs] class Map: """ Maps between domains :param aspect: Aspect to which the constraint is applied. :type aspect: Aspect :param domain: Domain over which the aspect is defined. :type domain: Domain :param reporting: If True, the map is for a reporting variable. :type reporting: bool :param label: Label for the constraint. Defaults to "". :type label: str """ def __init__( self, aspect: Aspect, domain: Domain, reporting=False, label: str = "" ): self.aspect = aspect self.domain = domain self.reporting = reporting self.label = label self._handshake() if self.domain.lag: return # these are periods denser and sparser than the current domain self._map_across_time() # Space mapping --- self._map_across_space() # Bind mapping --- self._map_across_samples() # Mode mapping --- self._map_across_modes() # -------------------------------------------------------------------# # Helper functions # -------------------------------------------------------------------#
[docs] @timer(logger, kind='map') def write(self, from_domain: Domain, to_domain: Domain, tsum=False, msum=False): """Scales up variable to a lower dimension""" # if the variable is being defined for the first time, do not bother with the rest # Also note that if a variable already exists then a new is not created # thus map_domain is not called in Bind.V() # Essentially, the domain being added is always new! # 1. a new domain is being added to a new disposition # e.g.: # (water, dam, goa, year) with disposition ['resource', 'operation', 'space', 'time'] exists # (dam, goa, q) with disposition ['operation', 'space', 'time'] is added # # 2. a new domain is being added to an existing disposition # # e.g.: # # (dam, goa, year) with disposition ['operation', 'space', 'time'] exists # # (dam, goa, q) with disposition ['operation', 'space', 'time'] is added # # The variable will need to be mapped across 'time # mapping happens between: # 1. sparser time to denser time in same space # 2. contained location to parent location # 3. (sparser time, contained location) to (denser time, parent location) # you cant map at the same level # consider the State Goa, with two towns Madgaon and Ponje # Goa with Maharashtra, make up India (for the purpose of modeling) # Aspect from Goa can be mapped to India # Goa does not need to be mapped to Maharashtra # Madgaon and Ponje are both mapped to Just Goa # note that .alsohas() checks at lower levels # the operators <, > wont work since they will add any space a higher order # this will lead to adding twice. India = Goa + Madgaon + Ponje (WRONG) # We scale only one level up if self._check_validity(): return exists = self._check_existing(to_domain, from_domain) self.cons_name = self._give_cname(self.var, from_domain, to_domain, tsum, msum) v_lower = self(*to_domain).X() if self.reporting else self(*to_domain).V() if not tsum and not msum and exists: cons_existing: C = getattr(self.program, self.cons_name) setattr( self.program, self.cons_name, cons_existing - self.rhs(from_domain, tsum, msum), ) else: cons = v_lower == self.rhs(from_domain, tsum, msum) setattr(self.program, self.cons_name, cons) cons.categorize("Mapping") self._inform(from_domain) return (self.aspect, from_domain, to_domain)
[docs] @cached_property def var(self): return self.aspect.reporting if self.reporting else self.aspect
[docs] @cached_property def name(self) -> str: return f"{self.aspect.name}_map"
[docs] @cached_property def maps(self) -> dict[Domain, list[Domain]]: return ( self.aspect.maps_report if self.reporting is not None else self.aspect.maps )
def _map_to_sparser_time(self, sparser_periods: list[Periods]): """ Maps to sparser time periods :param sparser_periods: Periods sparser than the domain period :type sparser_periods: list[Periods] """ for sp in sparser_periods: # check if the aspect has been defined for a sparser period # this creates a map from this domain to a sparser domain if sp in self.dispositions[self.space] and is_(sp.of, self.time): self.write(self.domain, self.domain.edit({"periods": sp}), tsum=True) def _map_from_denser_time(self, denser_periods: list[Periods]): """ Maps from denser time periods :param denser_periods: Periods denser than the domain period :type denser_periods: list[Periods] """ for dp in denser_periods: if dp in self.dispositions[self.space] and is_(self.time.of, dp): binds_dict = self.dispositions[self.space][dp] # here I am re creating Bind objects # from the dict of the form {aspect: {component: {aspect: {component: {...}}}}} # there has to be a way to avoid this # I make a list of bounds as such [aspect(component), aspect(component), ...] samples = [ aspect(component) for aspect, comp_dict in binds_dict.items() for component in comp_dict ] from_domain = self.domain.copy() from_domain.periods, from_domain.samples = dp, samples self.write(from_domain, self.domain, tsum=True) def _map_across_time(self): """ Maps across time """ denser_periods, sparser_periods = self.model.time.split(self.time) self._map_to_sparser_time(sparser_periods) self._map_from_denser_time(denser_periods) def _map_across_space(self): """ Maps across space :param contained_locs: Locations contained in the domain location :type contained_locs: list[Space] :param parent_loc: Parent location of the domain location :type parent_loc: Space | None :param time: Time at which the domain is defined :type time: Periods """ parent_loc = self.domain.space.isin if parent_loc: if ( parent_loc not in self.dispositions or self.time not in self.dispositions[parent_loc] ): return self.write( self.domain, self.domain.edit({"location": parent_loc}), ) for space in self.domain.space.has: if space in self.dispositions and self.time in self.dispositions[space]: # get deciding samples if they exist samples = [ aspect(component) for aspect, components in self.dispositions[space][ self.time ].items() for component in components ] for sample in samples: if self.domain.linkage: self.write( self.domain.edit({"linkage": space, "samples": [sample]}), self.domain, ) elif self.domain.location: self.write( self.domain.edit({"location": space, "samples": [sample]}), self.domain, ) def _map_across_samples(self): if self.domain.samples or not self.dispositions[self.space][self.time]: return # if the current variable being declared has no samples # but the aspect has already been defined at this location and time with samples # there is a need to map from the defined samples to no samples # get list of domains of the aspect domains = [ d for d in self.aspect.domains if is_(d.primary, self.domain.primary) and is_(d.space, self.space) and is_(d.periods, self.time) and d.samples ] for domain in domains: if domain.modes is None: self.write(domain, self.domain) def _map_across_modes(self): # modes need some additional checks # constraints could be written using parent modes or child modes # these checks avoid writing the same constraint twice # AVOIDS: v_t = v_t,m1 + v_t,m2 + v_t,m; v_t,m = v_t,m1 + v_t,m2 # CORRECT: v_t = v_t,m; v_t,m = v_t,m1 + v_t,m2 OR v_t = v_t,m1 + v_t,m2 if not self.domain.modes: return if not self._special_modes_check( self.domain, self.domain.edit({"modes": None}) ): return if self.domain.modes.parent: self.write(self.domain, self.domain.edit({"modes": None})) else: self.write(self.domain, self.domain.edit({"modes": None}), msum=True) def _check_existing(self, to_domain: Domain, from_domain: Domain) -> bool: """Checks if the map constraint already exists""" what = (to_domain - from_domain)[0] if to_domain not in self.maps[what]: self.maps[what][to_domain] = [from_domain] # make new constraint return False if from_domain in self.maps[what][to_domain]: # There is an existing map return True else: self.maps[what][to_domain].append(from_domain) return True if what in ["samples", "modes"]: return True def _special_modes_check(self, from_domain: Domain, to_domain: Domain) -> None: """ Checks if constraints have been written for child modes or for parent modes """ if to_domain in self.maps["modes"]: if from_domain.modes.parent: if ( from_domain.edit({"modes": from_domain.modes.parent}) in self.maps["modes"][to_domain] ): return False else: for modes in from_domain.modes._: if ( from_domain.edit({"modes": modes}) in self.maps["modes"][to_domain] ): return False return True def _give_cname( self, var, from_domain: Domain, to_domain: Domain, tsum: bool = False, msum: bool = False, ) -> str: """Return canonical map constraint name based on domain relationship and aggregation type.""" if tsum: return f"{var}{from_domain.idxname}_to_{to_domain.idxname}_tmap" if msum: # Original behavior: use from_domain idxname for per-mode naming return f"{var}{from_domain.idxname}_mmap" if from_domain.modes: if from_domain.modes.parent: parent_domain = from_domain.edit({"modes": from_domain.modes.parent}) return f"{var}{parent_domain.idxname}_mmap" return f"{var}{from_domain.idxname}_mmap" return f"{var}{to_domain.idxname}_map"
[docs] def rhs(self, domain: Domain, tsum=False, msum=False): """Gives the sum of the variable over the domain""" varname = ( f"x_{self.aspect.name}" if (msum and self.reporting) else self.aspect.name ) v = getattr(self.program, varname) if tsum: # if the domain has been mapped to but this is a time sum # we need to first map time # and then add it to an existing map at a lower domain return sigma(v(*domain.I), domain.time.i) if msum: # if the domain has been mapped to but this is a mode sum # we need to first map modes # and then add it to an existing map at a lower domain return sigma(v(*domain.I), domain.modes.I) # the copy is important since otherwise, the printing will take # the update index if the variable is mutated return v(*domain.I).copy()
# -------------------------------------------------------------------# # Constraint writing # -------------------------------------------------------------------# def _check_validity(self) -> bool: """Check space and time""" if ( self.space not in self.dispositions or self.time not in self.dispositions[self.space] ): return True def _inform(self, from_domain: Domain): """Inform components of new constraint""" self.aspect.constraints.add(self.cons_name) from_domain.inform_components_of_cons(self.cons_name) def _handshake(self): """Borrow attributes from aspect""" self.model = self.aspect.model # these are spaces contained in location and parent location to which this location belongs # this gives all the dispositions at which the aspect has been defined self.dispositions = self.model.dispositions[self.aspect][self.domain.primary] self.program = self.model.program # this is the disposition of the variable to be mapped # through time and space self.time, self.space = self.domain.periods, self.domain.space def __call__(self, *index: _X): return self.aspect(*index)