"""
This module contains code for the demand_acep. More documentation to come.
This is a test docstring for the whole module
"""
# %% Imports
import os
import pandas as pd
import numpy as np
import pdb
import xarray as xr
# import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.interpolate import UnivariateSpline
from itertools import groupby
from operator import itemgetter
# %%
[docs]def data_resample(df, sample_time='1T'):
"""
This function downsamples a sample-time indexed pandas dataframe containing measurement channel values based
on the sample time supplied. It uses the mean of the values within the resolution interval. It uses the pandas
dataframe module `df.resample`
Parameters
----------
df :
`df` is a sample-time indexed pandas dataframe containing measurement values from the different channels of
each meter.
sample_time :
`sample_time` determines the desired resolution of the downsampled data. For 1 minute - 1T, 1 hour - 1H,
1 month - 1M, 1 Day - 1D etc. The default chosen here is 1 minute.
Returns
-------
Dataframe
Resampled-time indexed pandas dataframe containing downsampled measurement values from the given dataframe.
"""
# Data is downsampled using the mean of the values within the interval of the sample time provided.
# The mean is used because it provided the average/expected value of the measurement within that time range.
df_resampled = df.resample(sample_time, closed="left", label="right").mean()
return df_resampled
[docs]def build_interpolation(y_values, n_val):
"""
This function takes performs the actual 1-d interpolation. If the number of consecutive missing points is less
than 3, a linear interpolation is used, else, a cubic interpolation is used.
Parameters
----------
y_values :
`y_values` are the values on which the function interpolation is built, that is, y_values = f(x).
n_val :
`n_val` is the number of consecutive missing points that needs to be filled.
Returns
-------
y_interp
Array of interpolated values equal in length to the missing supplied length (n_val) of missing data points..
"""
# removes the NaN values so as not to skew the performance of the scipy interp1d function.
y_values = y_values.dropna()
x = np.linspace(1, len(y_values), num=len(y_values))
# x = np.reshape(x, y_values.shape)
y_values = np.asarray(y_values).squeeze()
# pdb.set_trace()
# if-else uses a linear interpolation when the number of consecutive missing data points is less than 3.
# The number of points to be interpolated has to be greater than 3 points to use the spline/Cubic interpolation
if len(y_values) <= 3:
f = interp1d(x, y_values, kind='linear')
else:
f = interp1d(x, y_values, kind='cubic')
x_interp = np.linspace(1, len(y_values), num=n_val)
y_interp = f(x_interp)
return y_interp
[docs]def compute_interpolation(df):
"""
This function imputes missing measurement data (Nan) in a series using 1-d interpolation.
Parameters
----------
df :
`df` is a series containing missing measurements values.
Returns
-------
Series
Filled pandas series with no missing values.
"""
# creates a deep copy of the Series received
test_df = df.copy()
# gets the index location in integers where the NaNs are located
get_nan_idx = np.where(test_df.isna())[0]
idx_grp_nan = []
# creates a list of consecutive index locations to determine the range of interpolation
for k, g in groupby(enumerate(get_nan_idx), lambda ix: ix[0] - ix[1]):
idx_grp_nan.append(list(map(itemgetter(1), g)))
# performs interpolation for each consecutive NaN index location
for idx, val in enumerate(idx_grp_nan):
n_grp = len(val)
# for each range of consecutive NaN locations, use data points of length equal to the number that NaNs in that
# range before and after the NaN data points
prev_idx = val[0] - n_grp
next_idx = val[-1] + n_grp
# This if-else clause handles edge cases
# If - When the number of consecutive NaN points is larger than the number of available data points before it in
# the dataframe.
# Elif - When the number of consecutive NaN points is larger than the number of available data points after it
# in the dataframe.
if prev_idx < 0:
prev_vals = test_df.iloc[0:val[0]]
next_vals = test_df.iloc[val[0] + 1: next_idx + 1]
elif next_idx > len(test_df):
prev_vals = test_df.iloc[prev_idx:val[0]]
next_vals = test_df.iloc[val[0] + 1: len(test_df)]
else:
prev_vals = test_df.iloc[prev_idx:val[0]]
next_vals = test_df.iloc[val[0] + 1: next_idx + 1]
y_values = prev_vals.append(next_vals)
y_interp = build_interpolation(y_values, n_grp)
test_df.iloc[val] = y_interp
return test_df
[docs]def data_impute(impute_df):
"""
This function imputes missing measurement in a dataframe using a 1-d interpolation. If the number of consecutive
missing points is less than 3, a linear interpolation is used, else, a cubic interpolation is used.
Parameters
----------
impute_df :
`impute_df` can either be a dataframe of a dictionary of dataframes containing missing measurements values.
Returns
-------
Dataframe
Filled pandas dataframe with no missing values.
"""
# if-else checks if the input is a dataframe or list of dataframes.
if isinstance(impute_df, dict):
for meter in impute_df:
if impute_df[meter].isnull().values.any():
impute_df[meter] = impute_df[meter].apply(compute_interpolation)
else:
if impute_df.isnull().values.any():
impute_df = impute_df.apply(compute_interpolation)
return impute_df
[docs]def long_missing_data_prep(dirpath, filename):
"""
This function prepares a dataset in a `csv` format with missing days, months or years for interpolation
using the `data_impute` function. It fills in the missing time as a 'DateTimeIndex' and assigns a value
of NaN to the missing data points.
Parameters
----------
dirpath :
`dirpath` is the directory path location of the csv file containing the missing data points in already
down-sampled to a 1-Minute interval.
filename :
`filename` is the csv file containing the missing data points to be read.
Returns
-------
Dataframe
pandas dataframe with 'DateTimeIndex' and value of NaN assigned to the missing data points.
"""
# Reads dataset into a Pandas Dataframe
meter_df = pd.read_csv(os.path.join(dirpath, filename))
# Converts the time column to a DateTimeIndex format
meter_df.set_index(pd.to_datetime(meter_df['time']), inplace=True)
st_time = meter_df['time'].iloc[0]
sp_time = meter_df['time'].iloc[-1]
# Generates a date time range at an interval of 1 minute from the start and end date of the dataset.
date_idx = pd.date_range(start=st_time, end=sp_time, freq='1T')
meter_df.drop(columns=['time'], inplace=True)
# Reindex the dataframe to include the DateTimeIndex for the missing days and NaN values for missing points
meter_mod_df = meter_df.reindex(date_idx)
return meter_mod_df