# Evaluating Optimization Algorithms for Ornstein-Uhlenbeck with Synthetic Data: A Comparison of Accuracy and Speed

When analyzing algorithms, it is easy to become preoccupied with testing on real datasets, where the true parameters are unknown. To assess speed and accuracy, however, one can generate synthetic data from known parameters and measure how closely each algorithm recovers them and how long the computation takes. This approach serves as a litmus test before an algorithm goes into production, and it also helps expose coding errors or flaws in the underlying mathematical model.

1. Generating Synthetic Data:

The synthetic data for this experiment is generated using the Ornstein-Uhlenbeck (OU) process, a mean-reverting stochastic process frequently employed in finance and other fields. The following code demonstrates how to generate synthetic data using the OU process:

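``````python
import numpy as np

np.random.seed(42)

# True parameters used to generate the synthetic data
mu_true = 109      # long-run mean
theta_true = 22    # mean-reversion speed
sigma_true = 23    # volatility
dt = 0.01          # time step
N = 10000          # number of observations

X = np.zeros(N)
X[0] = mu_true     # start the path at the long-run mean

# Euler-style noise scale sigma * sqrt(dt); the exact OU transition uses
# sigma * sqrt((1 - exp(-2 * theta * dt)) / (2 * theta)) instead.
noise = np.random.normal(0, sigma_true * np.sqrt(dt), N - 1)

# The conditional mean decays toward mu_true by exp(-theta * dt) per step
for i in range(1, N):
    X[i] = X[i - 1] * np.exp(-theta_true * dt) + mu_true * (1 - np.exp(-theta_true * dt)) + noise[i - 1]
``````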
2. Evaluating Optimization Algorithms:

In this experiment, we will compare the performance of different optimization algorithms on the synthetic dataset generated above. The algorithms under evaluation include:

• Maximum Likelihood Estimation (MLE)
• Least Squares Estimation (LSE)
• Bayesian Estimation
• Method of Moments (MOM)
• Kalman Filter Estimation
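
Two of the estimators in this list have closed forms for the OU process, which makes them a convenient baseline. The sketch below is a minimal illustration, not the code behind the results in this article. It assumes the exact discretization, in which each step is a linear regression of the next value on the current one with slope `exp(-theta * dt)`. The function names `simulate_ou`, `fit_ou_least_squares`, and `fit_ou_moments` are hypothetical, and the simulator uses the exact transition noise rather than the `sigma * sqrt(dt)` scale used above.

```python
import numpy as np

def simulate_ou(mu, theta, sigma, dt, n, seed=0):
    """Simulate an OU path using the exact Gaussian transition (assumed setup)."""
    rng = np.random.default_rng(seed)
    a = np.exp(-theta * dt)                                   # per-step decay
    noise_sd = sigma * np.sqrt((1.0 - a**2) / (2.0 * theta))  # exact step noise
    x = np.empty(n)
    x[0] = mu
    shocks = rng.normal(0.0, noise_sd, n - 1)
    for i in range(1, n):
        x[i] = mu + (x[i - 1] - mu) * a + shocks[i - 1]
    return x

def fit_ou_least_squares(x, dt):
    """Regress x[i+1] on x[i], then map slope and intercept back to (mu, theta, sigma)."""
    x_prev, x_next = x[:-1], x[1:]
    slope, intercept = np.polyfit(x_prev, x_next, 1)
    resid = x_next - (slope * x_prev + intercept)
    theta = -np.log(slope) / dt            # requires 0 < slope < 1
    mu = intercept / (1.0 - slope)
    sigma = np.sqrt(resid.var(ddof=2) * 2.0 * theta / (1.0 - slope**2))
    return mu, theta, sigma

def fit_ou_moments(x, dt):
    """Match the sample mean, variance, and lag-1 autocorrelation."""
    mu = x.mean()
    centered = x - mu
    rho = np.dot(centered[:-1], centered[1:]) / np.dot(centered, centered)
    theta = -np.log(rho) / dt              # lag-1 autocorrelation is exp(-theta * dt)
    sigma = np.sqrt(2.0 * theta * x.var()) # stationary variance is sigma^2 / (2 * theta)
    return mu, theta, sigma

x = simulate_ou(mu=109.0, theta=22.0, sigma=23.0, dt=0.01, n=10_000)
print("least squares:", fit_ou_least_squares(x, 0.01))
print("moments:      ", fit_ou_moments(x, 0.01))
```

Both estimators run in a single pass over the data, so they are also useful as starting points for the iterative optimizers.
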
3. Defining the Objective Function:

The optimizers need an objective function to minimize. In this case, we use the negative log-likelihood, the standard objective for maximum likelihood estimation (MLE): the parameters that minimize it are the ones under which the observed data are most probable.
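
For the OU process the likelihood is available in closed form, because each observation given the previous one is Gaussian with mean `mu + (x_prev - mu) * exp(-theta * dt)` and variance `sigma**2 * (1 - exp(-2 * theta * dt)) / (2 * theta)`. The following is a minimal sketch of that objective. The function name `ou_negative_log_likelihood` and the choice to return infinity for non-positive `theta` or `sigma` are assumptions made for illustration.

```python
import numpy as np

def ou_negative_log_likelihood(params, x, dt):
    """Negative log-likelihood of an OU path under the exact Gaussian transition."""
    mu, theta, sigma = params
    if theta <= 0 or sigma <= 0:
        return np.inf                      # outside the valid parameter region
    x = np.asarray(x, dtype=float)
    a = np.exp(-theta * dt)
    var = sigma**2 * (1.0 - a**2) / (2.0 * theta)
    resid = x[1:] - (mu + (x[:-1] - mu) * a)
    n = resid.size
    return 0.5 * n * np.log(2.0 * np.pi * var) + 0.5 * np.sum(resid**2) / var

# Tiny usage example on a constant path sitting at the mean
print(ou_negative_log_likelihood((2.0, 1.0, 1.0), np.full(5, 2.0), 0.1))
```

Returning infinity works with derivative-free methods such as Nelder-Mead. Gradient-based methods usually behave better with bounds or a log-parameterization.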

4. Implementing the Experiment:

To compare the accuracy and speed of the optimization algorithms, we will run each algorithm on the synthetic dataset and measure their performance. We will use the `scipy.optimize.minimize` function from the SciPy library to implement the algorithms.
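
As a rough illustration of how `scipy.optimize.minimize` can be wired to an OU likelihood, the sketch below fits `(mu, theta, sigma)` with Nelder-Mead. It is not the article's implementation. The names `nll_log_params` and `fit_ou_mle` are hypothetical. Optimizing `log(theta)` and `log(sigma)` keeps both parameters positive, and the starting point comes from simple moment estimates. Both are design choices assumed here.

```python
import numpy as np
from scipy.optimize import minimize

def nll_log_params(p, x, dt):
    """OU negative log-likelihood with p = (mu, log(theta), log(sigma))."""
    mu, theta, sigma = p[0], np.exp(p[1]), np.exp(p[2])
    a = np.exp(-theta * dt)
    var = sigma**2 * (1.0 - a**2) / (2.0 * theta)
    resid = x[1:] - (mu + (x[:-1] - mu) * a)
    return 0.5 * resid.size * np.log(2.0 * np.pi * var) + 0.5 * np.sum(resid**2) / var

def fit_ou_mle(x, dt, method="Nelder-Mead"):
    """Refine moment-based starting values by minimizing the negative log-likelihood."""
    centered = x - x.mean()
    rho = np.dot(centered[:-1], centered[1:]) / np.dot(centered, centered)
    rho = np.clip(rho, 1e-6, 1.0 - 1e-6)   # keep the log below finite
    theta0 = -np.log(rho) / dt
    sigma0 = np.sqrt(2.0 * theta0 * x.var())
    start = np.array([x.mean(), np.log(theta0), np.log(sigma0)])
    res = minimize(nll_log_params, start, args=(x, dt), method=method)
    return res.x[0], np.exp(res.x[1]), np.exp(res.x[2]), res

# Demo data simulated with the exact OU transition (assumed, for illustration only)
rng = np.random.default_rng(0)
mu_true, theta_true, sigma_true, dt, n = 109.0, 22.0, 23.0, 0.01, 10_000
decay = np.exp(-theta_true * dt)
noise_sd = sigma_true * np.sqrt((1.0 - decay**2) / (2.0 * theta_true))
x = np.empty(n)
x[0] = mu_true
for i in range(1, n):
    x[i] = mu_true + (x[i - 1] - mu_true) * decay + rng.normal(0.0, noise_sd)

mu_hat, theta_hat, sigma_hat, res = fit_ou_mle(x, dt)
print(f"mu={mu_hat:.2f} theta={theta_hat:.2f} sigma={sigma_hat:.2f} converged={res.success}")
```

A poor starting point is a common reason for an optimizer to wander into negative `theta` or very large `sigma`, so seeding from a closed-form estimate is a cheap safeguard.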

5. Analyzing the Results:

We will analyze the results by comparing the estimated parameters and the true parameters of the synthetic data. We will also compare the speed of each algorithm by measuring the time taken to converge to the optimal solution.
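
One way to organize this comparison is a small harness that runs several `scipy.optimize.minimize` methods on the same objective and records the absolute parameter error and the wall-clock time for each. The sketch below is an assumption-laden example rather than the benchmark used here. The objective `one_step_sse` (sum of squared one-step-ahead errors in `mu` and `theta`) and the helper `compare_methods` are hypothetical names, and the demo data are simulated separately.

```python
import time
import numpy as np
from scipy.optimize import minimize

def one_step_sse(p, x, dt):
    """Sum of squared one-step-ahead errors for p = (mu, theta)."""
    mu, theta = p
    pred = mu + (x[:-1] - mu) * np.exp(-theta * dt)
    return np.sum((x[1:] - pred) ** 2)

def compare_methods(x, dt, true_params, start,
                    methods=("Nelder-Mead", "Powell", "L-BFGS-B")):
    """Run each method from the same start and record error and elapsed time."""
    rows = []
    for method in methods:
        t0 = time.perf_counter()
        res = minimize(one_step_sse, start, args=(x, dt), method=method)
        elapsed = time.perf_counter() - t0
        rows.append({
            "method": method,
            "estimate": res.x,
            "abs_error": np.abs(res.x - np.asarray(true_params)),
            "seconds": elapsed,
            "converged": bool(res.success),
        })
    return rows

# Demo data simulated with the exact OU transition (assumed, for illustration only)
rng = np.random.default_rng(1)
mu_true, theta_true, sigma_true, dt, n = 109.0, 22.0, 23.0, 0.01, 5_000
decay = np.exp(-theta_true * dt)
noise_sd = sigma_true * np.sqrt((1.0 - decay**2) / (2.0 * theta_true))
x = np.empty(n)
x[0] = mu_true
for i in range(1, n):
    x[i] = mu_true + (x[i - 1] - mu_true) * decay + rng.normal(0.0, noise_sd)

results = compare_methods(x, dt, (mu_true, theta_true), start=(x.mean(), 1.0))
for row in results:
    print(f"{row['method']:<12} abs_error={row['abs_error']} seconds={row['seconds']:.4f}")
```

Timings from a single run are noisy, so repeating each fit several times and reporting the median gives a fairer speed comparison.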

6. Key Insights and Recommendations:

Based on the comparison, we will discuss the key insights and recommendations for selecting the most suitable optimization algorithm for a particular problem.

Conclusion:

By using synthetic data, we can effectively evaluate the performance of different optimization algorithms in terms of accuracy and speed. This analysis can help data scientists and researchers make informed decisions when choosing an optimization algorithm for their specific problem. Furthermore, synthetic data can be a valuable tool for improving existing algorithms or developing new ones.

Here is my complete code with examples.

This code is meant as an illustration rather than a conclusive benchmark. Many of these algorithms need parameter tuning (initial guesses, bounds, tolerances) before their estimates become accurate or useful.

``````python
import os
import sys
import pandas as pd
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import plotly.graph_objs as go
import plotly.offline as pyo
from plotly.subplots import make_subplots
import pickle
from tqdm import tqdm
from typing import Union, List
from utils import filtered_df
from scipy.optimize import minimize, fmin
from statsmodels.base.model import LikelihoodModel, GenericLikelihoodModel

parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
data_dir = os.path.join(parent_dir, 'data/')
sys.path.append(data_dir)

def absolute_relative_difference_errors(params, mu, observations, dt):
    """Objective: summed absolute one-step error, scaled by the size of each actual move."""
    theta, sigma = params  # mu is held fixed and passed in separately
    y_true, y_pred = np.array(observations), np.zeros(len(observations))
    y_pred[0] = y_true[0]

    # One-step Euler prediction with a fresh noise draw, so the objective is stochastic
    random_noise = np.random.normal(0, np.sqrt(dt), size=len(observations) - 1)
    y_pred[1:] = y_true[:-1] + theta * (mu - y_true[:-1]) * dt + sigma * random_noise

    abs_diff_y_true = np.abs(np.diff(y_true, prepend=y_true[0]))
    abs_y_error = np.abs(y_true - y_pred)

    # Safely divide; `out` supplies zeros where the denominator is zero
    # (without it, np.divide leaves those entries uninitialized)
    relative_difference_errors = np.divide(abs_y_error, abs_diff_y_true,
                                           out=np.zeros_like(abs_y_error),
                                           where=(abs_diff_y_true != 0))
    return np.sum(relative_difference_errors)

def optimize_mean_absolute_relative_difference_error(params, observations, dt):
    """Alternative objective that also optimizes mu (not used by the optimizers below)."""
    theta, mu, sigma = params
    observations = np.asarray(observations)
    n = len(observations)
    ou_predictions = np.zeros(n)
    ou_predictions[0] = observations[0]

    random_noise = np.random.normal(0, np.sqrt(dt), size=n - 1)
    ou_predictions[1:] = observations[:-1] + theta * (mu - observations[:-1]) * dt + sigma * random_noise

    # Score the predictions with the MARDE metric defined further down
    return mean_absolute_relative_difference_error(observations, ou_predictions)

# Each optimizer fits theta and sigma only; mu is passed through as a fixed argument
def optimize_nelder_mead(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]  # mu is not optimized
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='Nelder-Mead')
    return result.x

def optimize_bfgs(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]  # mu is not optimized
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='BFGS')
    return result.x

def optimize_powell(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]  # mu is not optimized
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='Powell')
    return result.x

def optimize_lbfgsb(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]  # mu is not optimized
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='L-BFGS-B')
    return result.x

def optimize_tnc(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]  # mu is not optimized
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='TNC')
    return result.x

def mean_squared_error(y_true, y_pred):
    return np.mean((y_true - y_pred) ** 2)

def mean_absolute_error(y_true, y_pred):
    return np.mean(np.abs(y_true - y_pred))

def mean_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

def mean_absolute_relative_difference_error(y_true: Union[List[float], np.ndarray], y_pred: Union[List[float], np.ndarray]) -> float:
    """
    Calculate the mean absolute relative difference error for the given true and predicted values.

    :param y_true: List or numpy array of true values
    :param y_pred: List or numpy array of predicted values
    :return: Mean absolute relative difference error (a ratio, not a percentage)
    """
    # Convert input lists to numpy arrays
    y_true, y_pred = np.array(y_true), np.array(y_pred)

    # At least two elements are needed to form a difference
    if len(y_true) < 2:
        return np.nan

    # Calculate the absolute differences between consecutive elements in y_true
    if len(y_true) == 2:
        abs_diff_y_true = np.abs(y_true[1] - y_true[0])
    else:
        abs_diff_y_true = np.abs(np.diff(y_true))
        # Repeat the first difference so the array has the same length as y_true
        abs_diff_y_true = np.insert(abs_diff_y_true, 0, abs_diff_y_true[0])

    # Replace any zero values with a small value (epsilon) to prevent division by zero
    abs_diff_y_true = np.where(abs_diff_y_true == 0, np.finfo(float).eps, abs_diff_y_true)

    # Calculate the absolute differences between y_true and y_pred
    abs_y_error = np.abs(y_true - y_pred)

    # Scale each error by the size of the corresponding actual move
    return np.mean(abs_y_error / abs_diff_y_true)

def compute_direction_accuracy(y_true, y_pred):
    # Fraction of steps where actual and predicted values move in the same direction
    direction_actual = np.sign(np.array(y_true[1:]) - np.array(y_true[:-1]))
    direction_predicted = np.sign(np.array(y_pred[1:]) - np.array(y_pred[:-1]))
    return np.mean(direction_actual == direction_predicted)

# r2_score is the only metric taken from scikit-learn; the others are defined above
from sklearn.metrics import r2_score

def plot_best_method_chart(symbol, method_name, metrics, data, predictions, rolling_mse, rolling_rmse, rolling_mae, rolling_marde, rolling_mape, rolling_r2, rolling_dir, best_theta, best_mu, best_sigma):
    fig = make_subplots(rows=4, cols=1, shared_xaxes=True,
                        subplot_titles=("Original and Predicted Values", "Difference between Actual and Predicted",
                                        "Metrics", "R2"),
                        vertical_spacing=0.025,
                        row_heights=[0.5, 0.1, 0.2, 0.2],
                        )

    # Row 1: actual series and one-step predictions (the first 252 points are the warm-up window)
    fig.add_trace(go.Scatter(x=data.index[252:], y=data.iloc[252:], mode="lines", name="Original"), row=1, col=1)
    fig.add_trace(go.Scatter(x=predictions.index[252:], y=predictions.iloc[252:], mode="markers", name="Predicted"),
                  row=1, col=1)

    # Row 2: prediction error
    differences = data.iloc[252:] - predictions.iloc[252:]
    fig.add_trace(go.Scatter(x=differences.index, y=differences, mode="lines", name="Difference"), row=2, col=1)

    # Row 3: rolling error metrics; row 4: rolling R2
    metric_index = data.index[252:len(data) - 1]
    for name, values in [("MSE", rolling_mse), ("RMSE", rolling_rmse), ("MAE", rolling_mae),
                         ("MARDE", rolling_marde), ("MAPE", rolling_mape),
                         ("Direction Accuracy", rolling_dir)]:
        fig.add_trace(go.Scatter(x=metric_index, y=values, mode="lines", name=name), row=3, col=1)
    fig.add_trace(go.Scatter(x=metric_index, y=rolling_r2, mode="lines", name="R2"), row=4, col=1)

    fig.add_annotation(xref="paper", yref="paper", xanchor='left', yanchor='top', x=0.0, y=1,
                       text=f"<b>{method_name} Metrics</b><br>"
                            f"MSE: {metrics[method_name]['MSE']:.4f}<br>"
                            f"RMSE: {metrics[method_name]['RMSE']:.4f}<br>"
                            f"MAE: {metrics[method_name]['MAE']:.4f}<br>"
                            f"MARDE: {metrics[method_name]['MARDE']:.4f}<br>"
                            f"MAPE: {metrics[method_name]['MAPE']:.4f}<br>"
                            f"R-squared: {metrics[method_name]['R2']:.4f}<br>"
                            f"Direction Accuracy: {metrics[method_name]['Direction Accuracy']:.4f}<br>"
                            f"Best theta: {best_theta:.4f}<br>"
                            f"Best mu: {best_mu:.4f}<br>"
                            f"Best sigma: {best_sigma:.4f}",
                       font=dict(size=12), align="center", showarrow=False)

    fig.update_layout(
        title=f"{symbol} and MARDE: {metrics[method_name]['MARDE']:.4f}",
        title_x=0.5,
        title_y=0.95,
        title_xanchor="center",
        title_yanchor="top",
    )

    if not os.path.exists("plots"):
        os.makedirs("plots")

    plot_file = f"plots/{symbol}_{method_name}_marde{metrics[method_name]['MARDE']:.4f}_theta{best_theta:.4f}_mu{best_mu:.4f}_sigma{best_sigma:.4f}.html"
    pyo.plot(fig, filename=plot_file, auto_open=False)

    print(f"Plot saved as {plot_file}")

def save_analysis_to_dataframe(symbol, method_name, metrics, theta, mu, sigma, filename="ornstein_uhlenbeck_pairs_analysis"):
    # Create a DataFrame with the specified index and columns
    columns = ["Theta", "Mu", "Sigma", "Best Method", "MSE", "RMSE", "MAE", "MARDE", "MAPE", "R2",
               "Direction Accuracy"]
    analysis_df = pd.DataFrame(index=[symbol], columns=columns)

    # Add the data to the DataFrame (one value per column, in the same order)
    analysis_df.loc[symbol] = [theta, mu, sigma, method_name, metrics[method_name]['MSE'],
                               metrics[method_name]['RMSE'], metrics[method_name]['MAE'],
                               metrics[method_name]['MARDE'], metrics[method_name]['MAPE'],
                               metrics[method_name]['R2'], metrics[method_name]['Direction Accuracy']]

    # Save the DataFrame to disk using the pickle library
    with open(filename + ".pkl", "wb") as file:
        pickle.dump(analysis_df, file)

    print(f"Analysis saved as {filename}.pkl")

def ornstein_uhlenbeck(filename='cointegrated_etf_pairs_historical_ratios.pkl', make_stationary=False, plot_all_charts=True, output_file="ornstein_uhlenbeck_pairs_analysis.pkl"):
    # Load the DataFrame from the specified pickle file
    file_path = os.path.join(data_dir, filename)
    with open(file_path, 'rb') as file:
        df = pickle.load(file)

    # Create a DataFrame to store the analysis for all symbols
    columns = ["Theta", "Mu", "Sigma", "Best Method", "MSE", "RMSE", "MAE", "MARDE", "MAPE", "R2", "Direction Accuracy"]
    all_symbols_analysis = pd.DataFrame(columns=columns)

    # Iterate through each column in the DataFrame
    for symbol in tqdm(df.columns, desc="Processing symbols", leave=False):
        # Select the symbol column and drop NaNs
        data = df[symbol].dropna()
        original_first_value = data.iloc[0]  # Save the first value before differencing

        # Apply first-differencing if make_stationary is True
        if make_stationary:
            data = data.diff().dropna()

        # Calculate dt in years from the spacing of the last two observations
        index_diff = data.index[-1] - data.index[-2]
        dt = index_diff.days + index_diff.seconds / (60*60*24)
        # dt = dt/365 # for calendar
        dt = dt/252 # for trading days

        # Create a series to store predictions
        predictions = pd.Series(index=data.index, dtype='float64')

        best_method = None
        best_params = None
        best_rmse = float('inf')

        optimization_methods = [
            ('BFGS', optimize_bfgs),
            ('L-BFGS-B', optimize_lbfgsb),
            ('Powell', optimize_powell),
            ('TNC', optimize_tnc),
        ]
        metrics = {}
        best_parameters = {}
        rolling_metrics = {}
        method_predictions = {}

        for method_name, optimization_method in tqdm(optimization_methods, desc="Optimizing parameters", leave=False):
            all_predictions = []
            all_actuals = []

            theta_init = 2
            sigma_init = 2.9

            last_theta = None
            last_mu = None
            last_sigma = None

            rolling_mse = []
            rolling_rmse = []
            rolling_mae = []
            rolling_mape = []
            rolling_marde = []
            rolling_r2 = []
            rolling_dir_acc = []
            rolling_mu = []

            # Walk forward one step at a time, refitting on the trailing 252 observations
            for i in range(252, len(data) - 1):
                # mu is not optimized: its initial value is always the mean of the data so far
                mu_init = data.iloc[:i].mean()
                if last_theta is not None:
                    # Warm-start theta and sigma from the previous window's fit
                    theta_init, sigma_init = last_theta, last_sigma

                # Run optimization (returns theta and sigma only)
                theta, sigma = optimization_method(data.iloc[i - 252:i].values, dt, theta_init, mu_init,
                                                   sigma_init)
                mu = data.iloc[i - 252:i].mean()  # mu is the mean of the trailing window
                rolling_mu.append(mu)
                # Save the last calculated values
                last_theta, last_mu, last_sigma = theta, mu, sigma

                # One-step-ahead prediction; it includes a noise draw, so it is a simulated value
                ou_prediction = data.iloc[i] + theta * (mu - data.iloc[i]) * dt + sigma * np.random.normal(0, np.sqrt(dt))
                predictions.iloc[i + 1] = ou_prediction

                # Save actual and predicted values for computing metrics later
                all_predictions.append(ou_prediction)
                all_actuals.append(data.iloc[i + 1])

                # Calculate the rolling metrics over everything predicted so far
                actuals_arr, predictions_arr = np.array(all_actuals), np.array(all_predictions)
                mse = mean_squared_error(actuals_arr, predictions_arr)
                rmse = np.sqrt(mse)
                mae = mean_absolute_error(actuals_arr, predictions_arr)
                marde = mean_absolute_relative_difference_error(actuals_arr, predictions_arr)
                mape = mean_absolute_percentage_error(actuals_arr, predictions_arr)
                r2 = r2_score(actuals_arr, predictions_arr)
                dir_acc = compute_direction_accuracy(np.array(data.iloc[i - 252:i]), np.array(predictions.iloc[i - 252:i]))

                # Append the rolling metrics to their respective lists
                rolling_mse.append(mse)
                rolling_rmse.append(rmse)
                rolling_mae.append(mae)
                rolling_marde.append(marde)
                rolling_mape.append(mape)
                rolling_r2.append(r2)
                rolling_dir_acc.append(dir_acc)

            rolling_metrics[method_name] = {
                'rolling_mse': rolling_mse,
                'rolling_rmse': rolling_rmse,
                'rolling_mae': rolling_mae,
                'rolling_marde': rolling_marde,
                'rolling_mape': rolling_mape,
                'rolling_r2': rolling_r2,
                'rolling_direction_accuracy': rolling_dir_acc,
            }

            method_predictions[method_name] = predictions.copy()

            # Compute metrics for this optimization method
            if len(all_actuals) > 0 and len(all_predictions) > 0:
                actuals_arr, predictions_arr = np.array(all_actuals), np.array(all_predictions)
                mse = mean_squared_error(actuals_arr, predictions_arr)
                rmse = np.sqrt(mse)
                mae = mean_absolute_error(actuals_arr, predictions_arr)
                marde = mean_absolute_relative_difference_error(actuals_arr, predictions_arr)
                mape = mean_absolute_percentage_error(actuals_arr, predictions_arr)
                r2 = r2_score(actuals_arr, predictions_arr)
                direction_accuracy = compute_direction_accuracy(actuals_arr, predictions_arr)

                metrics[method_name] = {'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'MARDE': marde, 'MAPE': mape, 'R2': r2,
                                        'Direction Accuracy': direction_accuracy}

                best_parameters[method_name] = {'theta': last_theta, 'mu': last_mu, 'sigma': last_sigma}

                print(f"Optimization method: {method_name}")
                print(f"Mean Squared Error: {mse}")
                print(f"Root Mean Squared Error: {rmse}")
                print(f"Mean Absolute Error: {mae}")
                print(f"Mean Absolute Relative Difference Error: {marde}\n")
                print(f"Mean Absolute Percentage Error: {mape}%\n")
                print(f"R-squared: {r2}\n")
                print(f"Direction Accuracy: {direction_accuracy}\n")

                if plot_all_charts:
                    fig = make_subplots(rows=3, cols=1, shared_xaxes=True,
                                        subplot_titles=(
                                            "Original and Predicted Values", "Difference between Actual and Predicted",
                                            "Metrics"),
                                        vertical_spacing=0.05,
                                        row_heights=[0.5, 0.5, 0.5],
                                        )
                    metric_index = data.index[252:len(data) - 1]

                    # Row 1: actual series, one-step predictions, and the rolling mean used as mu
                    fig.add_trace(go.Scatter(x=data.index[252:], y=data.iloc[252:], mode="lines", name="Original"),
                                  row=1, col=1)
                    fig.add_trace(
                        go.Scatter(x=predictions.index[252:], y=predictions.iloc[252:], mode="markers", name="Predicted"),
                        row=1, col=1)
                    fig.add_trace(go.Scatter(x=metric_index, y=rolling_mu, mode="markers", name="Mu"),
                                  row=1, col=1)

                    # Row 2: prediction error
                    differences = data.iloc[252:] - predictions.iloc[252:]
                    fig.add_trace(go.Scatter(x=differences.index, y=differences, mode="lines", name="Difference"),
                                  row=2, col=1)

                    # Row 3: rolling metrics
                    for name, values in [("MSE", rolling_mse), ("RMSE", rolling_rmse), ("MAE", rolling_mae),
                                         ("MARDE", rolling_marde), ("MAPE", rolling_mape), ("R2", rolling_r2),
                                         ("Direction Accuracy", rolling_dir_acc)]:
                        fig.add_trace(go.Scatter(x=metric_index, y=values, mode="lines", name=name), row=3, col=1)

                    fig.add_annotation(xref="paper", yref="paper", xanchor='left', yanchor='top', x=0.0, y=1,
                                       text=f"<b>{method_name} Metrics</b><br>"
                                            f"MSE: {metrics[method_name]['MSE']:.4f}<br>"
                                            f"RMSE: {metrics[method_name]['RMSE']:.4f}<br>"
                                            f"MAE: {metrics[method_name]['MAE']:.4f}<br>"
                                            f"MARDE: {metrics[method_name]['MARDE']:.4f}<br>"
                                            f"MAPE: {metrics[method_name]['MAPE']:.4f}<br>"
                                            f"R-squared: {metrics[method_name]['R2']:.4f}<br>"
                                            f"Direction Accuracy: {metrics[method_name]['Direction Accuracy']:.4f}",
                                       font=dict(size=12), align="center", showarrow=False)

                    if not os.path.exists("plots"):
                        os.makedirs("plots")

                    plot_file = f"plots/{symbol}_ornstein_uhlenbeck_{method_name}.html"
                    pyo.plot(fig, filename=plot_file, auto_open=False)

                    print(f"Plot saved as {plot_file}")
            else:
                print("Skipping metric computation due to insufficient data.")

        # Pick the method with the lowest MAPE for this symbol
        if metrics:
            best_method = min(metrics, key=lambda x: metrics[x]['MAPE'])
        else:
            print("No metrics available for this symbol.")
            continue
        # best_method = max(metrics, key=lambda x: metrics[x]['R2'])

        best_mse = metrics[best_method]['MSE']
        best_rmse = metrics[best_method]['RMSE']
        best_mae = metrics[best_method]['MAE']
        best_marde = metrics[best_method]['MARDE']
        best_mape = metrics[best_method]['MAPE']
        best_r2 = metrics[best_method]['R2']
        best_dir_acc = metrics[best_method]['Direction Accuracy']

        best_theta = best_parameters[best_method]['theta']
        best_mu = best_parameters[best_method]['mu']
        best_sigma = best_parameters[best_method]['sigma']

        print(f"Best optimization method: {best_method}")
        print(f"Best values for theta: {best_theta}, mu: {best_mu}, sigma: {best_sigma}")
        print(f"Mean Squared Error: {best_mse}")
        print(f"Root Mean Squared Error: {best_rmse}")
        print(f"Mean Absolute Error: {best_mae}")
        print(f"Mean Absolute Relative Difference Error: {best_marde}")
        print(f"Mean Absolute Percentage Error: {best_mape}")
        print(f"R-squared: {best_r2}")
        print(f"Direction Accuracy: {best_dir_acc}")

        all_symbols_analysis.loc[symbol] = [best_theta, best_mu, best_sigma, best_method,
                                            metrics[best_method]['MSE'],
                                            metrics[best_method]['RMSE'], metrics[best_method]['MAE'],
                                            metrics[best_method]['MARDE'], metrics[best_method]['MAPE'],
                                            metrics[best_method]['R2'],
                                            metrics[best_method]['Direction Accuracy']]

        # Call the plot_best_method_chart function after determining the best_method
        plot_best_method_chart(
            symbol, best_method, metrics, data, method_predictions[best_method],
            rolling_metrics[best_method]['rolling_mse'],
            rolling_metrics[best_method]['rolling_rmse'],
            rolling_metrics[best_method]['rolling_mae'],
            rolling_metrics[best_method]['rolling_marde'],
            rolling_metrics[best_method]['rolling_mape'],
            rolling_metrics[best_method]['rolling_r2'],
            rolling_metrics[best_method]['rolling_direction_accuracy'],
            best_theta, best_mu, best_sigma
        )

    # Save the per-symbol summary once all symbols have been processed
    with open(output_file, "wb") as file:
        pickle.dump(all_symbols_analysis, file)
    # return best_method, best_theta, best_mu, best_sigma, best_mse, best_rmse, best_mae, best_marde, best_mape,best_r2

ornstein_uhlenbeck()

``````

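Your final results will look something like this. In this run, least squares and the method of moments recover μ and θ closely, while most of the untuned MLE runs diverge: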

True values: μ = 109.00, θ = 22.00, σ = 23.00
MLE estimates: μ = 923764.61, θ = -4591680.43, σ = 69817.80
Least Squares: μ = 108.97, θ = 23.72, σ = 1.00
Method of Moments: μ = 108.98, θ = 23.70, σ = 19.33
Kalman Filter: μ = 0.00, θ = 2.00, σ = 2.42
Ensemble prediction: μ = 108.98, θ = 23.70, σ = 19.33
MLE estimates (Nelder-Mead): μ = 0.00, θ = -3255.78, σ = 18095.76
MLE estimates (Powell): μ = -0.51, θ = -104.01, σ = 16.22
MLE estimates (CG): μ = -31431.14, θ = -159.34, σ = 8154.10
MLE estimates (BFGS): μ = -59278.98, θ = -1326.24, σ = 19548.41
MLE estimates (L-BFGS-B): μ = 0.77, θ = 4.98, σ = 5.30
