Source code for robust

import math
import cmath
from copy import copy

import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats
import mpmath

class HAC_robust_conf_int:
    """
    Base class implementing the heteroskedasticity and autocorrelation
    consistent (HAC) robust confidence intervals of Hwang and Vogelsang (2023)
    for autocorrelation functions of (covariance) stationary time series.

    The Bartlett formula, on which widely used statistical packages (Stata,
    SAS, R, Matlab, etc.) rely for inference about the autocorrelation
    function, is no longer valid once the assumption of i.i.d. innovations is
    relaxed. In contrast, the method implemented here is robust in three ways:
    innovations may be weak white noise (relaxing the i.i.d. assumption),
    innovations may have asymmetric distributions, and inference does not
    require a specific model of serial correlation.

    This class and its methods produce an autocorrelogram with the estimated
    autocorrelation function and the valid confidence intervals (and
    confidence bands) of Hwang and Vogelsang (2023) across multiple lags, so
    that practitioners can easily obtain valid inference in empirical time
    series analysis.

    Parameters
    ----------
    data : array-like
        The input time series data.
    lag : int
        The maximum lag to be considered for the autocorrelation function.
    null_imp : bool, default=True
        If True, the null-imposed variance estimator is used for inference.
        If False, the null-not-imposed variance estimator is used. For the
        distinction, see Lazarus, Lewis, Stock and Watson (2018).
    method : {"fixedb", "OS", "normal"}, default="fixedb"
        The asymptotics and corresponding critical values used for estimation.
    bandwidth : {"SPJ", "AD"}, default="SPJ"
        The data-dependent bandwidth selection method used in the estimation.
        "SPJ" is the testing-optimal selection of Sun, Phillips and Jin
        (2008); "AD" is the MSE-optimal method of Andrews (1991).
    time_trend : bool, default=False
        If True, a time trend is included in the estimating equation. To
        obtain the bandwidth by the data-dependent selection methods, the
        Frisch-Waugh-Lovell theorem is used to partial out the time trend, and
        the residuals are used for the bandwidth calculation instead of the
        full regression. If False, the usual regression is used for the
        estimating equation.
    diff : bool, default=False
        If True, the time series is first-differenced and the differenced
        series is used for the autocorrelogram and all output.
    alpha : float, default=0.05
        Significance level for the confidence intervals and confidence bands.

    Note
    ----
    The provided options may need further refinement.
    """
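    # Usage sketch (a minimal example, not part of the formal API; assumes a
    # 1-D stationary series `y`, e.g. simulated noise):
    #
    #     import numpy as np
    #     y = np.random.default_rng(0).standard_normal(500)
    #     ci = HAC_robust_conf_int(y, lag=10, method="fixedb", bandwidth="SPJ")
    #     rho = ci.get_estimated_acf()
    #     lo, hi = ci.get_confidence_interval()
    #     fig = ci.plot_acf(CI_HAC=True, CB_bart=True)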
    def __init__(self, data, lag, null_imp=True, method='fixedb',
                 bandwidth="SPJ", time_trend=False, diff=False, alpha=0.05):
        self.data = np.copy(np.asarray(data).reshape(-1, 1))
        self.lag = lag
        self.null_imp = null_imp
        self.method = method
        self.bandwidth = bandwidth
        self.time_trend = time_trend
        self.diff = diff
        self.alpha = alpha
        self.rho_est_values = []
        self.cv_values = []
        self.var_unres_values = []
        self._calculate_estimations()
    def _calculate_estimations(self):
        if self.diff:
            self.data = np.diff(self.data, axis=0)
        k_values = range(1, self.lag + 1)   # lags 1 to lag
        self.rho_est_values = []
        if self.null_imp:
            self.lower_values = []
            self.upper_values = []
            self.index_values = []
            self.cv_values = []
            self.var_res_under_zero_values = []
            for k in k_values:
                # Estimate rho_k and build the scores used for bandwidth selection.
                if self.time_trend:
                    rho_est, w_t, res_y_t_k = regression_residuals(data=self.data, k=k)
                    vhat_opt3 = np.copy(res_y_t_k * w_t)
                else:
                    y, x, nmp, X, coeffs = reg_estimating_equation_v1(data=self.data, lag=k)
                    uhat_opt3 = y - np.dot(X, coeffs)   # null is not imposed
                    vhat_opt3 = np.copy(X * uhat_opt3)
                    rho_est = coeffs[1, 0]   # slope coefficient, as a scalar

                # Data-dependent bandwidth.
                if self.method == "OS":
                    # Orthonormal-series critical values are not implemented here.
                    raise NotImplementedError("method='OS' is not implemented yet.")
                if self.bandwidth == "SPJ":
                    spj_function = create_spj_function(w=10)
                    M_n_band = spj_function(vhat_opt3)
                elif self.bandwidth == "AD":
                    M_n_band = AD_band(vhat_opt3)

                # Q2 and the kernel-weighted Omega terms.
                Q_2 = compute_Q2(self.data, k)
                Tnum1 = len(self.data) - k
                omegas_res_dm = compute_omegas_vectorized_demean(
                    self.data, M_n_band, parzen_k_function_vectorized, k)

                # Fixed-b critical value.
                fixedb_coeffs = [0.43754, 0.11912, 0.08640, 0.49629, -0.57879,
                                 0.43266, 0.02543, -0.02379, -0.02376]
                fixed_cv_res = calc_fixed_cv_org_alpha(fixed_b=M_n_band, Tnum=Tnum1,
                                                       fixedb_coeffs=fixedb_coeffs,
                                                       alpha=self.alpha)
                if self.method == "fixedb":
                    cv_real = fixed_cv_res
                elif self.method == "normal":
                    cv_real = stats.norm.ppf(1 - self.alpha / 2)

                # Invert the test: solve the quadratic in the null value.
                c_coef = compute_c_coefficients(omegas=omegas_res_dm, cv=cv_real,
                                                y=self.data, k_val=k,
                                                rho_k_til=rho_est, Q_2=Q_2)
                roots = quadratic_roots(c_coef[0], c_coef[1], c_coef[2])

                is_first_root_complex = roots[0].imag != 0
                is_second_root_complex = roots[1].imag != 0
                if is_first_root_complex or is_second_root_complex:
                    index = 2 if c_coef[0] < 0 else 3
                    if index == 3:
                        raise ValueError(
                            "The quadratic opens upward with its vertex above "
                            "zero, so no null value is rejected; this case "
                            "should be impossible.")
                    upper = 1
                    lower = -1
                else:
                    index = 0 if c_coef[0] > 0 else 1
                    lower = min(roots[0].real, roots[1].real)
                    upper = max(roots[0].real, roots[1].real)

                # Append the results for this lag.
                self.rho_est_values.append(np.float64(rho_est))
                self.lower_values.append(np.float64(lower))
                self.upper_values.append(np.float64(upper))
                self.index_values.append(index)
                var_res_under_zero11 = self._calculate_var_res_under_zero(k)
                self.var_res_under_zero_values.append(np.float64(var_res_under_zero11))
                self.cv_values.append(np.float64(cv_real))
        else:
            self.cv_values = []
            self.var_unres_values = []
            for k in k_values:
                if self.time_trend:
                    rho_est, w_t, res_y_t_k = regression_residuals(data=self.data, k=k)
                    vhat_opt1 = np.copy(res_y_t_k * w_t)
                    X = np.copy(res_y_t_k)
                else:
                    y, x, nmp, X, coeffs = reg_estimating_equation_v1(data=self.data, lag=k)
                    uhat_opt1 = y - np.dot(X, coeffs)   # null is not imposed
                    vhat_opt1 = np.copy(X * uhat_opt1)
                    rho_est = coeffs[1, 0]   # slope coefficient, as a scalar

                if self.method == "OS":
                    raise NotImplementedError("method='OS' is not implemented yet.")
                if self.bandwidth == "SPJ":
                    spj_function = create_spj_function(w=10)
                    M_n_band = spj_function(vhat_opt1)
                elif self.bandwidth == "AD":
                    M_n_band = AD_band(vhat_opt1)

                fixedb_coeffs = [0.43754, 0.11912, 0.08640, 0.49629, -0.57879,
                                 0.43266, 0.02543, -0.02379, -0.02376]
                fixed_cv, var_unres = process_var_fixedbcv_alpha(
                    vhat=vhat_opt1, M_n=M_n_band, X=X, nmp=len(X),
                    fixedb_coeffs=fixedb_coeffs, alpha=self.alpha)
                if not self.time_trend:
                    var_unres = np.copy(var_unres[1, 1])

                if self.method == "fixedb":
                    cv_real = fixed_cv
                elif self.method == "normal":
                    cv_real = stats.norm.ppf(1 - self.alpha / 2)

                self.rho_est_values.append(np.float64(rho_est))
                self.cv_values.append(np.float64(cv_real))
                self.var_unres_values.append(np.float64(var_unres))

    def _calculate_var_res_under_zero(self, k):
        """
        Calculate the null-imposed variance estimator for a given lag.

        Parameters:
        - k (int): The lag order for the regression estimating equation.

        Returns:
        - var_res_under_zero11 (float): The (1, 1) element of the variance
          estimate under the imposed null.
        """
        y_z, x_z, nmp_z, X_z, coeffs_z = reg_estimating_equation_v1(data=self.data, lag=k)
        # Residuals without imposing the null, used only for bandwidth selection.
        uhat_opt_band = y_z - np.dot(X_z, coeffs_z)
        vhat_opt_band = np.copy(X_z * uhat_opt_band)
        # Scores with the null rho_k = 0 imposed.
        null_val = 0
        uhat_z = (y_z - np.mean(y_z)) - null_val * (x_z - np.mean(x_z))
        vhat_z = np.copy(X_z * uhat_z)
        vhat_z = vhat_z - vhat_z.mean(axis=0)   # Stock-Watson demeaning

        if self.bandwidth == "SPJ":
            spj_function = create_spj_function(w=10)
            M_n_z = spj_function(vhat_opt_band)
        elif self.bandwidth == "AD":
            M_n_z = AD_band(vhat_opt_band)

        fixedb_coeffs = [0.43754, 0.11912, 0.08640, 0.49629, -0.57879,
                         0.43266, 0.02543, -0.02379, -0.02376]
        _, var_res_under_zero = process_var_fixedbcv_alpha(
            vhat_z, M_n_z, X_z, len(X_z), fixedb_coeffs, self.alpha)
        var_res_under_zero11 = var_res_under_zero[1, 1]
        return var_res_under_zero11
    def plot_acf(self, CI_HAC=True, CB_HAC=False, CB_stata=False, CB_bart=False,
                 title="Autocorrelogram with HAC robust CI",
                 save_as_pdf=False, filename="autocorrelogram.pdf"):
        """
        Plot the autocorrelation function (ACF).

        Parameters
        ----------
        CI_HAC : bool, default=True
            Whether to plot HAC robust confidence intervals.
        CB_HAC : bool, default=False
            Whether to plot HAC robust confidence bands.
        CB_stata : bool, default=False
            Whether to plot the Stata confidence bands.
        CB_bart : bool, default=False
            Whether to plot the Bartlett confidence bands.
        title : str, default='Autocorrelogram with HAC robust CI'
            Title for the plot.
        save_as_pdf : bool, default=False
            If True, save the plot as a PDF; otherwise display it.
        filename : str, default='autocorrelogram.pdf'
            Filename for saving the plot if save_as_pdf is True.

        Returns
        -------
        fig : matplotlib.figure.Figure
            The Matplotlib figure object containing the ACF plot.

        Notes
        -----
        This method plots the autocorrelogram with confidence intervals and
        confidence bands.
        """
        if self.null_imp:
            return self._plot_acf_null_imp(CI_HAC, CB_HAC, CB_stata, CB_bart,
                                           title, save_as_pdf, filename)
        else:
            return self._plot_acf_null_not_imp(CI_HAC, CB_HAC, CB_stata, CB_bart,
                                               title, save_as_pdf, filename)
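    # Plotting sketch (hypothetical filename; assumes `ci` is an instance):
    #
    #     ci.plot_acf(CI_HAC=True, CB_stata=True)                # on screen
    #     ci.plot_acf(save_as_pdf=True, filename="my_acf.pdf")   # to PDF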
    def _stacked_autocorrelation(self, k):
        data = np.copy(self.data)
        if self.time_trend:
            t = np.arange(1, len(data) + 1).reshape(-1, 1)
            residuals_data = partial_regression(np.copy(data), t, coef=False, cons=True)
            data = np.copy(residuals_data)
        data = data.flatten()   # reshape the data to 1-D if it is 2-D
        n = len(data)
        mean_val = np.mean(data)
        acf_values = []
        for lag in range(1, k + 1):   # lags 1 to k
            numerator = np.sum((data[lag:] - mean_val) * (data[:-lag] - mean_val))
            denominator = np.sum((data - mean_val) ** 2)
            acf = numerator / denominator
            acf_values.append(acf)
        return acf_values

    def _calculate_var_stata(self):
        # Bartlett-type variance (1 + 2 * sum_{j<k} rho_j^2) / n, using the
        # estimating-equation ACF values.
        var_stata = []
        n = len(self.data)
        for k in range(1, len(self.rho_est_values) + 1):
            if k == 1:
                var_stata.append(1 / n)
            else:
                var_stata.append(
                    (1 + 2 * sum(rho ** 2 for rho in self.rho_est_values[:k - 1])) / n)
        return var_stata

    def _calculate_var_stata_ori(self):
        # Same formula as above, but using the sample ACF (as Stata does).
        var_stata = []
        n = len(self.data)
        stacked_acf = self._stacked_autocorrelation(len(self.rho_est_values))
        for k in range(1, len(self.rho_est_values) + 1):
            if k == 1:
                var_stata.append(1 / n)
            else:
                var_stata.append(
                    (1 + 2 * sum(rho ** 2 for rho in stacked_acf[:k - 1])) / n)
        return var_stata

    def _plot_acf_null_imp(self, CI_HAC, CB_HAC, CB_stata, CB_bart, title,
                           save_as_pdf, filename):
        lags = range(1, len(self.rho_est_values) + 1)
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(lags, self.rho_est_values, 'o', color='blue',
                label='Estimated Autocorrelation')
        ax.axhline(0, color='black', lw=0.5)

        if CI_HAC and self.lower_values and self.upper_values and self.index_values:
            for lag, v, lb, ub, index in zip(lags, self.rho_est_values,
                                             self.lower_values, self.upper_values,
                                             self.index_values):
                lb = min(max(lb, -1), 1)
                ub = max(min(ub, 1), -1)
                bracket_length = 0.2
                if index == 0 or index == 2:
                    ax.plot([lag, lag], [ub, lb], color='gray', lw=0.8)
                    ax.plot([lag - bracket_length, lag + bracket_length], [ub, ub],
                            color='black', lw=1.1)
                    ax.plot([lag - bracket_length, lag + bracket_length], [lb, lb],
                            color='black', lw=1.1,
                            label='Confidence Interval - HAC robust' if lag == lags[0] else "")
                elif index == 1:
                    # Opens downward with real roots: the confidence set is the
                    # union [-1, lb] and [ub, 1].
                    ax.plot([lag, lag], [-1, lb], color='gray', lw=0.8)
                    ax.plot([lag, lag], [ub, 1], color='gray', lw=0.8)
                    ax.plot([lag - bracket_length, lag + bracket_length], [-1, -1],
                            color='black', lw=1.1)
                    ax.plot([lag - bracket_length, lag + bracket_length], [1, 1],
                            color='black', lw=1.1)
                    ax.plot([lag - bracket_length, lag + bracket_length], [lb, lb],
                            color='black', lw=1.1)
                    ax.plot([lag - bracket_length, lag + bracket_length], [ub, ub],
                            color='black', lw=1.1,
                            label='Confidence Interval - HAC robust' if lag == lags[0] else "")
                elif index == 3:
                    raise ValueError(
                        "The quadratic opens upward with its vertex above zero, "
                        "so no null value is rejected; this case should be "
                        "impossible.")

        if CB_HAC and self.cv_values and self.var_res_under_zero_values:
            upper_bounds = [min(1.00, 0 + fc * np.sqrt(vu))
                            for fc, vu in zip(self.cv_values, self.var_res_under_zero_values)]
            lower_bounds = [max(-1.00, 0 - fc * np.sqrt(vu))
                            for fc, vu in zip(self.cv_values, self.var_res_under_zero_values)]
            ax.plot(lags, upper_bounds, color='green', linestyle='dashed', lw=0.99)
            ax.plot(lags, lower_bounds, color='green', linestyle='dashed', lw=0.99,
                    label='Confidence Band - HAC robust')

        if CB_stata:
            ax.plot(lags, self._stacked_autocorrelation(len(self.rho_est_values)),
                    'o', color='red', markerfacecolor="None",
                    label='Sample Autocorrelation')
            var_stata = self._calculate_var_stata_ori()
            upper_bound = [0 + stats.norm.ppf(1 - self.alpha / 2) * np.sqrt(var)
                           for var in var_stata]
            lower_bound = [0 - stats.norm.ppf(1 - self.alpha / 2) * np.sqrt(var)
                           for var in var_stata]
            ax.fill_between(lags, lower_bound, upper_bound, color='grey', alpha=0.2,
                            label='Confidence Band - Stata Formula')

        if CB_bart and len(self.data):
            if not CB_stata:
                ax.plot(lags, self._stacked_autocorrelation(len(self.rho_est_values)),
                        'o', color='red', markerfacecolor="None",
                        label='Sample Autocorrelation')
            conf_value = 1.96 / np.sqrt(len(self.data))
            ax.axhline(conf_value, color='red', linestyle='dashed', lw=0.8)
            ax.axhline(-conf_value, color='red', linestyle='dashed', lw=0.8,
                       label='Confidence Band - Bartlett Formula')

        ax.set_xlabel('$k$ (Lag)')
        ax.set_ylabel('Estimated Autocorrelation')
        ax.set_title(title)
        ax.legend(fontsize=8, loc='upper right', framealpha=0.25)
        ax.set_xlim([0, len(self.rho_est_values) + 1])
        ax.set_ylim([-1.1, 1.1])
        if save_as_pdf:
            plt.savefig(filename, format='pdf')
        else:
            plt.show()
        return fig

    def _plot_acf_null_not_imp(self, CI_HAC, CB_HAC, CB_stata, CB_bart, title,
                               save_as_pdf, filename):
        lags = range(1, len(self.rho_est_values) + 1)
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(lags, self.rho_est_values, 'o', color='blue',
                label='Estimated Autocorrelation')
        ax.axhline(0, color='black', lw=0.5)

        if CI_HAC and self.cv_values and self.var_unres_values:
            for lag, v, fc, vu in zip(lags, self.rho_est_values, self.cv_values,
                                      self.var_unres_values):
                upper_bound = min(1.00, v + fc * np.sqrt(vu))
                lower_bound = max(-1.00, v - fc * np.sqrt(vu))
                # Vertical whiskers from the estimate to each bound.
                ax.plot([lag, lag], [upper_bound, v], color='gray', lw=0.8)
                ax.plot([lag, lag], [lower_bound, v], color='gray', lw=0.8)
                # Horizontal bars at the tips of the whiskers.
                bracket_length = 0.2
                ax.plot([lag - bracket_length, lag + bracket_length],
                        [upper_bound, upper_bound], color='black', lw=1.1)
                ax.plot([lag - bracket_length, lag + bracket_length],
                        [lower_bound, lower_bound], color='black', lw=1.1,
                        label='Confidence Interval - HAC robust' if lag == lags[0] else "")

        if CB_HAC and self.cv_values and self.var_unres_values:
            upper_bounds = [min(1.00, 0 + fc * np.sqrt(vu))
                            for fc, vu in zip(self.cv_values, self.var_unres_values)]
            lower_bounds = [max(-1.00, 0 - fc * np.sqrt(vu))
                            for fc, vu in zip(self.cv_values, self.var_unres_values)]
            ax.plot(lags, upper_bounds, color='green', linestyle='dashed', lw=0.99)
            ax.plot(lags, lower_bounds, color='green', linestyle='dashed', lw=0.99,
                    label='Confidence Band - HAC robust')

        if CB_stata:
            ax.plot(lags, self._stacked_autocorrelation(len(self.rho_est_values)),
                    'o', color='red', markerfacecolor="None",
                    label='Sample Autocorrelation')
            var_stata = self._calculate_var_stata_ori()
            upper_bound = [0 + stats.norm.ppf(1 - self.alpha / 2) * np.sqrt(var)
                           for var in var_stata]
            lower_bound = [0 - stats.norm.ppf(1 - self.alpha / 2) * np.sqrt(var)
                           for var in var_stata]
            ax.fill_between(lags, lower_bound, upper_bound, color='grey', alpha=0.2,
                            label='Confidence Band - Stata Formula')

        if CB_bart and len(self.data):
            if not CB_stata:
                ax.plot(lags, self._stacked_autocorrelation(len(self.rho_est_values)),
                        'o', color='red', markerfacecolor="None",
                        label='Sample Autocorrelation')
            conf_value = 1.96 / np.sqrt(len(self.data))
            ax.axhline(conf_value, color='red', linestyle='dashed', lw=0.8)
            ax.axhline(-conf_value, color='red', linestyle='dashed', lw=0.8,
                       label='Confidence Band - Bartlett Formula')

        ax.set_xlabel('Lag')
        ax.set_ylabel('Estimated Autocorrelation')
        ax.set_title(title)
        ax.legend(fontsize=8, loc='upper right', framealpha=0.25)
        ax.set_xlim([0, len(self.rho_est_values) + 1])
        ax.set_ylim([-1.1, 1.1])
        if save_as_pdf:
            plt.savefig(filename, format='pdf')
        else:
            plt.show()
        return fig
    def get_estimated_acf(self):
        """
        Retrieve the estimated autocorrelation function values from the
        estimating equation approach in Hwang and Vogelsang (2023).

        Returns
        -------
        list of float
            List of estimated ACF values.
        """
        return self.rho_est_values
    def get_confidence_interval(self):
        """
        Compute the confidence intervals for the ACF values by the estimating
        equation approach with the options pre-defined in the class.

        Returns
        -------
        tuple of list of float
            Lower and upper confidence interval bounds for each ACF value.
        """
        if self.null_imp:
            return self.lower_values, self.upper_values
        else:
            lower_bounds = [rho - cv * np.sqrt(var)
                            for rho, cv, var in zip(self.rho_est_values,
                                                    self.cv_values,
                                                    self.var_unres_values)]
            upper_bounds = [rho + cv * np.sqrt(var)
                            for rho, cv, var in zip(self.rho_est_values,
                                                    self.cv_values,
                                                    self.var_unres_values)]
            return lower_bounds, upper_bounds
    def get_confidence_band(self):
        """
        Compute the confidence bands around zero for the ACF values by the
        estimating equation approach with the options pre-defined in the class.

        Returns
        -------
        tuple of list of float
            Lower and upper confidence band bounds for each ACF value.
        """
        if self.null_imp:
            lower_bounds = [0 - cv * np.sqrt(var)
                            for cv, var in zip(self.cv_values,
                                               self.var_res_under_zero_values)]
            upper_bounds = [0 + cv * np.sqrt(var)
                            for cv, var in zip(self.cv_values,
                                               self.var_res_under_zero_values)]
            return lower_bounds, upper_bounds
        else:
            lower_bounds = [0 - cv * np.sqrt(var)
                            for cv, var in zip(self.cv_values, self.var_unres_values)]
            upper_bounds = [0 + cv * np.sqrt(var)
                            for cv, var in zip(self.cv_values, self.var_unres_values)]
            return lower_bounds, upper_bounds
    def get_null_imp_index(self):
        """
        Retrieve the index values (indicating the solution types of the
        quadratic equation across lags) if the null is imposed.

        Returns
        -------
        list of int or None
            List of index values if the null is imposed; otherwise, print a
            message and return None.
        """
        if self.null_imp:
            return self.index_values
        else:
            print("Index values are only available when the null is imposed.")
    def get_cb_stata(self):
        """
        Compute the confidence bands suggested by Stata for the sample ACF
        values.

        Returns
        -------
        tuple of list of float
            Lower and upper confidence band bounds by Stata for each sample
            ACF value.
        """
        var_stata = self._calculate_var_stata_ori()
        upper_bounds = [0 + stats.norm.ppf(1 - self.alpha / 2) * np.sqrt(var)
                        for var in var_stata]
        lower_bounds = [0 - stats.norm.ppf(1 - self.alpha / 2) * np.sqrt(var)
                        for var in var_stata]
        return lower_bounds, upper_bounds
    def get_cv(self):
        """
        Retrieve the critical values used for the confidence interval
        calculations based on the estimating equation approach.

        Returns
        -------
        list of float
            List of critical values.
        """
        return self.cv_values
    def get_sample_autocorrelation(self):
        """
        Compute the sample autocorrelation for a given number of lags.

        Returns
        -------
        list of float
            List of sample autocorrelation values.
        """
        return self._stacked_autocorrelation(len(self.rho_est_values))
def calc_fixed_cv_org_alpha(fixed_b, Tnum, fixedb_coeffs, alpha):
    """
    Compute fixed-b critical values for the t-test.

    Args:
    - fixed_b: value of the bandwidth (i.e. M)
    - Tnum (int): length of the time series; in this autocorrelation testing
      case, T - k
    - fixedb_coeffs: list of fixed-b cv coefficients [c1, c2, ..., c9]
    - alpha: significance level

    Returns:
    - Scalar, fixed-b critical value
    """
    c = copy(fixedb_coeffs)   # shallow copy of the list to prevent mistakes
    b = fixed_b / Tnum
    n_cv = stats.norm.ppf(1 - alpha / 2)   # e.g. stats.norm.ppf(0.975)
    # Cubic polynomial in b = M/T and the standard normal critical value.
    return (n_cv
            + c[0] * b * n_cv + c[1] * b * n_cv ** 2 + c[2] * b * n_cv ** 3
            + c[3] * b ** 2 * n_cv + c[4] * b ** 2 * n_cv ** 2 + c[5] * b ** 2 * n_cv ** 3
            + c[6] * b ** 3 * n_cv + c[7] * b ** 3 * n_cv ** 2 + c[8] * b ** 3 * n_cv ** 3)


def process_var_fixedbcv_alpha(vhat, M_n, X, nmp, fixedb_coeffs, alpha):
    """Compute the HAC variance matrix and the fixed-b critical value."""
    vhat = np.copy(vhat)
    X = np.copy(X)
    fixedb_coeffs = copy(fixedb_coeffs)
    Q_inv = np.linalg.inv(np.dot(X.T, X))
    LRV = newLRV_vec(vhat, M_n)
    var = np.linalg.multi_dot([Q_inv, LRV, Q_inv]) * nmp
    fixed_cv = calc_fixed_cv_org_alpha(M_n, nmp, fixedb_coeffs, alpha)
    return fixed_cv, var


def regression_residuals(data, k):
    """
    Compute the beta_2 coefficient and the residuals w_t from regressing the
    residuals of y_t on the residuals of y_{t-k}, after partialling out the
    effects of t and the intercept from both.

    Parameters:
    - data: ndarray
        Column vector containing the data series
    - k: int
        Lag value

    Returns:
    - beta_2: float
        Coefficient from regressing the residuals of y_t on those of y_{t-k}
    - w_t: ndarray
        Residuals from the regression
    - residuals_y_t_minus_k: ndarray
        Residuals of y_{t-k} after partialling out t and the intercept
    """
    data = np.copy(data)
    T_data = len(data)
    y_t = np.copy(data[k:T_data, :])
    y_t_minus_k = np.copy(data[0:T_data - k, :])
    t = np.arange(k + 1, T_data + 1).reshape(-1, 1)
    # Partial out the effect of t and the intercept on y_t and y_{t-k}.
    residuals_y_t = partial_regression(y_t, t, coef=False, cons=True)
    residuals_y_t_minus_k = partial_regression(y_t_minus_k, t, coef=False, cons=True)
    # Regress the residuals on each other without an intercept.
    beta_2, _ = partial_regression(residuals_y_t, residuals_y_t_minus_k,
                                   coef=True, cons=False)
    beta_2 = beta_2[0]   # single slope coefficient, as a scalar
    # Residuals w_t from this regression.
    w_t = residuals_y_t - beta_2 * residuals_y_t_minus_k
    return beta_2, w_t, residuals_y_t_minus_k


def partial_regression(y, x, coef=False, cons=True):
    """
    Perform a regression of y on x and optionally return the coefficients.

    Parameters:
    - y: ndarray
        Dependent variable
    - x: ndarray
        Independent variable
    - coef: bool (default: False)
        If True, also return the coefficients
    - cons: bool (default: True)
        If True, include an intercept in the regression

    Returns:
    - coeffs: ndarray (optional, returned first if coef=True)
        Coefficients from the regression
    - residuals: ndarray
        Residuals from the regression
    """
    y = np.array(y).reshape(-1, 1)
    x = np.array(x).reshape(-1, 1)
    if cons:
        cons_array = np.ones((len(x), 1))
        X = np.hstack((cons_array, x))
    else:
        X = x
    # OLS coefficients and residuals.
    coeffs = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(y)
    y_pred = X.dot(coeffs)
    residuals = y - y_pred
    if coef:
        return coeffs.flatten(), residuals
    else:
        return residuals


def AD_band(vhat):
    """Andrews (1991) AR(1) plug-in bandwidth for the Parzen kernel."""
    vhat = np.copy(vhat)
    if vhat.shape[1] == 2:
        vhat = np.copy(np.reshape(vhat[:, 1], (len(vhat[:, 1]), 1)))
    T = len(vhat)
    vhat_t = np.copy(vhat[1:])
    vhat_tm1 = np.copy(vhat[:-1])
    y2 = np.copy(vhat_t)
    X2 = np.copy(vhat_tm1)
    # AR(1) coefficient without a constant.
    rho_hat = np.linalg.inv(X2.T.dot(X2)).dot(X2.T).dot(y2)
    rho_hat = rho_hat[0, 0]   # as a scalar
    alpha_hat_2 = (4 * rho_hat ** 2) / ((1 - rho_hat) ** 4)   # univariate version
    ST = 2.6614 * (T * alpha_hat_2) ** 0.2   # bandwidth
    ST = np.float64(ST)
    if ST > T:
        ST = np.copy(T)
    return ST


def noncen_chisq(k, j, x):
    """Noncentral chi-square pdf with df k and noncentrality parameter j."""
    pdf_v = 0.5 * np.exp(-0.5 * (x + j)) * ((x / j) ** (k / 4 - 0.5)) \
        * mpmath.besseli(0.5 * k - 1, np.sqrt(j * x), derivative=0)
    return np.float64(pdf_v)


def chisq_dfone(x):
    """Chi-square pdf with one degree of freedom."""
    pdf_v = np.exp(-x / 2) / np.sqrt(2 * math.pi * x)
    return np.float64(pdf_v)


def SPJ_band(vhat, w, z_a=1.96, delta=2, q=2, g=6, c=0.539):
    """Sun, Phillips and Jin (2008) testing-optimal bandwidth for the Parzen
    kernel; w is the tuning parameter."""
    vhat = np.copy(vhat)
    if vhat.shape[1] == 2:
        vhat = np.copy(np.reshape(vhat[:, 1], (len(vhat[:, 1]), 1)))
    T = len(vhat)
    vhat_t = np.copy(vhat[1:])
    vhat_tm1 = np.copy(vhat[:-1])
    y2 = np.copy(vhat_t)
    X2 = np.copy(vhat_tm1)
    # AR(1) coefficient without a constant.
    rho_hat = np.linalg.inv(X2.T.dot(X2)).dot(X2.T).dot(y2)
    rho_hat = rho_hat[0, 0]   # as a scalar
    d_hat = np.float64((2 * rho_hat) / (1 - rho_hat) ** 2)
    x_val = z_a ** 2
    # Calculation of b_hat.
    G_0 = chisq_dfone(x=x_val)
    G_d = noncen_chisq(k=1, j=delta ** 2, x=x_val)
    k_d = ((delta ** 2) / (2 * x_val)) * noncen_chisq(k=3, j=delta ** 2, x=x_val)
    if d_hat * (w * G_0 - G_d) > 0:
        b_hat = (((q * g * d_hat * (w * G_0 - G_d)) / (c * x_val * k_d)) ** (1 / 3)) \
            * T ** (-2 / 3)
    elif d_hat * (w * G_0 - G_d) <= 0:
        b_hat = np.log(T) / T
    else:
        raise Exception("Error")   # unreachable unless the statistic is NaN
    spj_band = np.float64(b_hat * T)
    if spj_band > T:
        spj_band = np.copy(T)
    return spj_band


def create_spj_function(w, z_a=1.96, delta=2, q=2, g=6, c=0.539):
    """Return an SPJ_band function with the tuning parameters fixed."""
    def spj_function(vhat):
        return SPJ_band(vhat, w=w, z_a=z_a, delta=delta, q=q, g=g, c=c)
    return spj_function


def compute_Q2(y, k):
    """
    Compute the Q2 value.

    Args:
    - y (np.array): input data array (time series)
    - k (int): lag k, i.e. the lag of rho_k

    Returns:
    - float: Q2 value
    """
    T = len(y)
    # Compute the mean of y_1, ..., y_{T-k} and demean.
    ybar_1_T_minus_k = np.mean(y[:T - k])
    y_tilda_minus_k = y[:T - k] - ybar_1_T_minus_k
    Q2 = np.mean(y_tilda_minus_k ** 2)
    return Q2


def parzen_k_function_vectorized(x):
    """Vectorized Parzen kernel function."""
    conditions = [
        (abs(x) <= 0.5),
        (abs(x) > 0.5) & (abs(x) <= 1),
    ]
    outputs = [
        1 - 6 * x ** 2 + 6 * abs(x) ** 3,
        2 * (1 - abs(x)) ** 3,
    ]
    return np.select(conditions, outputs, default=0)


def compute_omegas_vectorized_demean(y, M, k_function, k):
    """Compute the kernel-weighted Omega terms from demeaned scores."""
    y = y.reshape(-1, 1)
    T = y.shape[0]
    # Compute the means.
    ybar_1_T_minus_k = np.mean(y[:T - k])
    ybar_k_plus_1_T = np.mean(y[k:T])
    # Demean the y values and compute v1 and v2.
    y_tilda = y[k:] - ybar_k_plus_1_T
    y_tilda_minus_k = y[:T - k] - ybar_1_T_minus_k
    v1 = (y_tilda * y_tilda_minus_k).squeeze()
    v2 = (y_tilda_minus_k ** 2).squeeze()
    # Demean v1 and v2.
    v1_demeaned = v1 - np.mean(v1)
    v2_demeaned = v2 - np.mean(v2)
    # Create the kernel weight matrix.
    indices_matrix = np.abs(np.arange(k + 1, T + 1)[:, None] - np.arange(k + 1, T + 1))
    K = k_function(indices_matrix / M)
    # Compute the Omega values using the demeaned v1 and v2.
    omega_11 = np.sum(v1_demeaned[:, None] * K * v1_demeaned[None, :]) / (T - k)
    omega_12 = np.sum(v1_demeaned[:, None] * K * v2_demeaned[None, :]) / (T - k)
    omega_22 = np.sum(v2_demeaned[:, None] * K * v2_demeaned[None, :]) / (T - k)
    return omega_11, omega_12, omega_22


def compute_c_coefficients(omegas, cv, y, k_val, rho_k_til, Q_2):
    """
    Compute the coefficients c_2_nd, c_1_nd, and c_0_nd of the quadratic
    inequality that defines the null-imposed confidence interval.

    Args:
    - omegas (tuple): Omega values (omegas_nd11, omegas_nd12, omegas_nd22)
    - cv (float): critical value
    - y (np.array): input data array
    - k_val (int): lag k
    - rho_k_til (float): value of rho_k_til
    - Q_2 (float): value of Q_2

    Returns:
    - tuple: Coefficients (c_2_nd, c_1_nd, c_0_nd)
    """
    omegas_nd11, omegas_nd12, omegas_nd22 = omegas
    T_minus_k = len(y) - k_val
    Q_2_inv_squared = Q_2 ** (-2)
    common_factor = (1 / T_minus_k) * Q_2_inv_squared * (cv ** 2)
    c_2_nd = 1 - common_factor * omegas_nd22
    c_1_nd = common_factor * omegas_nd12 - rho_k_til
    c_0_nd = (rho_k_til ** 2) - common_factor * omegas_nd11
    return c_2_nd, c_1_nd, c_0_nd


def quadratic_roots(c2, c1, c0):
    """
    Calculate the roots of the equation c2*a^2 + 2*c1*a + c0 = 0.

    Args:
    - c2 (float): coefficient of a^2
    - c1 (float): half the coefficient of a (the linear term is 2*c1)
    - c0 (float): constant term

    Returns:
    - tuple: roots of the equation (root1, root2), possibly complex
    """
    # Discriminant (cmath.sqrt keeps complex roots).
    D = cmath.sqrt((2 * c1) ** 2 - 4 * c2 * c0)
    root1 = (-2 * c1 + D) / (2 * c2)
    root2 = (-2 * c1 - D) / (2 * c2)
    return root1, root2


def reg_estimating_equation_v1(data, lag):
    """
    Estimating equation for the autocorrelation, the first option.

    Parameters:
    - data (np.ndarray): the input data array; y_t from t=1 to T
    - lag (int): the lag order p of the estimating equation

    Returns:
    - y (np.ndarray): y_t from t=p+1 to T
    - x (np.ndarray): y_{t-p}
    - nmp (int): the length of x (T - p)
    - X (np.ndarray): (T - p) x 2 matrix with intercept and x
    - coeffs (np.ndarray): the estimated coefficients from the estimating equation
    """
    data = np.copy(data)
    p = lag
    T_data = len(data)   # T
    # Create y and x based on the lag order p.
    y = np.copy(data[p:T_data, :])
    x = np.copy(data[0:T_data - p, :])
    nmp = len(x)   # T - p
    # Design matrix with a constant term.
    cons = np.ones((nmp, 1))
    X = np.hstack((cons, x))
    # Coefficients from the ordinary least squares (OLS) formula.
    coeffs = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(y)
    return y, x, nmp, X, coeffs


def newLRV_vec(vhat, M_n):
    """Parzen-kernel long-run variance estimator of the score vector vhat."""
    vhat = np.copy(vhat)
    n, k = vhat.shape
    LRV = autocov_est(vhat, 0)
    j_values = np.arange(1, n)
    parzen_values = Parzen_vec(j_values / M_n)
    autocov_values = np.array([autocov_est(vhat, j) for j in j_values])
    for j, p_val in zip(j_values, parzen_values):
        LRV += p_val * (autocov_values[j - 1] + autocov_values[j - 1].T)
    return LRV


def autocov_est(vhat, j):
    """Sample autocovariance matrix of vhat at lag j (j >= 0)."""
    vhat = np.copy(vhat)
    if j >= 0:
        T = len(vhat)
        v_t = vhat[j:T]
        v_tm1 = vhat[0:T - j]
        gamma_hat_j = np.dot(v_t.T, v_tm1) / T
        return gamma_hat_j
    else:
        raise ValueError("j cannot be a negative number")


def Parzen_vec(x):
    """Vectorized Parzen kernel."""
    x = np.copy(x)
    kx = np.zeros_like(x)
    mask1 = np.abs(x) <= 0.5
    mask2 = (0.5 < np.abs(x)) & (np.abs(x) <= 1)
    kx[mask1] = 1 - 6 * (x[mask1] ** 2) + 6 * (np.abs(x[mask1]) ** 3)
    kx[mask2] = 2 * ((1 - np.abs(x[mask2])) ** 3)
    return kx
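if __name__ == "__main__":
    # A minimal, self-contained demo (a sketch, not part of the package API).
    # The series below is uncorrelated but not independent (weak white noise),
    # the setting in which the Bartlett bands are invalid but the HAC robust
    # intervals remain valid.
    rng = np.random.default_rng(0)
    e = rng.standard_normal(501)
    y = e[1:] * e[:-1]   # weak white noise: E[y_t y_{t-j}] = 0 for j >= 1

    ci = HAC_robust_conf_int(y, lag=10, null_imp=True,
                             method="fixedb", bandwidth="SPJ")
    print("Estimated ACF:", ci.get_estimated_acf())
    print("HAC robust CI:", ci.get_confidence_interval())
    ci.plot_acf(CI_HAC=True, CB_HAC=True, CB_bart=True)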