Automating the Management of Hundreds of TreasuryDirect.gov Accounts

If you manage multiple accounts through TreasuryDirect.gov to purchase T-bills, T-notes, T-bonds, TIPS, FRNs, or Certificates of Indebtedness (C of I), then you know how antiquated and difficult it can be to manage all of your accounts at the United States Treasury. I’ve written a Python program to aggregate all of these accounts into a single summary file. The program can be expanded to buy and sell securities; in its published state, however, it simply aggregates the data and writes it to a CSV file (which opens directly in Excel).

The program is written in Python and uses Selenium WebDriver to interact with the TreasuryDirect.gov website. It is designed to work with multiple accounts and reads the account information from a CSV file. It also automates the one-time passcode (OTP) step by logging in to your email and extracting the code. The program logs in to each account, scrapes the account details, and updates the account information as needed. For each account it currently returns the LLC name, original purchase amount, current value, issue date, and interest rate, along with the linked bank details (bank name, routing number, account number, names on the account, account type, and return code).

Automating the login to each account

To do this, the program reads a CSV file containing all of your TreasuryDirect.gov account numbers and other relevant information, such as LLC names and purchase amounts. It then loops through each account, retrieves the necessary information from the TreasuryDirect.gov website using Selenium, and saves the data into new columns of the CSV file.
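
For reference, here is the kind of header line the accounts.csv file is assumed to have, matching the column names used in the snippet below (an illustration only; your file can carry additional columns):

Treasury Direct,LLC Name,Original Purchase Amount,Current Value,Issue Date,Interest Rate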

Here’s an example of the code that performs this task:

import time

import pandas as pd
from treasury_direct import process_account

if __name__ == '__main__':
    df = pd.read_csv('accounts.csv')
    url = "https://www.treasurydirect.gov/RS/UN-Display.do"

    for index, row in df.iterrows():
        account_number = row['Treasury Direct']

        # Check if the row is already complete
        if not pd.isna(row['LLC Name']) and not pd.isna(row['Original Purchase Amount']) \
                and not pd.isna(row['Current Value']) and not pd.isna(row['Issue Date']) \
                and not pd.isna(row['Interest Rate']):
            continue

        success = process_account(account_number, df, index, url)

        if success:
            time.sleep(60)

    df.to_csv('accounts.csv', index=False)


Automatically retrieving one-time passcodes (OTPs) from emails

To log into each account, the program needs to retrieve OTPs from emails sent by TreasuryDirect.gov. It uses the Gmail API to search for unread emails from TreasuryDirect.gov containing the subject line “One Time Passcode” and retrieves the OTP from the email body. Once it has the OTP, the program enters it into the appropriate field on the TreasuryDirect.gov login page.

Here’s an example of the code that retrieves OTPs:

import time

from gmail import get_otp

# One-time passcode step: poll the inbox until the OTP email arrives,
# then enter the code into the login form
otp = None
while otp is None:
    otp = get_otp()
    if otp is None:
        # Sleep for 10 seconds before trying again
        time.sleep(10)
otp_input = driver.find_element_by_name("otp")
otp_input.send_keys(otp)


Finding the OTP in the email

Inside get_otp(), the program looks for the newest unread OTP email and parses the passcode out of the message body. Here is that function reconstructed as a sketch; the Gmail query and client setup mirror the trash-cleanup function shown below, and the base64 and Gmail client imports are assumed from the surrounding module:

def get_otp():
    try:
        creds = get_credentials()
        service = build('gmail', 'v1', credentials=creds)
        results = service.users().messages().list(
            userId='me',
            q='is:unread from:Treasury.Direct@fiscal.treasury.gov subject:"One Time Passcode"').execute()
        messages = results.get('messages', [])
        if not messages:
            return None
        else:
            # Get the first unread email
            message = messages[0]
            msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()
            msg_str = base64.urlsafe_b64decode(msg['payload']['body']['data']).decode()
            # The passcode sits on a fixed line of the plain-text email body
            otp = msg_str.splitlines()[6].split()[0]
            if otp:
                one_time_passcode = otp
                print(f"{one_time_passcode}")
                return one_time_passcode
            else:
                print("No One Time Passcode found in the email.")
                return None
    except HttpError as error:
        print(f'An error occurred: {error}')

Moving OTP emails to trash

To avoid cluttering your inbox, the program moves OTP emails to the trash folder after retrieving the OTPs. It uses the Gmail API to search for emails from TreasuryDirect.gov containing the subject line “One Time Passcode” and moves them to the trash folder.

Here’s an example of the code that moves OTP emails to trash:

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

def move_otp_emails_to_trash():
    try:
        creds = get_credentials()
        service = build('gmail', 'v1', credentials=creds)
        results = service.users().messages().list(userId='me',
                                                  q='from:Treasury.Direct@fiscal.treasury.gov subject:"One Time Passcode"').execute()
        messages = results.get('messages', [])
        if not messages:
            print('No messages found.')
        else:
            for message in messages:
                service.users().messages().trash(userId='me', id=message['id']).execute()
                print(f"Moved message with ID {message['id']} to trash.")
    except HttpError as error:
        print(f'An error occurred: {error}')
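
The Gmail snippets above call a get_credentials() helper that isn’t shown in this post. As a rough sketch only (the real helper may differ), the standard pattern with the official google-auth libraries looks like this, assuming an OAuth client file named credentials.json and a cached token.json:

import os
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow

SCOPES = ['https://www.googleapis.com/auth/gmail.modify']  # read messages and move them to trash

def get_credentials():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Cache the token so subsequent runs skip the browser prompt
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    return creds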

Extracting Account Information

    try:
        # Locate the elements containing the desired information
        llc_and_account_number = driver.find_element_by_xpath('//div[@id="accountnumber"]').text
        original_purchase_amount = driver.find_element_by_xpath('//p[contains(text(), "Series I current holdings total amount")]/span').text
        current_value = driver.find_element_by_xpath('//p[contains(text(), "Series I current holdings current value")]/span').text
        issue_date = driver.find_element_by_xpath('(//tr[contains(@class, "altrow")]/td)[3]').text
        interest_rate = driver.find_element_by_xpath('//td[contains(text(), "%")]').text

        # Separate the LLC name and account number
        llc_name, account_number = llc_and_account_number.split(':', 1)
        llc_name = llc_name.strip().replace("LLC Name: ", "")
        account_number = account_number.strip()

        # Print the extracted information
        print(f"LLC Name: {llc_name}")
        print(f"Account Number: {account_number}")
        print(f"Original Purchase Amount: {original_purchase_amount}")
        print(f"Current Value: {current_value}")
        print(f"Issue Date: {issue_date}")
        print(f"Interest Rate: {interest_rate}")

        # Save the extracted information as new columns for the current row
        df.loc[index, 'LLC Name'] = llc_name
        df.loc[index, 'Original Purchase Amount'] = original_purchase_amount
        df.loc[index, 'Current Value'] = current_value
        df.loc[index, 'Issue Date'] = issue_date
        df.loc[index, 'Interest Rate'] = interest_rate
    except NoSuchElementException:
        print(f"Failed to extract ibond information for account {account_number}. Moving to the next account.")


    # Extract the linked bank (payment destination) details
    try:
        bank_name = driver.find_element_by_xpath('//tr[@class="altrow1"][1]/td[3]/strong').text
        routing_number = driver.find_element_by_xpath('//tr[@class="altrow1"][2]/td[3]/strong').text
        account_number = driver.find_element_by_xpath('//tr[@class="altrow1"][3]/td[3]/strong').text
        names_on_account = driver.find_element_by_xpath('//tr[@class="altrow1"][4]/td[3]/strong').text
        account_type = driver.find_element_by_xpath('//tr[@class="altrow1"][5]/td[3]/strong').text
        return_code = driver.find_element_by_xpath('//tr[@class="altrow1"][6]/td[3]/strong').text

        # Print the extracted information
        print("Bank Name:", bank_name)
        print("Routing Number:", routing_number)
        print("Account Number:", account_number)
        print("Name(s) on Account:", names_on_account)
        print("Account Type:", account_type)
        print("Return Code:", return_code)

        # Save the extracted information as new columns for the current row
        df.loc[index, 'Bank Name'] = bank_name
        df.loc[index, 'Routing Number'] = routing_number
        df.loc[index, 'Account Number'] = account_number
        df.loc[index, 'Name(s) on Account'] = names_on_account
        df.loc[index, 'Account Type'] = account_type
        df.loc[index, 'Return Code'] = return_code

    except NoSuchElementException:
        print(f"Failed to extract information for account {account_number}. Moving to the next account.")

Full Code:

Coming soon.

Evaluating Optimization Algorithms for the Ornstein-Uhlenbeck Process with Synthetic Data: A Comparison of Accuracy and Speed

In algorithm analysis, it is easy to become preoccupied with testing on real datasets, where the true parameter values are unknown. To assess algorithms for speed and accuracy, one can instead generate synthetic data with known parameters and measure how closely each algorithm recovers those values and how quickly the calculations run. This approach serves as a litmus test before putting an algorithm into production, and it also helps surface coding errors or problems in the underlying mathematical model.

  1. Generating Synthetic Data:

The synthetic data for this experiment is generated using the Ornstein-Uhlenbeck (OU) process, a mean-reverting stochastic process frequently employed in finance and other fields. The following code demonstrates how to generate synthetic data using the OU process:

import numpy as np

np.random.seed(42)

# Synthetic data generation
mu_true = 109
theta_true = 22
sigma_true = 23
dt = 0.01
N = 10000

X = np.zeros(N)
X[0] = mu_true
noise = np.random.normal(0, sigma_true * np.sqrt(dt), N - 1)

for i in range(1, N):
    X[i] = X[i - 1] * np.exp(-theta_true * dt) + mu_true * (1 - np.exp(-theta_true * dt)) + noise[i - 1]
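
A quick side note on this discretization: the update above uses the exact OU conditional mean but pairs it with Euler-style noise of standard deviation sigma_true * sqrt(dt). If you wanted the noise term to match the exact transition density as well, its standard deviation would instead be the following (shown only as an alternative, not what this article uses):

# Exact OU transition noise scale (alternative to sigma_true * np.sqrt(dt) above)
exact_std = sigma_true * np.sqrt((1 - np.exp(-2 * theta_true * dt)) / (2 * theta_true))
noise = np.random.normal(0, exact_std, N - 1)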

  2. Evaluating Optimization Algorithms:

In this experiment, we will compare the performance of different optimization algorithms on the synthetic dataset generated above. The algorithms under evaluation include:

  • Maximum Likelihood Estimation (MLE)
  • Least Squares Estimation (LSE)
  • Bayesian Estimation
  • Method of Moments (MOM)
  • Kalman Filter Estimation
  3. Defining the Objective Function:

An objective function is required to evaluate the accuracy of the optimization algorithms. In this case, we use the negative log-likelihood function, which is commonly employed for maximum likelihood estimation (MLE).
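
For concreteness, here is a minimal sketch (not the exact code used in the experiment) of an OU negative log-likelihood built from the process’s Gaussian one-step transition density; mu, theta, and sigma follow the parameter names used above:

import numpy as np
from scipy.stats import norm

def ou_negative_log_likelihood(params, X, dt):
    """Negative log-likelihood of an OU path X observed at fixed spacing dt."""
    mu, theta, sigma = params
    if theta <= 0 or sigma <= 0:
        return np.inf  # keep the optimizer inside the valid parameter region
    # Exact one-step transition: X[t+dt] | X[t] is Gaussian with this mean and variance
    mean = X[:-1] * np.exp(-theta * dt) + mu * (1 - np.exp(-theta * dt))
    var = sigma ** 2 * (1 - np.exp(-2 * theta * dt)) / (2 * theta)
    return -np.sum(norm.logpdf(X[1:], loc=mean, scale=np.sqrt(var)))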

  4. Implementing the Experiment:

To compare the accuracy and speed of the optimization algorithms, we will run each algorithm on the synthetic dataset and measure their performance. We will use the scipy.optimize.minimize function from the SciPy library to implement the algorithms.
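
A single MLE fit with that (hypothetical) objective might look like this; the starting values are arbitrary placeholders rather than tuned choices, and X and dt come from the synthetic-data snippet above:

from scipy.optimize import minimize

initial_guess = [100.0, 10.0, 10.0]  # rough placeholders for mu, theta, sigma
result = minimize(ou_negative_log_likelihood, initial_guess, args=(X, dt), method='Nelder-Mead')
mu_hat, theta_hat, sigma_hat = result.x
print(f"Estimated mu={mu_hat:.2f}, theta={theta_hat:.2f}, sigma={sigma_hat:.2f}")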

  5. Analyzing the Results:

We will analyze the results by comparing the estimated parameters and the true parameters of the synthetic data. We will also compare the speed of each algorithm by measuring the time taken to converge to the optimal solution.
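
As a rough sketch of that comparison (reusing the hypothetical objective and initial guess from the snippets above), each solver can be timed with time.perf_counter and scored against the known true parameters:

import time

results = {}
for method in ['Nelder-Mead', 'Powell', 'CG', 'BFGS', 'L-BFGS-B']:
    start = time.perf_counter()
    res = minimize(ou_negative_log_likelihood, initial_guess, args=(X, dt), method=method)
    elapsed = time.perf_counter() - start
    mu_hat, theta_hat, sigma_hat = res.x
    results[method] = {
        'abs error in mu': abs(mu_hat - mu_true),
        'abs error in theta': abs(theta_hat - theta_true),
        'abs error in sigma': abs(sigma_hat - sigma_true),
        'seconds': round(elapsed, 3),
    }

for method, summary in results.items():
    print(method, summary)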

  6. Key Insights and Recommendations:

Based on the comparison, we will discuss the key insights and recommendations for selecting the most suitable optimization algorithm for a particular problem.

Conclusion:

By using synthetic data, we can effectively evaluate the performance of different optimization algorithms in terms of accuracy and speed. This analysis can help data scientists and researchers make informed decisions when choosing an optimization algorithm for their specific problem. Furthermore, synthetic data can be a valuable tool for improving existing algorithms or developing new ones.

Here is my complete code with examples.

This code is not meant to be conclusive but rather to serve as an example. For instance, many of these algorithms need tuning (better initial guesses and other parameter adjustments) before their estimates become accurate or useful.

import os
import sys
import pandas as pd
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import plotly.graph_objs as go
import plotly.offline as pyo
from plotly.subplots import make_subplots
import pickle
from tqdm import tqdm
from typing import Union, List
from utils import filtered_df
from scipy.optimize import minimize, fmin
from statsmodels.base.model import LikelihoodModel, GenericLikelihoodModel


parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
data_dir = os.path.join(parent_dir, 'data/')
sys.path.append(data_dir)

file_path = os.path.join(data_dir, 'shortable_active_adj_close.pickle')
df = pd.read_pickle(file_path)

def absolute_relative_difference_errors(params, mu, observations, dt):
    theta, sigma = params  # mu is held fixed and passed in separately (not optimized)
    y_true, y_pred = np.array(observations), np.zeros(len(observations))
    y_pred[0] = y_true[0]

    random_noise = np.random.normal(0, np.sqrt(dt), size=len(observations) - 1)
    y_pred[1:] = y_true[:-1] + theta * (mu - y_true[:-1]) * dt + sigma * random_noise

    abs_diff_y_true = np.abs(np.diff(y_true, prepend=y_true[0]))
    abs_y_error = np.abs(y_true - y_pred)

    # Safely divide, avoiding division by zero
    relative_difference_errors = np.divide(abs_y_error, abs_diff_y_true, where=(abs_diff_y_true != 0))
    return np.sum(relative_difference_errors)


def optimize_mean_absolute_relative_difference_error(params, observations, dt):
    theta, mu, sigma = params
    n = len(observations)
    ou_predictions = np.zeros(n)
    ou_predictions[0] = observations[0]

    random_noise = np.random.normal(0, np.sqrt(dt), size=n - 1)
    ou_predictions[1:] = observations[:-1] + theta * (mu - observations[:-1]) * dt + sigma * random_noise

    # Score the one-step OU predictions with the MARDE metric defined below
    return mean_absolute_relative_difference_error(observations, ou_predictions)


# The optimizers below estimate theta and sigma only; mu is held fixed and passed via args
def optimize_nelder_mead(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='Nelder-Mead')
    return result.x

def optimize_bfgs(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='BFGS')
    return result.x

def optimize_powell(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='Powell')
    return result.x

def optimize_lbfgsb(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='L-BFGS-B')
    return result.x

def optimize_tnc(observations, dt, theta_init, mu, sigma_init):
    initial_guess = [theta_init, sigma_init]
    result = minimize(absolute_relative_difference_errors, initial_guess, args=(mu, observations, dt), method='TNC')
    return result.x

def mean_squared_error(y_true, y_pred):
    return np.mean((y_true - y_pred) ** 2)

def mean_absolute_error(y_true, y_pred):
    return np.mean(np.abs(y_true - y_pred))

def mean_absolute_percentage_error(y_true, y_pred):
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

def mean_absolute_relative_difference_error(y_true: Union[List[float], np.ndarray], y_pred: Union[List[float], np.ndarray]) -> float:
    """
    Calculate the mean absolute relative difference error for the given true and predicted values.

    :param y_true: List or numpy array of true values
    :param y_pred: List or numpy array of predicted values
    :return: Mean absolute relative difference error
    """
    # Convert input lists to numpy arrays
    y_true, y_pred = np.array(y_true), np.array(y_pred)

    # Check if there are less than two elements in y_true
    if len(y_true) < 2:
        return np.nan

    # Calculate the absolute differences between consecutive elements in y_true
    if len(y_true) == 2:
        abs_diff_y_true = np.abs(y_true[1] - y_true[0])
    else:
        abs_diff_y_true = np.abs(np.diff(y_true))
        # Insert the first difference value at the beginning of the array
        abs_diff_y_true = np.insert(abs_diff_y_true, 0, abs_diff_y_true[0])

    # Replace any zero values with a small value (epsilon) to prevent division by zero
    abs_diff_y_true = np.where(abs_diff_y_true == 0, np.finfo(float).eps, abs_diff_y_true)

    # Calculate the absolute differences between y_true and y_pred
    abs_y_error = np.abs(y_true - y_pred)

    # Calculate the mean absolute relative difference error
    return np.mean(abs_y_error / abs_diff_y_true)

def compute_direction_accuracy(y_true, y_pred):
    direction_actual = np.sign(np.array(y_true[1:]) - np.array(y_true[:-1]))
    direction_predicted = np.sign(np.array(y_pred[1:]) - np.array(y_pred[:-1]))
    return np.mean(direction_actual == direction_predicted)


# (NumPy, pandas, SciPy's minimize, Plotly, and os are already imported above)
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

def plot_best_method_chart(symbol, method_name, metrics, data, predictions, rolling_mse, rolling_rmse, rolling_mae, rolling_marde, rolling_mape, rolling_r2, rolling_dir, best_theta, best_mu, best_sigma):
    fig = make_subplots(rows=4, cols=1, shared_xaxes=True,
                        subplot_titles=("Original and Predicted Values", "Difference between Actual and Predicted",
                                        "Metrics", "R2"),
                        vertical_spacing=0.025,
                        row_heights=[0.5, 0.1, 0.2, 0.2],
                        )

    fig.add_trace(go.Scatter(x=data.index[252:], y=data[252:], mode="lines", name="Original"), row=1, col=1)
    fig.add_trace(go.Scatter(x=predictions.index[252:], y=predictions[252:], mode="markers", name="Predicted"),
                  row=1, col=1)

    differences = data[252:] - predictions[252:]
    fig.add_trace(go.Scatter(x=differences.index, y=differences, mode="lines", name="Difference"), row=2, col=1)

    # Add metric subplots
    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_mse, mode="lines", name="MSE"), row=3,
                  col=1)
    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_rmse, mode="lines", name="RMSE"), row=3,
                  col=1)
    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_mae, mode="lines", name="MAE"), row=3,
                  col=1)
    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_marde, mode="lines", name="MARDE"), row=3,
                  col=1)
    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_mape, mode="lines", name="MAPE"), row=3,
                  col=1)
    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_dir, mode="lines", name="Direction Accuracy"), row=3, col=1)
    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_r2, mode="lines", name="R2"), row=4, col=1)


    fig.add_annotation(xref="paper", yref="paper", xanchor='left', yanchor='top', x=0.0, y=1,
                       text=f"<b>{method_name} Metrics</b><br>"
                            f"MSE: {metrics[method_name]['MSE']:.4f}<br>"
                            f"RMSE: {metrics[method_name]['RMSE']:.4f}<br>"
                            f"MAE: {metrics[method_name]['MAE']:.4f}<br>"
                            f"MARDE: {metrics[method_name]['MARDE']:.4f}<br>"
                            f"MAPE: {metrics[method_name]['MAPE']:.4f}<br>"
                            f"R-squared: {metrics[method_name]['R2']:.4f}<br>"
                            f"Direction Accuracy: {metrics[method_name]['Direction Accuracy']:.4f}<br>"
                            f"Best theta: {best_theta:.4f}<br>"
                            f"Best mu: {best_mu:.4f}<br>"
                            f"Best sigma: {best_sigma:.4f}",
                       font=dict(size=12), align="center", showarrow=False)

    fig.update_layout(
        title=f"{symbol} and MARDE: {metrics[method_name]['MARDE']:.4f}",
        title_x=0.5,
        title_y=0.95,
        title_xanchor="center",
        title_yanchor="top",
    )

    if not os.path.exists("plots"):
        os.makedirs("plots")

    plot_file = f"plots/{symbol}_{method_name}_marde{metrics[method_name]['MARDE']:.4f}_theta{best_theta:.4f}_mu{best_mu:.4f}_sigma{best_sigma:.4f}.html"
    pyo.plot(fig, filename=plot_file, auto_open=False)

    print(f"Plot saved as {plot_file}")

def save_analysis_to_dataframe(symbol, method_name, metrics, theta, mu, sigma, filename="ornstein_uhlenbeck_pairs_analysis"):
    # Create a DataFrame with the specified index and columns
    columns = ["Theta", "Mu", "Sigma", "Best Method", "MSE", "RMSE", "MAE", "MARDE", "MAPE", "R2", "Direction Accuracy"]
    analysis_df = pd.DataFrame(index=[symbol], columns=columns)

    # Add the data to the DataFrame (one value per column, in the same order as `columns`)
    analysis_df.loc[symbol] = [theta, mu, sigma, method_name, metrics[method_name]['MSE'],
                               metrics[method_name]['RMSE'], metrics[method_name]['MAE'],
                               metrics[method_name]['MARDE'], metrics[method_name]['MAPE'],
                               metrics[method_name]['R2'], metrics[method_name]['Direction Accuracy']]

    # Save the DataFrame to disk using the pickle library
    with open(filename + ".pkl", "wb") as file:
        pickle.dump(analysis_df, file)

    print(f"Analysis saved as {filename}.pkl")

def ornstein_uhlenbeck(filename='cointegrated_etf_pairs_historical_ratios.pkl', make_stationary=False, plot_all_charts=True, output_file="ornstein_uhlenbeck_pairs_analysis.pkl"):
    # Load the DataFrame from the specified pickle file
    file_path = os.path.join(data_dir, filename)
    with open(file_path, 'rb') as file:
        df = pickle.load(file)

    # Create a DataFrame to store the analysis for all symbols
    columns = ["Theta", "Mu", "Sigma", "Best Method", "MSE", "RMSE", "MAE", "MARDE", "MAPE", "R2", "Direction Accuracy"]
    all_symbols_analysis = pd.DataFrame(columns=columns)

    # Iterate through each column in the DataFrame
    for symbol in tqdm(df.columns, desc="Processing symbols", leave=False):
        # Select the symbol column and drop NaNs
        data = df[symbol].dropna()
        original_first_value = data.iloc[0]  # Save the first value before differencing

        # Apply first-differencing if make_stationary is True
        if make_stationary:
            data = data.diff().dropna()

        # Calculate dt
        index_diff = data.index[-1] - data.index[-2]
        dt = index_diff.days + index_diff.seconds / (60*60*24)
        # dt = dt/365 # for calendar
        dt = dt/252 # for trading days

        # Create a series to store predictions
        predictions = pd.Series(index=data.index, dtype='float64')

        best_method = None
        best_params = None
        best_rmse = float('inf')

        optimization_methods = [
            ('Nelder-Mead', optimize_nelder_mead),
            ('BFGS', optimize_bfgs),
            ('L-BFGS-B', optimize_lbfgsb),
            ('Powell', optimize_powell),
            ('TNC', optimize_tnc),
        ]
        metrics = {}
        best_parameters = {}
        rolling_metrics = {}
        method_predictions = {}


        for method_name, optimization_method in tqdm(optimization_methods, desc="Optimizing parameters", leave=False):
            all_predictions = []
            all_actuals = []

            theta_init = 2
            sigma_init = 2.9

            last_theta = None
            last_mu = None
            last_sigma = None

            rolling_mse = []
            rolling_rmse = []
            rolling_mae = []
            rolling_mape = []
            rolling_marde = []
            rolling_r2 = []
            rolling_dir_acc = []
            rolling_mu = []

            for i in range(252, len(data) - 1):
                if last_theta is None:
                    # First time running this optimization method, use initial values
                    theta_init, mu_init, sigma_init = theta_init, data[:i].mean(), sigma_init
                else:
                    # Use last calculated values as initial guess
                    # theta_init, mu_init, sigma_init = last_theta, last_mu, last_sigma
                    #this method does not allow mu to be optimized
                    theta_init, mu_init, sigma_init = last_theta, data[:i].mean(), last_sigma

                # Run optimization
                theta, sigma = optimization_method(data[i - 252:i].values, dt, theta_init, mu_init,
                                                   sigma_init)  # Unpack two values
                mu = data[i-252:i].mean()  # Calculate mu as the mean of the data up to the current index
                rolling_mu.append(mu)
                # Save the last calculated values
                last_theta, last_mu, last_sigma = theta, mu, sigma

                # print("All actuals:", all_actuals)
                # print("All predictions:", all_predictions)
                # Calculate prediction and save it to the predictions series
                ou_prediction = data[i] + theta * (mu - data[i]) * dt + sigma * np.random.normal(0, np.sqrt(dt))
                predictions[i + 1] = ou_prediction

                # Save actual and predicted values for computing metrics later
                all_predictions.append(ou_prediction)
                all_actuals.append(data[i + 1])

                # print("All actuals:", all_actuals)
                # print("All predictions:", all_predictions)

                # Calculate the rolling metrics
                mse = mean_squared_error(np.array(all_actuals), np.array(all_predictions))
                rmse = np.sqrt(mse)
                mae = mean_absolute_error(np.array(all_actuals), np.array(all_predictions))
                marde = mean_absolute_relative_difference_error(np.array(all_actuals), np.array(all_predictions))
                mape = mean_absolute_percentage_error(np.array(all_actuals), np.array(all_predictions))
                r2 = r2_score(np.array(all_actuals), np.array(all_predictions))
                dir_acc = compute_direction_accuracy(np.array(data[i-252:i]), np.array(predictions[i-252:i]))

                # Append the rolling metrics to their respective lists
                rolling_mse.append(mse)
                rolling_rmse.append(rmse)
                rolling_mae.append(mae)
                rolling_marde.append(marde)
                rolling_mape.append(mape)
                rolling_r2.append(r2)
                rolling_dir_acc.append(dir_acc)

                rolling_metrics[method_name] = {
                    'rolling_mse': rolling_mse,
                    'rolling_rmse': rolling_rmse,
                    'rolling_mae': rolling_mae,
                    'rolling_marde': rolling_marde,
                    'rolling_mape': rolling_mape,
                    'rolling_r2': rolling_r2,
                    'rolling_direction_accuracy': rolling_dir_acc,


                }

                method_predictions[method_name] = predictions.copy()

            # Compute metrics for this optimization method
            if len(all_actuals) > 0 and len(all_predictions) > 0:
                mse = mean_squared_error(np.array(all_actuals), np.array(all_predictions))
                rmse = np.sqrt(mse)
                mae = mean_absolute_error(np.array(all_actuals), np.array(all_predictions))
                marde = mean_absolute_relative_difference_error(np.array(all_actuals), np.array(all_predictions))
                mape = mean_absolute_percentage_error(np.array(all_actuals), np.array(all_predictions))
                r2 = r2_score(np.array(all_actuals), np.array(all_predictions))
                direction_accuracy = compute_direction_accuracy(np.array(all_actuals), np.array(all_predictions))

                metrics[method_name] = {'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'MARDE': marde, 'MAPE': mape, 'R2': r2,
                                        'Direction Accuracy': direction_accuracy}

                best_parameters[method_name] = {'theta': last_theta, 'mu': last_mu, 'sigma': last_sigma}

                print(f"Optimization method: {method_name}")
                print(f"Mean Squared Error: {mse}")
                print(f"Root Mean Squared Error: {rmse}")
                print(f"Mean Absolute Error: {mae}")
                print(f"Mean Absolute Volatility Error: {marde}%\n")
                print(f"Mean Absolute Percentage Error: {mape}%\n")
                print(f"R-squared: {r2}\n")
                print(f"Direction Accuracy: {direction_accuracy}\n")



                if plot_all_charts:
                    # Add the plotting code here
                    fig = make_subplots(rows=3, cols=1, shared_xaxes=True,
                                        subplot_titles=(
                                        "Original and Predicted Values", "Difference between Actual and Predicted",
                                        "Metrics"),
                                        vertical_spacing=0.05,
                                        row_heights=[0.5, 0.5, 0.5],
                                        )

                    fig.add_trace(go.Scatter(x=data.index[252:], y=data[252:], mode="lines", name="Original"), row=1,
                                  col=1)
                    fig.add_trace(
                        go.Scatter(x=predictions.index[252:], y=predictions[252:], mode="markers", name="Predicted"),
                        row=1, col=1)

                    fig.add_trace(
                        go.Scatter(x=data.index[252:], y=rolling_mu, mode="markers", name="Mu"),
                        row=1, col=1)

                    differences = data[252:] - predictions[252:]
                    fig.add_trace(go.Scatter(x=differences.index, y=differences, mode="lines", name="Difference"),
                                  row=2, col=1)

                    # Add metric subplots
                    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_mse, mode="lines", name="MSE"),
                                  row=3, col=1)
                    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_rmse, mode="lines", name="RMSE"),
                                  row=3, col=1)
                    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_mae, mode="lines", name="MAE"),
                                  row=3,col=1)
                    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_marde, mode="lines", name="MARDE"),
                                  row=3, col=1)
                    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_mape, mode="lines", name="MAPE"),
                                  row=3, col=1)
                    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_r2, mode="lines", name="R2"),
                                  row=3, col=1)
                    fig.add_trace(go.Scatter(x=data.index[252:len(data) - 1], y=rolling_dir_acc, mode="lines", name="Direction Accuracy"),
                                  row=3, col=1)

                    fig.add_annotation(xref="paper", yref="paper", xanchor='left', yanchor='top', x=0.0, y=1,
                                       text=f"<b>{method_name} Metrics</b><br>"
                                            f"MSE: {metrics[method_name]['MSE']:.4f}<br>"
                                            f"RMSE: {metrics[method_name]['RMSE']:.4f}<br>"
                                            f"MAE: {metrics[method_name]['MAE']:.4f}<br>"
                                            f"MARDE: {metrics[method_name]['MARDE']:.4f}<br>"
                                            f"MAPE: {metrics[method_name]['MAPE']:.4f}<br>"
                                            f"R-squared: {metrics[method_name]['R2']:.4f}<br>"
                                            f"Direction Accuracy: {metrics[method_name]['Direction Accuracy']:.4f}",
                                       font=dict(size=12), align="center", showarrow=False)

                    if not os.path.exists("plots"):
                        os.makedirs("plots")

                    plot_file = f"plots/{symbol}_ornstein_uhlenbeck_{method_name}.html"
                    pyo.plot(fig, filename=plot_file, auto_open=False)

                    print(f"Plot saved as {plot_file}")
            else:
                print("Skipping metric computation due to insufficient data.")

        if metrics:
            best_method = min(metrics, key=lambda x: metrics[x]['MAPE'])
        else:
            print("No metrics available for this symbol.")
            continue
        # best_method = max(metrics, key=lambda x: metrics[x]['R2'])

        best_mse = metrics[best_method]['MSE']
        best_rmse = metrics[best_method]['RMSE']
        best_mae = metrics[best_method]['MAE']
        best_marde = metrics[best_method]['MARDE']
        best_mape = metrics[best_method]['MAPE']
        best_r2 = metrics[best_method]['R2']
        best_dir_acc = metrics[best_method]['Direction Accuracy']

        best_theta = best_parameters[best_method]['theta']
        best_mu = best_parameters[best_method]['mu']
        best_sigma = best_parameters[best_method]['sigma']

        print(f"Best optimization method: {best_method}")
        print(f"Best values for theta: {best_theta}, mu: {best_mu}, sigma: {best_sigma}")
        print(f"Mean Squared Error: {best_mse}")
        print(f"Root Mean Squared Error: {best_rmse}")
        print(f"Mean Absolute Error: {best_mae}")
        print(f"Mean Absolute Relative Distance Error: {best_marde}")
        print(f"Mean Absolute Percentage Error: {best_mape}")
        print(f"R-squared: {best_r2}")
        print(f"Direction Accuracy: {best_dir_acc}")

        all_symbols_analysis.loc[symbol] = [best_theta, best_mu, best_sigma, best_method,
                                            metrics[best_method]['MSE'],
                                            metrics[best_method]['RMSE'], metrics[best_method]['MAE'],
                                            metrics[best_method]['MARDE'], metrics[best_method]['MAPE'],
                                            metrics[best_method]['R2'],
                                            metrics[best_method]['Direction Accuracy']]

        # Call the plot_best_method_chart function after determining the best_method
        plot_best_method_chart(
            symbol, best_method, metrics, data, method_predictions[best_method],
            rolling_metrics[best_method]['rolling_mse'],
            rolling_metrics[best_method]['rolling_rmse'],
            rolling_metrics[best_method]['rolling_mae'],
            rolling_metrics[best_method]['rolling_marde'],
            rolling_metrics[best_method]['rolling_mape'],
            rolling_metrics[best_method]['rolling_r2'],
            rolling_metrics[best_method]['rolling_direction_accuracy'],
            best_theta, best_mu, best_sigma
        )

        with open(output_file, "wb") as file:
            pickle.dump(all_symbols_analysis, file)
        # return best_method, best_theta, best_mu, best_sigma, best_mse, best_rmse, best_mae, best_marde, best_mape,best_r2



ornstein_uhlenbeck()

Your final results will look something like this.

True values: μ = 109.00, θ = 22.00, σ = 23.00
MLE estimates: μ = 923764.61, θ = -4591680.43, σ = 69817.80
Least Squares: μ = 108.97, θ = 23.72, σ = 1.00
Method of Moments: μ = 108.98, θ = 23.70, σ = 19.33
Kalman Filter: μ = 0.00, θ = 2.00, σ = 2.42
Ensemble prediction: μ = 108.98, θ = 23.70, σ = 19.33
MLE estimates (Nelder-Mead): μ = 0.00, θ = -3255.78, σ = 18095.76
MLE estimates (Powell): μ = -0.51, θ = -104.01, σ = 16.22
MLE estimates (CG): μ = -31431.14, θ = -159.34, σ = 8154.10
MLE estimates (BFGS): μ = -59278.98, θ = -1326.24, σ = 19548.41
MLE estimates (L-BFGS-B): μ = 0.77, θ = 4.98, σ = 5.30


Mean Absolute Relative Difference Error (MARDE) Metric for Improved Forecasting Evaluation

I was running some machine learning models and struggling to find a metric that could capture errors relative to the volatility of the series I was trying to predict, so I created MARDE. Commonly used metrics like Mean Absolute Error (MAE), Mean Absolute Percentage Error (MAPE), Mean Squared Error (MSE), and R-squared (R2) have limitations that kept them from capturing my model’s performance.

I created a new metric called Mean Absolute Relative Difference Error (MARDE), which addresses some of the shortcomings of the existing metrics. MARDE measures the average absolute error between the true and predicted values, relative to the consecutive differences in the true values. By considering the relative differences, MARDE offers a more nuanced evaluation of model performance, particularly in situations with varying volatility or fluctuating data patterns.

MARDE = (1/n) * Σ(abs_y_error_i / Δy_true_i) * 100, for i=1 to n

Here, Δy_true represents the absolute differences between consecutive elements in y_true, and abs_y_error represents the absolute differences between y_true and y_pred.


def mean_absolute_volatility_error(y_true: Union[List[float], np.ndarray], y_pred: Union[List[float], np.ndarray]) -> float:
    """
    Calculate the mean absolute volatility error for the given true and predicted values.

    :param y_true: List or numpy array of true values
    :param y_pred: List or numpy array of predicted values
    :return: Mean absolute volatility error (in percentage)
    """
    # Convert input lists to numpy arrays
    y_true, y_pred = np.array(y_true), np.array(y_pred)

    # Check if there are less than two elements in y_true
    if len(y_true) < 2:
        return np.nan

    # Calculate the absolute differences between consecutive elements in y_true
    if len(y_true) == 2:
        abs_diff_y_true = np.abs(y_true[1] - y_true[0])
    else:
        abs_diff_y_true = np.abs(np.diff(y_true))
        # Insert the first difference value at the beginning of the array
        abs_diff_y_true = np.insert(abs_diff_y_true, 0, abs_diff_y_true[0])

    # Replace any zero values with a small value (epsilon) to prevent division by zero
    abs_diff_y_true = np.where(abs_diff_y_true == 0, np.finfo(float).eps, abs_diff_y_true)

    # Calculate the absolute differences between y_true and y_pred
    abs_y_error = np.abs(y_true - y_pred)

    # Calculate mean absolute volatility error (in percentage)
    return np.mean(abs_y_error / abs_diff_y_true * 100)
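
A quick toy check of the function, with made-up numbers and assuming the function above is in scope:

y_true = [100.0, 102.0, 101.0, 104.0]
y_pred = [100.5, 101.0, 102.0, 103.0]

print(mean_absolute_volatility_error(y_true, y_pred))  # roughly 52.1 for these made-up values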

Comparing MARDE to existing metrics (a small numeric sketch follows this list):

  1. Mean Absolute Error (MAE): MAE measures the average absolute difference between the true and predicted values. However, it does not account for the inherent volatility or fluctuations in the data. MARDE considers this aspect by normalizing the error with respect to the consecutive differences in the true values.
    • MAE = (1/n) * Σ|y_true_i – y_pred_i|, for i=1 to n
  2. Mean Absolute Percentage Error (MAPE): MAPE is similar to MAE, but it normalizes the error by dividing it by the true value. While MAPE is useful for expressing errors in percentage terms, it can lead to biased results when dealing with very small or zero true values. MARDE overcomes this issue by using consecutive differences as the normalization factor.
    • MAPE = (1/n) * Σ(|(y_true_i – y_pred_i) / y_true_i|) * 100, for i=1 to n
  3. Mean Squared Error (MSE): MSE measures the average squared difference between the true and predicted values. It tends to penalize larger errors more heavily, which can be useful in some cases but might also overemphasize outliers. MARDE, on the other hand, focuses on the absolute differences and is less sensitive to extreme values.
    • MSE = (1/n) * Σ(y_true_i – y_pred_i)^2, for i=1 to n
  4. R-squared (R2): R2 measures the proportion of variance in the true values that can be explained by the model. While R2 is useful for assessing the overall goodness-of-fit, it does not provide a direct measure of the prediction error. MARDE, as a metric focused on errors, offers a complementary perspective to R2 in evaluating model performance.
    • R² = 1 – (SSR / SST)
    • SSR (Sum of Squared Residuals) is the sum of the squared differences between the actual dependent variable values (y) and the predicted values (ŷ) from the model;
    • SST (Total Sum of Squares) is the sum of the squared differences between the actual dependent variable values (y) and the mean of the dependent variable (ȳ).
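
To make that contrast concrete, here is a small sketch with made-up numbers, using the metric helpers defined earlier on this page (mean_absolute_error and mean_absolute_percentage_error from the previous article’s code, and mean_absolute_volatility_error above). The prediction misses by roughly 1.0 at every step, but MARDE penalizes those misses far more on the near-flat steps than on the big move:

import numpy as np

y_true = np.array([100.0, 100.1, 105.0, 105.2])  # small move, large move, small move
y_pred = np.array([100.9, 101.0, 104.0, 106.0])  # each prediction misses by roughly 1.0

print(mean_absolute_error(y_true, y_pred))             # same-size misses count equally
print(mean_absolute_percentage_error(y_true, y_pred))  # normalizes by the level of the series
print(mean_absolute_volatility_error(y_true, y_pred))  # normalizes by the step-to-step move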

The Mean Absolute Relative Difference Error (MARDE) is a valuable addition to the set of evaluation metrics used in forecasting and prediction tasks. By taking into account the consecutive differences in the true values, MARDE provides a more granular assessment of model performance, particularly in situations with fluctuating data patterns. It overcomes some of the limitations of existing metrics and can be employed alongside them for a comprehensive evaluation of prediction models.