"""Data management utilities"""
import numpy as np
import pandas as pd
# from ..solution.result import Result
[docs]
def make_henry_price_df(
file_name: str,
year: int,
stretch: bool = False,
) -> pd.DataFrame:
"""
Makes a DataFrame from data with missing values filled using previous day values.
The costs are converted from $/MMBtu to $/kg using a factor of 1/22.4.
Only works if there is a full year of data (365 days).
If `stretch` is True, the timescale is repeated to expand from days (365) to hours (8760).
Parameters
----------
file_name : :class:`str`
Path to the CSV file containing the data.
year : :class:`int`
Year to import data from.
stretch : :class:`bool`, optional
If True, stretches the timescale from days to hours. Defaults to ``False``.
Returns
-------
:class:`pandas.DataFrame`
DataFrame containing varying natural gas prices with missing values filled.
"""
df = pd.read_csv(file_name, skiprows=5, names=["date", "CH4"])
df[["month", "day", "year"]] = df["date"].str.split("/", expand=True)
df = df[df["year"] == str(year)].astype({"month": int, "day": int, "year": int})
# , format='%d%b%Y:%H:%M:%S.%f')
df["date"] = pd.to_datetime(df["date"])
df["doy"] = df["date"].dt.dayofyear
df = df.sort_values(by=["doy"])
df = df.drop(columns="date").dropna(axis="rows")
doy_list = list(df["doy"])
# fixes values for weekends and holidays to last active day
for i in np.arange(1, 366):
if i not in doy_list:
if i == 1: # onetime fix if first day has no value, takes value from day 2
df = pd.concat(
[
df,
pd.DataFrame.from_records(
[
{
"CH4": df["CH4"][df["doy"] == 2].values[0],
"month": 1,
"day": 1,
"year": year,
"doy": 1,
},
],
),
],
)
else:
df = pd.concat(
[
df,
pd.DataFrame.from_records(
[
{
"CH4": df["CH4"][df["doy"] == i - 1].values[0],
"month": df["month"][df["doy"] == i - 1].values[0],
"day": df["day"][df["doy"] == i - 1].values[0],
"year": df["year"][df["doy"] == i - 1].values[0],
"doy": i,
},
],
),
],
)
df = df.sort_values(by=["doy"])
df = df.reset_index(drop=True)
df["CH4"] = df["CH4"] / 22.4 # convert from $/MMBtu to $/kg
df = df[["CH4", "doy"]].rename(columns={"doy": "day"})
if stretch is False:
df = df
df["scales"] = [(0, int(i - 1)) for i in df["day"]]
else:
df = df.loc[df.index.repeat(24)].reset_index(drop=True)
df["hour"] = [int(i) for i in range(0, 24)] * 365
df["scales"] = [(0, int(i - 1), int(j)) for i, j in zip(df["day"], df["hour"])]
df = df[["CH4", "scales"]]
return df
[docs]
def remove_outliers(
data: pd.DataFrame,
sd_cuttoff: int = 2,
mean_range: int = 1,
) -> pd.DataFrame:
"""
Removes outliers up to a chosen number of standard deviations.
Outliers are replaced with the mean of data points on both sides of the point.
Parameters
----------
data : :class:`pandas.DataFrame`
Input data.
sd_cutoff : :class:`int`, optional
Remove data points that are beyond this many standard deviations. Defaults to ``2``.
mean_range : :class:`int`, optional
Number of neighboring data points on each side to average over when replacing outliers. Defaults to ``1``.
Returns
-------
:class:`pandas.DataFrame`
DataFrame with outliers replaced by local means.
"""
data_mean, data_std = data.mean(), data.std()
# identify outliers
cut_off = data_std * sd_cuttoff
lower, upper = data_mean - cut_off, data_mean + cut_off
for i in range(len(data)):
x = data.iloc[i].values[0]
if x < float(lower.iloc[0]) or x > float(upper.iloc[0]):
data.iloc[i] = (
(
sum(data.iloc[i - (j + 1)] for j in range(mean_range))
+ sum(data.iloc[i + (j + 1)] for j in range(mean_range))
)
/ 2
* mean_range
)
return data
# def get_data(file_name: str) -> dict:
# """gets data from energia json database
# Args:
# file_name (str): name of file
# Returns:
# dict: dictionary with data
# """
# with open(file_name + '.json') as f:
# data_ = json.load(f)
# f.close()
# return data_
# def dump_data(data: dict, file_name: str):
# """dump data in any of the following formats: .json, .txt, .pkl
# Args:
# data (dict): dictionary with cost trajectories for all processes
# file_name (str): name of output file with format, e.g. file_name.pkl
# """
# if '.pkl' in file_name:
# with open(file_name, "wb") as f:
# pickle.dump(data, f)
# f.close()
# elif '.json' in file_name:
# with open(file_name, 'w') as f:
# json.dump(data, f)
# elif '.txt' in file_name:
# with open(file_name, "w") as f:
# f.write(str(data))
# f.close()
# return
# def make_conversion_dict(file_name: str) -> dict:
# """updates conversion.json conversion values by process
# Returns:
# dict: dictionary with conversion values
# """
# conversion_dict_ = (
# pandas.read_csv(file_name, index_col=0)
# .dropna(axis='rows')
# .transpose()
# .to_dict()
# )
# dump_data(conversion_dict_, 'conversion.json')
# return conversion_dict_
# def make_material_dict(file_name: str) -> dict:
# """updates infra_mat.json which contains infrastructaral material needs by facility
# Args:
# file_name (str):name of file
# Returns:
# dict: dictionary with infrastructaral material needs
# """
# material_dict_ = (
# pandas.read_csv(file_name, index_col=0)
# .dropna(axis='rows')
# .transpose()
# .to_dict()
# )
# dump_data(material_dict_, 'material.json')
# return material_dict_
# def make_cost_dict(
# location_list: list, cost_scenario_list: list, process_list: list, year_list: list
# ) -> dict:
# """intializes an empty dictionary for cost data for all processes
# Args:
# location_list (list): list of locations
# cost_scenario_list (list): list of scenarios
# process_list (list): list of processes
# year_list (list): list of years
# Returns:
# dict: dictionary with costs
# """
# cost_metrics_list = ['CAPEX', 'Fixed O&M', 'Variable O&M', 'units', 'source']
# cost_dict = {
# location_.name: {
# cost_scenario.name: {
# process_.name: {
# year_: {cost_metric_: {} for cost_metric_ in cost_metrics_list}
# for year_ in year_list
# }
# for process_ in process_list
# }
# for cost_scenario in cost_scenario_list
# }
# for location_ in location_list
# }
# return cost_dict
# def make_f_purchase(
# location_list: list,
# day_list: list,
# hour_list: list,
# resource_list: list,
# varying_resource_df: pandas.DataFrame,
# ) -> dict:
# """makes a dictionary for varying resource costs.
# minimum resolution hour|
# Args:
# location_list (list): list of locations
# day_list (list): list of days/seasons
# hour_list (list): list of hours
# process_list (list): list of processes
# varying_resource_df (pandas.DataFrame): dataframe with varying resource costs
# Returns:
# dict: dictionary containing hourly conversion factors for all processes
# """
# f_purchase_dict_ = {
# location.name: {
# resource.name: {day: {hour: {} for hour in hour_list} for day in day_list}
# for resource in resource_list
# }
# for location in location_list
# }
# for location, resource, day, hour in product(
# location_list, resource_list, day_list, hour_list
# ):
# if resource.name in varying_resource_df:
# f_purchase_dict_[location.name][resource.name][day][
# hour
# ] = varying_resource_df[varying_resource_df['day'] == day][
# resource.name
# ].values[
# 0
# ] # use day of the year (doy)
# else:
# f_purchase_dict_[location.name][resource.name][day][hour] = 1
# # varying_resource_df2_ = pandas.DataFrame(columns = ['day', 'hour', var])
# # for day, hour in product(varying_resource_df['day'], hour_list):
# # varying_resource_df2_ = varying_resource_df2_.append({'day': day, 'hour': hour, var: varying_resource_df[var][varying_resource_df['day'] == day].values[0] }, ignore_index = True)
# # , varying_resource_df2_
# return f_purchase_dict_
# # def make_nrel_cost_df(location: Location, nrel_cost_xlsx, pick_nrel_process_list: list, year_list: list, case: str, crpyears: float) -> pandas.DataFrame:
# # """makes dataframe for nrel atb data
# # processes should be specified based on NREL technology tags
# # classes for technology will be picked based on location
# # list of cost metrics ('CAPEX', 'Fixed O&M', 'Variable O&M')
# # Args:
# # location (location): location object
# # nrel_cost_xlsx (.xlsx): excel file from NREL ATB
# # pick_nrel_process_list (list): list of nrel defined processes
# # year_list (list): list of years
# # case (str): Market or Research case
# # crpyears (float): cost recovery period
# # Returns:
# # pandas.DataFrame: with nrel costing data for the specified year
# # """
# # cost_metrics_list = ['CAPEX', 'Fixed O&M', 'Variable O&M', 'units']
# # # import nrel atb cost dataset
# # nrel_cost_df_ = pandas.read_excel(nrel_cost_xlsx, sheet_name='cost')
# # nrel_cost_df_ = nrel_cost_df_[nrel_cost_df_['technology'].isin(
# # pick_nrel_process_list)] # choose technologies
# # nrel_cost_df_ = nrel_cost_df_[nrel_cost_df_['core_metric_parameter'].isin(
# # cost_metrics_list)] # choose costing data to import
# # nrel_cost_df_ = nrel_cost_df_[nrel_cost_df_[
# # 'core_metric_case'].isin([case])] # choose market case
# # nrel_cost_df_ = nrel_cost_df_[nrel_cost_df_['crpyears'].isin(
# # [crpyears])] # choose cost recovery period of 20 year
# # nrel_cost_df_['technology'].replace({'LandbasedWind': 'WF', 'UtilityPV': 'PV', 'Utility-Scale Battery Storage': 'LiI_c',
# # 'Pumped Storage Hydropower': 'PSH_c'}, inplace=True) # replace names to process IDS
# # nrel_cost_df_ = nrel_cost_df_[((nrel_cost_df_['technology'] == 'PV') & (nrel_cost_df_['techdetail'] == location.PV_class)) | # class 5 solar PV\
# # ((nrel_cost_df_['technology'] == 'WF') & (nrel_cost_df_[
# # 'techdetail'] == location.WF_class)) # class 4 wind farms\
# # | ((nrel_cost_df_['technology'] == 'LiI_c') & (nrel_cost_df_['techdetail'] == location.LiI_class)) # 8hr battery cycle LiI\
# # | ((nrel_cost_df_['technology'] == 'PSH_c') & (nrel_cost_df_['techdetail'] == location.PSH_class))] # class 3 PSH
# # year_list = [year_ + 2021 for year_ in year_list]
# # nrel_cost_df_ = nrel_cost_df_[nrel_cost_df_['core_metric_variable'].isin(
# # year_list)] # get data for years in years_list
# # nrel_cost_df_ = nrel_cost_df_.drop(columns=['index', 'revision', 'atb_year', 'core_metric_key',
# # 'core_metric_case', 'crpyears', 'techdetail']) # drop unnecessary columns
# # nrel_cost_df_.columns = [
# # 'metric', 'process', 'cost_scenario_list', 'year', 'units', 'cost'] # rename columns
# # nrel_cost_df_ = nrel_cost_df_.reset_index(drop=True) # reset index
# # # change years to int list 0...
# # nrel_cost_df_['year'] = nrel_cost_df_['year'] - 2021
# # nrel_cost_df_['cost_scenario_list'] = nrel_cost_df_[
# # 'cost_scenario_list'].str.lower()
# # # bring all units to $/MW
# # nrel_cost_df_.loc[nrel_cost_df_.units == '$/KW-yr', ['cost', 'units']] = 1000 * \
# # nrel_cost_df_.loc[nrel_cost_df_.units ==
# # '$/KW-yr']['cost'].values[0], '$/MW'
# # nrel_cost_df_.loc[nrel_cost_df_.units == '$/kW', ['cost', 'units']] = 1000 * \
# # nrel_cost_df_.loc[nrel_cost_df_.units ==
# # '$/kW']['cost'].values[0], '$/MW'
# # nrel_cost_df_.loc[nrel_cost_df_.units == '$/MWh', ['cost', 'units']] = 8760 * \
# # nrel_cost_df_.loc[nrel_cost_df_.units ==
# # '$/MWh']['cost'].values[0], '$/MW'
# # # nrel_cost_df_['cost'][nrel_cost_df_['units']
# # # == '$/KW-yr'] = nrel_cost_df_['cost'][nrel_cost_df_['units'] == '$/KW-yr']*1000
# # # nrel_cost_df_['units'][nrel_cost_df_['units'] == '$/KW-yr'] = '$/MW'
# # # nrel_cost_df_['cost'][nrel_cost_df_['units']
# # # == '$/kW'] = nrel_cost_df_['cost'][nrel_cost_df_['units'] == '$/kW']*1000
# # # nrel_cost_df_['units'][nrel_cost_df_['units'] == '$/kW'] = '$/MW'
# # # nrel_cost_df_['cost'][nrel_cost_df_['units']
# # # == '$/MWh'] = nrel_cost_df_['cost'][nrel_cost_df_['units'] == '$/MWh']*8760
# # # nrel_cost_df_['units'][nrel_cost_df_['units'] == '$/MWh'] = '$/MW'
# # return nrel_cost_df_
# def load_results(filename: str) -> Result:
# """loads saved results
# Args:
# filename (str): file name
# Returns:
# Result: a energiapy result type object
# """
# file_ = open(filename, 'rb')
# results_dict = pickle.load(file_)
# if results_dict['output']['termination'] != 'optimal':
# print('WARNING: Loading non-optimal results')
# return Result(
# name=filename.split('.')[0],
# output=results_dict['output'],
# components=results_dict['components'],
# duals=results_dict['duals'],
# model_elements=results_dict['model_elements'],
# )
# def calculate_hourly(
# data: pandas.DataFrame, column_name: str, what: str = 'mean'
# ) -> pandas.DataFrame:
# """Finds the mean, min, max for each hour of the year for multi-year data
# Args:
# data (pandas.DataFrame): Timeseries data with datetime index
# column_name (str): name of value column
# what (str, optional): 'mean', 'max', 'min'. Defaults to 'mean'.
# Returns:
# pandas.DataFrame: Output
# """
# results = []
# data_input = copy.deepcopy(data)
# data_input['datetime'] = data_input.index
# data_input['month'] = data_input['datetime'].dt.month
# data_input['day'] = data_input['datetime'].dt.day
# data_input['hour'] = data_input['datetime'].dt.hour
# for month in range(1, 13): # 12 months
# for day in range(1, 32): # 31 days (adjust if needed)
# for hour in range(24): # 24 hours
# target_data_input = data_input[
# (data_input['month'] == month)
# & (data_input['day'] == day)
# & (data_input['hour'] == hour)
# ]
# if not target_data_input.empty:
# if what == 'max':
# max_value = target_data_input[column_name].max()
# results.append(
# {
# 'Date': f"{month}/{day}",
# 'Hour': hour,
# 'Highest Value': max_value,
# }
# )
# if what == 'min':
# min_value = target_data_input[column_name].min()
# results.append(
# {
# 'Date': f"{month}/{day}",
# 'Hour': hour,
# 'Highest Value': min_value,
# }
# )
# if what == 'mean':
# mean_value = target_data_input[column_name].mean()
# results.append(
# {
# 'Date': f"{month}/{day}",
# 'Hour': hour,
# 'Highest Value': mean_value,
# }
# )
# results = pandas.DataFrame(results)
# results = results.drop(columns=['Date', 'Hour'])
# return results