My Leaky Water Bill – How to Use Machine Learning to Detect Water Leaks

Recently, I encountered an unexpected challenge: a water leak beneath the slab of my house. The ordeal had me up until 1 AM, rerouting the line through my attic with PEX piping. Amidst this late-night task, a thought occurred to me: could machine learning and forecasting have helped me detect this leak earlier, based on my water bill consumption?

I wrote some Python code, outlined below, that uses statsmodels and SARIMAX to predict consumption.

I now wonder why municipalities aren’t applying machine learning to data like this to send customers advance notice of potential leaks. I imagine this could save millions of gallons of water each year. The full code and an explanation follow.

Data Upload and Preparation:

The program starts by uploading a CSV file containing water usage data (in gallons) and the corresponding dates. The CSV must have two columns titled date and gallons for this to work. The data is then processed to ensure it’s in the correct format: dates are sorted, and any missing values are filled to maintain continuity.
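
For reference, the script expects a file shaped like this (the values here are invented for illustration):

date,gallons
2023-01-31,4300
2023-02-28,4100
2023-03-31,5600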


Creating a Predictive Model:

I used the SARIMAX model from the statsmodels library, a powerful tool for time series forecasting. The model considers both the seasonal nature of water usage and any underlying trends or cycles.


Making Predictions and Comparisons:

The program forecasts future water usage and compares it with actual data.
By analyzing past consumption, it can predict what typical usage should look like and flag any significant deviations.
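
As a minimal sketch of that flagging (this snippet is not part of the code below, but it reuses the variable names defined there), any month whose actual usage rises above the model’s upper confidence bound gets flagged:

# Hypothetical leak check: compare each actual reading to the model's upper bound
actual = df['gallons'].reindex(in_sample_conf_int.index)
suspicious = actual[actual > in_sample_conf_int['upper gallons']]
print(suspicious)  # months where usage exceeded the expected range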


Visualizing the Data:

The real power of this program lies in its visualization capabilities.
Using Plotly, a versatile graphing library, the program generates an interactive chart. It not only shows actual water usage but also plots predicted values and their confidence intervals.

Highlighting Historical Data:

To provide context, the chart also includes historical data as reference points. These are shown as small horizontal lines, representing the same month in previous years.

Code (Google Colab)

!pip install plotly
!pip install statsmodels

from google.colab import files
import io
import pandas as pd

uploaded = files.upload()

# Use the name of the first uploaded file
filename = next(iter(uploaded))
df = pd.read_csv(io.BytesIO(uploaded[filename]))

df = df[['date', 'gallons']]

# Convert the date column to datetime
df['date'] = pd.to_datetime(df['date'])  
df.sort_values(by='date', inplace=True)

df.set_index('date', inplace=True)
# Reindex to daily frequency, forward-fill any gaps, then keep the month-end values
df = df.asfreq('D')
df['gallons'] = df['gallons'].ffill()
df = df.asfreq('M')

import plotly.graph_objects as go
import pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX

# SARIMA model: order=(p, d, q) captures short-term structure;
# seasonal_order=(P, D, Q, 12) captures the yearly (12-month) cycle
model = SARIMAX(df['gallons'], order=(1, 0, 1), seasonal_order=(1, 1, 1, 12))
results = model.fit()

# In-sample predictions (starting after the first 12 months so the seasonal terms are initialized)
in_sample_predictions = results.get_prediction(start=pd.to_datetime(df.index[12]), end=pd.to_datetime(df.index[-1]), dynamic=False)
predicted_mean_in_sample = in_sample_predictions.predicted_mean
in_sample_conf_int = in_sample_predictions.conf_int()

# Forecasting for future periods (e.g., the next 12 months)
forecast = results.get_forecast(steps=12)
predicted_mean_forecast = forecast.predicted_mean
forecast_conf_int = forecast.conf_int()

# Prepare the figure
fig = go.Figure()

# Predicted data (in-sample) and confidence intervals
fig.add_trace(go.Scatter(x=predicted_mean_in_sample.index, y=predicted_mean_in_sample, mode='lines', name='Predicted (In-Sample)', line=dict(color='orange')))
fig.add_trace(go.Scatter(x=in_sample_conf_int.index, y=in_sample_conf_int['upper gallons'], fill=None, mode='lines', line=dict(color='lightgray'), showlegend=False))
fig.add_trace(go.Scatter(x=in_sample_conf_int.index, y=in_sample_conf_int['lower gallons'], fill='tonexty', mode='lines', line=dict(color='lightgray'), showlegend=False, name='Predicted CI'))

# Forecasted data (out-of-sample) and confidence intervals
fig.add_trace(go.Scatter(x=predicted_mean_forecast.index, y=predicted_mean_forecast, mode='lines', name='Forecast (Out-of-Sample)', line=dict(color='green')))
fig.add_trace(go.Scatter(x=forecast_conf_int.index, y=forecast_conf_int['upper gallons'], fill=None, mode='lines', line=dict(color='lightgray'), showlegend=False))
fig.add_trace(go.Scatter(x=forecast_conf_int.index, y=forecast_conf_int['lower gallons'], fill='tonexty', mode='lines', line=dict(color='lightgray'), showlegend=False, name='Forecast CI'))

# Actual data (make it bolder and on top)
fig.add_trace(go.Scatter(x=df.index, y=df['gallons'], mode='lines', name='Actual', line=dict(color='blue', width=3)))

# Adding Previous Years' data as small horizontal lines
legend_added = False
for current_date in df.index.union(predicted_mean_forecast.index):
    current_month, current_year = current_date.month, current_date.year
    previous_years_data = df[(df.index.month == current_month) & (df.index.year < current_year)]
    for prev_year_date in previous_years_data.index:
        y_value = previous_years_data.loc[prev_year_date, 'gallons']
        fig.add_shape(type="line", x0=current_date - pd.Timedelta(days=5), y0=y_value, x1=current_date + pd.Timedelta(days=5), y1=y_value, line=dict(color="purple", width=2))
        if not legend_added:
            fig.add_trace(go.Scatter(x=[None], y=[None], mode='lines', name='Previous Years', line=dict(color='purple', width=2)))
            legend_added = True

# Update layout
fig.update_layout(title='Actual vs Predicted vs Forecasted Water Usage', xaxis_title='Date', yaxis_title='Gallons', hovermode='closest')

# Show the plot
fig.show()

Predicting Stock Prices and Corporate Financial Ratios with Facebook Prophet Using Python

I started off wanting to analyze the sustainability of mortgage buydowns by home builders. But as it happens, my ADHD had other plans. Three weeks later, here we are with code that looks for trends and anomalies in corporate financials. The code generates detailed HTML profiles for in-depth financial analysis of stocks.

In a series of previous posts, I’ve explained how to extract corporate financial data and stock price data. Now, let’s go a step further by using Plotly and Facebook’s Prophet library to predict these financial metrics and stock prices. It’s crucial to note that the goal here isn’t necessarily to predict future values but to model the data in a way that lets us quickly spot anomalies. In this tutorial, we’ll be extracting data from HDF5 files created in previous posts, so make sure to check those out first.

Generated HTML Output: A Preview of What the Code Produces

The code generates an in-depth profile of any stock specified. In this example, we take a look at Google. The output is a structured HTML file that includes a variety of information sections. First, it provides basic details such as the company’s name, ticker symbol, and the date of the most recent update. Next, it lists essential contact information, including the company’s web URL, phone number, and physical address. The sector and industry in which the company operates, as well as the number of full-time employees, are also displayed. The output then delves into valuation metrics like P/E ratios, enterprise value, and other financial ratios. Finally, it highlights key financial data points, such as market capitalization, EBITDA, and various earnings and revenue estimates. Overall, the code produces a multi-faceted profile that serves as a valuable resource for anyone looking to understand Alphabet Inc’s business and financial standing in great detail.

Forecasting Future Adjusted Close Prices with Facebook Prophet: A One-Year Outlook

This plot visualizes the historical and predicted adjusted close prices for a particular asset, using Facebook Prophet for the forecast. The x-axis represents the timeline, extending from the earliest available historical data to one year into the future. The y-axis shows the adjusted close prices. Historical prices are plotted as a solid purple line; the prediction is a solid green line, with blue dashed lines marking the upper and lower bounds. The model leverages Prophet’s decomposition of the historical data into trend and seasonal components, providing a one-year outlook on potential price movements.

Forecasting Future Financial Ratios with Facebook Prophet: A One-Year Outlook

Financial ratios are key indicators of a company’s financial health and performance. They offer crucial insights into various aspects like profitability, liquidity, and valuation. In this section, we’ll leverage the predictive power of Facebook Prophet to forecast these ratios over the next year.

Forecasting Future Earnings and Profitability Metrics from the Income Statement with Facebook Prophet: A One-Year Outlook

These plots present a comprehensive visualization of key earnings and profitability metrics extracted from the income statement, including Net Income, Operating Income, Total Revenue, Gross Profit, and Free Cash Flow. The financial figures are then projected forward using Facebook Prophet. These predictions not only display the expected trends for the next year but also encompass upper and lower bounds, providing a range of possible outcomes. This integration of historical data with advanced forecasting techniques offers a nuanced understanding of the company’s financial trajectory.

Forecasting Future Balance Sheet Metrics with Facebook Prophet: A One-Year Outlook

These plots offer an in-depth analysis of crucial balance sheet metrics such as Total Assets, Total Stockholder Equity, Retained Earnings, Long-Term Debt, and Total Liabilities. Leveraging the predictive power of Facebook Prophet, these key financial indicators are forecasted into the future. The projections not only illustrate the expected financial posture for the forthcoming year but also include upper and lower prediction intervals, giving a full spectrum of potential financial scenarios. By melding past balance sheet data with sophisticated predictive modeling, the plots provide a multi-dimensional view of the company’s expected financial stability and risk profile.

Forecasting Future Cash Flow Metrics with Facebook Prophet: A One-Year Outlook

These plots dive into essential cash flow metrics, specifically Total Cash from Operating Activities, Capital Expenditures, and Dividends Paid. These key figures are also extended into the future using Facebook Prophet’s forecasting capabilities. The resulting predictions not only map out the anticipated cash flow movements for the next year but are also bracketed by upper and lower confidence intervals, presenting a comprehensive range of financial possibilities. These plots offer a well-rounded perspective on the company’s future liquidity and capital allocation strategies.

Other Financial Metrics

In addition to the key financial metrics, the analysis delves into a diverse set of other metrics, plotting each meticulously. These metrics are categorized as follows:

Efficiency and Activity Metrics from Income Statement:

  • Research Development
  • Selling General Administrative


Stockholder’s Equity and Capital Structure:

  • Common Stock Shares Outstanding


Additional Important Metrics:

  • Net Debt
  • Net Receivables
  • Inventory
  • Accounts Payable
  • Total Current Assets
  • Total Current Liabilities

Less Critical Metrics:

  • Income Before Tax
  • Cost of Revenue
  • Intangible Assets
  • Earning Assets
  • Other Current Assets
  • Deferred Long-Term Liabilities
  • Other Current Liabilities
  • Common Stock
  • Capital Stock
  • Other Liabilities
  • Goodwill
  • Other Assets
  • Cash
  • Cash and Equivalents
  • Current Deferred Revenue
  • Short-Term Debt
  • Short/Long-Term Debt
  • Short/Long-Term Debt Total
  • Other Stockholder Equity
  • Property Plant Equipment
  • Long-Term Investments
  • Net Tangible Assets
  • Short-Term Investments

Other Metrics:

  • Effect of Accounting Charges
  • Income Tax Expense
  • Non-Operating Income Net Other
  • Selling and Marketing Expenses
  • Common Stock Total Equity
  • Preferred Stock Total Equity
  • Retained Earnings Total Equity
  • Treasury Stock
  • Accumulated Amortization
  • Non-Current Assets Other
  • Deferred Long-Term Asset Charges
  • Non-Current Assets Total
  • Capital Lease Obligations
  • Long-Term Debt Total
  • Non-Current Liabilities Other
  • Non-Current Liabilities Total
  • Negative Goodwill
  • Warrants
  • Preferred Stock Redeemable
  • Capital Surpluse
  • Liabilities And Stockholders Equity
  • Cash And Short-Term Investments
  • Property Plant And Equipment Gross
  • Property Plant And Equipment Net
  • Accumulated Depreciation
  • Total Cash Flows From Investing Activities
  • Total Cash From Financing Activities
  • Net Borrowings
  • Issuance Of Capital Stock
  • Investments
  • Change To Liabilities
  • Change To Operating Activities
  • Change In Cash
  • Begin Period Cash Flow
  • End Period Cash Flow
  • Depreciation
  • Other Cash Flows From Investing Activities
  • Change To Inventory
  • Change To Account Receivables
  • Sale Purchase Of Stock
  • Other Cash Flows From Financing Activities
  • Change To Net Income
  • Change Receivables
  • Cash Flows Other Operating
  • Exchange Rate Changes
  • Cash And Cash Equivalents Changes
  • Change In Working Capital
  • Stock Based Compensation
  • Other Non-Cash Items

Code Overview

Prerequisites

  • Python 3.x
  • Pandas
  • Plotly
  • Prophet
  • scikit-learn
  • bs4 (BeautifulSoup)
  • Standard library: logging, pathlib, os, collections, time

Helper Functions

The code includes several helper functions, such as:

  • calculate_mae: Calculates the Mean Absolute Error between the actual and predicted data.
  • access_hdf5_with_retries: Attempts to read an HDF5 file, retrying up to a specified number of times.
  • read_general_info: Reads general company information from an HDF5 file.
  • infer_dtype: Infers the data type of a Pandas Series.
  • fetch_data_for_symbol_from_multiple_h5: Fetches data for a specific symbol from multiple HDF5 files.
  • get_company_name: Gets the company name for a given symbol from an HDF5 file.
  • add_content_before_plot: Adds additional content before the plot in an HTML file.

Forecasting Function

The core of this code is the forecast_with_multiple_metrics function, which:

  1. Accepts a DataFrame of financial metrics.
  2. Performs time-series forecasting on each metric using Facebook’s Prophet.
  3. Plots the actual and forecasted metrics using Plotly.

def forecast_with_multiple_metrics(df: pd.DataFrame, periods: int = 4, save_dir: str = "plots/", use_all_data: bool = True, eod_price_data: str = 'eod_price_data.h5'):
    ...

Code

import pandas as pd
from pathlib import Path
import logging
import os
from plotly.subplots import make_subplots
from prophet import Prophet
import plotly.graph_objects as go
from collections import Counter
from pandas.api.types import is_numeric_dtype
from bs4 import BeautifulSoup
import time
from sklearn.metrics import mean_absolute_error
from math import log10


from dictionaries_and_lists import metric_definitions, reordered_columns, homebuilders, sp500, companies_with_treasuries, largest_banks, percentage_metrics

data_dir = '/home/shared/algos/data/'
plots_dir = '/home/shared/algos/eodhd_data/plots/'

# Note: basicConfig only takes effect on the first call, so pick a single level here
logging.basicConfig(level=logging.INFO)

# Configure Pandas to display all columns
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)

symbol_exchange_map = {}


def wrap_text(text, max_length):
    """
    Wraps text to a new line at the nearest whitespace of the max_length.
    """
    wrapped_lines = []
    while len(text) > max_length:
        # Find nearest whitespace of the max_length
        split_index = text.rfind(' ', 0, max_length + 1)
        if split_index == -1:  # No whitespace found, force split
            split_index = max_length
        wrapped_lines.append(text[:split_index])
        text = text[split_index:].lstrip()
    wrapped_lines.append(text)
    return '<br>'.join(wrapped_lines)

def format_large_number(num):
    # Work with the absolute value so log10 never sees a non-positive number,
    # then restore the sign at the end
    sign = '-' if num < 0 else ''
    num = abs(num)
    if num < 1_000:
        return f"{sign}{num}"
    magnitude = int(log10(num) // 3)
    value = num / (10 ** (3 * magnitude))
    return f"{sign}{value:.2f}{' KMBT'[magnitude]}"

def format_percentage(data):
    try:
        # Remove any non-numeric characters like commas and percentage signs
        if isinstance(data, str):
            data = data.replace(',', '').replace('%', '').strip()
        # Convert to float and format as a percentage
        return "{:.2%}".format(float(data))
    except ValueError as e:
        print(f"ValueError: Could not convert {data} to a percentage.")
        return data  # Return the original data if it cannot be converted


def calculate_mae(actual, predicted):
    return mean_absolute_error(actual, predicted)

def access_hdf5_with_retries(hdf5_file_path, mode, max_retries=3, sleep_duration=5):
    retries = 0
    while retries < max_retries:
        try:
            with pd.HDFStore(hdf5_file_path, mode) as store:
                return store.keys()  # You can customize this part to return what you need
        except Exception as e:  # Replace Exception with a more specific exception if possible
            if retries < max_retries - 1:
                logging.info(f"An exception occurred while reading {hdf5_file_path}: {e}")
                logging.info(f"Retrying in {sleep_duration} seconds...")
                time.sleep(sleep_duration)
                retries += 1
            else:
                logging.error(f"Max retries reached. Could not read {hdf5_file_path}. Exiting...")
                raise

def read_general_info(symbol, h5_general_path, max_retries=3, sleep_duration=5):
    retries = 0
    while retries < max_retries:
        try:
            with pd.HDFStore(h5_general_path, 'r') as store:
                key = f'/{symbol}'
                if key in store.keys():
                    general_info = store.get(key)
                    info_dict = dict(zip(general_info['SubCategory'], general_info['Data']))
                    sector = info_dict.get('Sector', 'N/A')
                    industry = info_dict.get('Industry', 'N/A')
                    description = info_dict.get('Description', 'N/A')
                    full_time_employees = info_dict.get('FullTimeEmployees', 'N/A')
                    updated_at = info_dict.get('UpdatedAt', 'N/A')
                    web_url = info_dict.get('WebURL', 'N/A')
                    phone = info_dict.get('Phone', 'N/A')
                    address = info_dict.get('Address', 'N/A')
                    name = info_dict.get('Name', 'N/A')
                    exchange = info_dict.get('Exchange', 'N/A')
                    return sector, industry, description, full_time_employees, updated_at, web_url, phone, address, name, exchange
                else:
                    return ['N/A']*10
        except Exception as e:
            if retries < max_retries - 1:
                print(f"An exception occurred while reading {h5_general_path}: {e}")
                print(f"Retrying in {sleep_duration} seconds...")
                time.sleep(sleep_duration)
                retries += 1
            else:
                print(f"Max retries reached. Could not read {h5_general_path}. Exiting...")
                raise



def infer_dtype(series):
    sample = series.dropna().head(100)  # Sample the first 100 non-null rows

    # Debugging line
    print(f"Sample for {series.name}: {sample}")

    if sample.empty:
        return None

    if all(sample.apply(lambda x: isinstance(x, str))):
        return 'object'

    if is_numeric_dtype(sample) and all(sample.apply(lambda x: x == int(x))):
        return 'int64'

    if is_numeric_dtype(sample):
        return 'float64'
    return None


def fetch_data_for_symbol_from_multiple_h5(h5_filepaths, symbol, max_retries=3, sleep_duration=5):
    combined_data = None
    common_keys = ['Symbol', 'date', 'filing_date', 'currency_symbol']

    for h5_filepath in h5_filepaths:
        h5_path = Path(h5_filepath)
        if not h5_path.exists():
            logging.info(f"The file {h5_filepath} does not exist.")
            continue

        retries = 0
        while retries < max_retries:
            try:
                with pd.HDFStore(h5_filepath, 'r') as store:
                    if f"/{symbol}" in store.keys():
                        logging.info(f"Symbol {symbol} found in {h5_filepath}.")
                        symbol_data = store.select(symbol)

                        if 'date' not in symbol_data.columns and 'Date' not in symbol_data.columns:
                            if isinstance(symbol_data.index, pd.DatetimeIndex):
                                # Promote the datetime index to a column and standardize its name
                                symbol_data.reset_index(inplace=True)
                                symbol_data.rename(columns={'index': 'date', 'Date': 'date'}, inplace=True)
                                logging.info(
                                    f"'date' and 'Date' columns not found, but datetime index exists in {h5_filepath} for symbol {symbol}.")
                            else:
                                logging.warning(
                                    f"'date' and 'Date' columns and datetime index not found in {h5_filepath} for symbol {symbol}.")
                                break
                        else:
                            # Ensure the date column is standardized to 'date'
                            if 'Date' in symbol_data.columns:
                                symbol_data.rename(columns={'Date': 'date'}, inplace=True)


                        # Rename 'netIncome' if it's present in the dataframe
                        if 'netIncome' in symbol_data.columns:
                            # Extract the type of financial statement from the filename
                            parts = h5_path.stem.split('_')
                            statement_type = parts[1]
                            metric_name = parts[2]

                            statement_readable = {
                                "Cash": "Cash Flow from Operating Activities",
                                "Income": "netIncome",
                            }.get(statement_type, statement_type)

                            metric_title = f"{metric_name} ({statement_readable})"

                            # Ensure statement_type is one of the reports where 'netIncome' appears before renaming
                            if statement_type in ["Income", "Cash"]:
                                symbol_data.rename(columns={'netIncome': metric_title}, inplace=True)

                        if combined_data is None:
                            combined_data = symbol_data
                        else:
                            # Dynamically adjust common keys based on available columns
                            keys_for_merge = [key for key in common_keys if
                                              key in symbol_data.columns and key in combined_data.columns]
                            combined_data = pd.merge(combined_data, symbol_data, on=keys_for_merge, how='outer')
                        break

                    else:
                        logging.info(f"Symbol {symbol} not found in {h5_filepath}.")
                        break

            except Exception as e:
                if retries < max_retries - 1:
                    logging.info(f"An exception occurred while reading {h5_filepath}: {e}")
                    logging.info(f"Retrying in {sleep_duration} seconds...")
                    time.sleep(sleep_duration)
                    retries += 1
                else:
                    logging.info(f"Max retries reached. Could not read {h5_filepath}. Exiting...")
                    raise

    if combined_data is None:
        logging.info(f"Symbol {symbol} not found in any of the HDF5 files.")
        return None
    else:
        for col in combined_data.select_dtypes(include=['object']).columns:
            combined_data[col] = pd.to_numeric(combined_data[col], errors='ignore')

        if 'date' in combined_data.columns:
            combined_data.sort_values(by='date', inplace=True)
        else:
            logging.info(f"'date' column not found in combined data for symbol {symbol}. Sorting by index instead.")
            combined_data.sort_index(inplace=True)
        logging.info(f"{symbol}: successfully combined all HDF5 files.")
        return combined_data



def get_company_name(symbol, h5_file_path, country_code='US'):
    try:
        # Read the DataFrame for the entire country from the HDF5 file
        symbols_df = pd.read_hdf(h5_file_path, key=f'/{country_code}')

        # Filter by the specific symbol to get the company name
        company_name_row = symbols_df[symbols_df['Code'] == symbol]
        if not company_name_row.empty:
            return company_name_row['Name'].iloc[0]
        else:
            logging.error(f"No data for symbol {symbol} in the DataFrame.")
            return "Unknown"
    except KeyError:
        logging.error(f"No object named {country_code} in the file {h5_file_path}")
        return "Unknown"


def add_content_before_plot(symbol, max_retries=3, sleep_duration=5):
    target_filename = f"{symbol}_quarterly.html"
    html_file_path = os.path.join(plots_dir, target_filename)

    if not os.path.exists(html_file_path):
        print(f"HTML file for symbol {symbol} not found.")
        return

    # Fetch additional info from General.h5
    exchange = symbol_exchange_map.get(symbol, 'Other')  # Default to 'Other' if exchange is not found
    h5_general_path = os.path.join(data_dir, f"{exchange}_General.h5")

    sector, industry, description, full_time_employees, updated_at, web_url, phone, address, name, exchange = read_general_info(symbol, h5_general_path, max_retries, sleep_duration)

    # Prepare new HTML content
    new_html_content = f"""
    <h1>{symbol} - {name} - {exchange}</h1>
    <p>Updated At: {updated_at}</p>
    <p><strong style='font-size: larger;'>WebURL:</strong> <a href='{web_url}' target='_blank'>{web_url}</a>
    <strong style='font-size: larger;'>Phone:</strong> {phone}
    <strong style='font-size: larger;'>Address:</strong> {address}<br>
    <strong style='font-size: larger;'>Sector:</strong> {sector}
    <strong style='font-size: larger;'>Industry:</strong> {industry}<br>
    <strong style='font-size: larger;'>Full Time Employees:</strong> {full_time_employees}</p><br>
    <strong style='font-size: larger;'>Description:</strong> {description}<br><br>
    """

    # Read valuation.h5 data for the symbol
    h5_valuation_path = os.path.join(data_dir, f"{exchange}_Valuation.h5")

    retries = 0
    valuation_data = None
    while retries < max_retries:
        try:
            with pd.HDFStore(h5_valuation_path, 'r') as store:
                if symbol in store:
                    valuation_data = store.get(symbol)
                else:
                    valuation_data = pd.DataFrame(columns=['SubCategory', 'Data'])
                    print(f"The symbol {symbol} valuation data does not exist in {h5_valuation_path}")
            break
        except Exception as e:
            if retries < max_retries - 1:
                print(f"An exception occurred while reading {h5_valuation_path}: {e}")
                print(f"Retrying in {sleep_duration} seconds...")
                time.sleep(sleep_duration)
                retries += 1
            else:
                print(f"Max retries reached. Could not read {h5_valuation_path}. Exiting...")
                raise

    # Add valuation_data to new_html_content
    valuation_html = "<h2>Valuation</h2><div style='display: flex; flex-wrap: wrap;'>"
    if valuation_data is not None:
        print(f"valuation_data: {valuation_data}")
        valuation_data['Data'] = pd.to_numeric(valuation_data['Data'], errors='coerce')

        for index, row in valuation_data.iterrows():
            data = row['Data']
            if pd.notnull(data):  # Check if 'Data' is not NaN
                # Match on the metric name, not the positional row index
                if row['SubCategory'] in percentage_metrics:
                    formatted_data = "{:.2%}".format(float(data))
                elif isinstance(data, (int, float)) and abs(data) >= 1_000:  # Large numbers
                    formatted_data = format_large_number(data)
                elif isinstance(data, (int, float)):
                    print('formatting numbers with commas in Valuation')
                    print(f"data: {data}, type: {type(data)}")
                    formatted_data = f"{data:,.2f}"  # For other numbers, just format with commas
                else:
                    formatted_data = data  # For non-numeric data, leave as is
            else:
                # If data is NaN or non-numeric, leave as is
                formatted_data = row['Data']

            valuation_html += f"<div style='flex: 0 0 calc(33.333% - 10px); margin-right: 10px; margin-bottom: 10px;'>"
            valuation_html += f"<strong>{row['SubCategory']}</strong>: {formatted_data}</div>"
    valuation_html += "</div>"

    new_html_content += valuation_html  # Append valuation data to new_html_content



    # Read highlights.h5 data for the symbol
    h5_highlights_path = os.path.join(data_dir, f"{exchange}_Highlights.h5")
    retries = 0
    while retries < max_retries:
        try:
            with pd.HDFStore(h5_highlights_path, 'r') as store:
                if symbol in store:
                    highlights_data = store.get(symbol)
                else:
                    highlights_data = pd.DataFrame(columns=['SubCategory', 'Data'])
                    print(f"The symbol {symbol} highlights data does not exist in {h5_highlights_path}")
            break
        except Exception as e:
            if retries < max_retries - 1:
                print(f"An exception occurred while reading {h5_highlights_path}: {e}")
                print(f"Retrying in {sleep_duration} seconds...")
                time.sleep(sleep_duration)
                retries += 1
            else:
                print(f"Max retries reached. Could not read {h5_highlights_path}. Exiting...")
                raise

    # Add title before highlights
    new_html_content += "<h2>Highlights</h2>"
    # Add highlights_data to new_html_content
    highlights_html = "<div style='display: flex; flex-wrap: wrap;'>"

    highlights_data['Data'] = highlights_data['Data'].apply(pd.to_numeric, errors='ignore')

    for index, row in highlights_data.iterrows():
        subcategory = row['SubCategory']
        data = row['Data']

        if subcategory in percentage_metrics:
            formatted_data = format_percentage(data)
        elif pd.notnull(data) and isinstance(data, (int, float)):
            if abs(data) >= 1_000:  # Large numbers
                print(f"{symbol}: {subcategory} is a large number. Formatting {data} with commas...")
                formatted_data = format_large_number(data)
            else:  # Other numeric data that is not a large number
                print(f"{symbol}: {subcategory} is a numeric value. Formatting {data} with commas...")
                formatted_data = f"{data:,.2f}"
        else:
            print(f'{symbol}: {subcategory} is a non-numeric value. Data: {data}')
            formatted_data = data  # For non-numeric data, leave as is

        highlights_html += f"""<div style='flex: 0 0 calc(33.333% - 10px); margin-right: 10px; margin-bottom: 10px;'>
                                <strong>{subcategory}</strong>: {formatted_data}
                               </div>"""
    highlights_html += "</div>"
    new_html_content += highlights_html  # Append highlights data to new_html_content

    # Add the disclaimer to the end of new_html_content
    disclaimer = "<br><br><p><strong>Disclaimer:</strong> The last data point has been excluded from the Prophet prediction.</p>"
    new_html_content += disclaimer


    with open(html_file_path, 'r', encoding='utf-8') as f:
        html_content = f.read()

    # Parse the HTML content with BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')

    # Find the div that contains the Plotly plot
    plot_div = soup.find('div', {'class': 'plotly-graph-div'})

    if plot_div:
        # Create a BeautifulSoup object from the new_content string
        new_content_soup = BeautifulSoup(new_html_content, 'html.parser')

        # Insert the new content before the Plotly plot
        plot_div.insert_before(new_content_soup)

        # Save the modified HTML back to disk
        with open(html_file_path, 'w', encoding='utf-8') as f:
            f.write(str(soup))
    else:
        print(f"Plotly plot not found in the HTML file {html_file_path}.")


def forecast_with_multiple_metrics(df: pd.DataFrame, periods: int = 4, save_dir: str = "plots/", use_all_data: bool = True, eod_price_data: str = 'eod_price_data.h5'):
    def plot_adjusted_close_from_h5(fig, symbol, hdf5_file_path, max_retries=3, sleep_duration=5):
        hdf5_file_path = data_dir + hdf5_file_path
        keys = access_hdf5_with_retries(hdf5_file_path, 'r', max_retries, sleep_duration)

        if f'/{symbol}' in keys:
            with pd.HDFStore(hdf5_file_path, 'r') as store:
                stock_data = store.get(symbol)

                # Add actual adjusted_close to the plot
                fig.add_trace(
                    go.Scatter(x=stock_data.index, y=stock_data['Adjusted_close'],
                               mode='lines+markers',
                               name='Actual Adjusted_close',
                               legendgroup='Actual',
                               line=dict(color='purple'),
                               showlegend=True),
                    row=1, col=1
                )

                prophet_df = stock_data.reset_index()[['Date', 'Adjusted_close']].rename(
                    columns={'Date': 'ds', 'Adjusted_close': 'y'})

                best_mae = float('inf')
                best_forecast = None
                best_mode = None

                for mode in ['additive', 'multiplicative']:
                    model = Prophet(
                        seasonality_mode=mode,
                        yearly_seasonality=True,
                        weekly_seasonality=False,
                        daily_seasonality=False)
                    model.fit(prophet_df)
                    future = model.make_future_dataframe(periods=365)  # 1-year prediction
                    forecast = model.predict(future)

                    mae = calculate_mae(prophet_df['y'], forecast.loc[:len(prophet_df) - 1, 'yhat'])

                    if mae < best_mae:
                        best_mae = mae
                        best_forecast = forecast
                        best_mode = mode

                # Add the best forecast to the plot
                fig.add_trace(
                    go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat'],
                               mode='lines',
                               name='Adjusted_close Forecast',
                               legendgroup='Forecast',
                               line=dict(color='green'),
                               showlegend=True),
                    row=1, col=1
                )

                # Add yhat_upper and yhat_lower to the plot
                fig.add_trace(
                    go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat_upper'],
                               mode='lines',
                               name='Upper Forecast',
                               legendgroup='Upper Forecast',
                               line=dict(color='blue', dash='dash'),
                               showlegend=True),
                    row=1, col=1
                )

                fig.add_trace(
                    go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat_lower'],
                               mode='lines',
                               name='Lower Forecast',
                               legendgroup='Lower Forecast',
                               line=dict(color='blue', dash='dash'),
                               showlegend=True),
                    row=1, col=1
                )

        else:
            logging.warning(f"No Adjusted_close data found for symbol: {symbol}")

    pd.set_option('display.max_columns', None)
    logging.debug(df.columns.tolist())

    # Get the columns that are both in df.columns and reordered_columns
    common_columns = [col for col in reordered_columns if col in df.columns]

    # Check if common_columns is not empty
    if common_columns:
        # Reorder the DataFrame using the common columns
        df = df[common_columns + [col for col in df.columns if col not in common_columns]]
    else:
        logging.warning("No common columns between df and reordered_columns.")

    metrics = [col for col in df.columns if df[col].dtype in ['int64', 'float64']]

    logging.info(metrics)

    subplot_titles = []
    for metric in metrics:
        definition = metric_definitions.get(metric, 'No definition available')
        wrapped_definition = wrap_text(definition, 160)  # Adjust the max_length as needed
        title = f"<b>{metric}</b><br><span style='font-size: smaller;'>{wrapped_definition}</span>"
        subplot_titles.append(title)

    fig = make_subplots(rows=len(subplot_titles) + 1, cols=1, subplot_titles=['Adjusted_close'] + subplot_titles)

    symbol = df['Symbol'].iloc[0]

    plot_adjusted_close_from_h5(fig, symbol, eod_price_data)




    # Fetch the company_name
    h5_symbols = os.path.join(data_dir, "symbols.h5")
    company_name = get_company_name(symbol, h5_symbols)

    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    if not use_all_data:
        disclaimer = "Disclaimer: The last data point has been excluded from the Prophet prediction. \n "
        disclaimer += "There may not be enough data points for Prophet to make accurate predictions. \n"
        fig.add_annotation(
            dict(
                x=0,
                y=1.1,
                xref="paper",
                yref="paper",
                text=disclaimer,
                showarrow=False,
                font=dict(size=16)
            )
        )


    for i, metric in enumerate(metrics):
        row = i + 2

        plotting_df = df[['date', metric]].copy().dropna()
        prophet_df = plotting_df.copy()

        # Convert the specified metrics to percentages by multiplying by 100
        if metric in ['ROE', 'Earnings_Yield', 'Dividend_Yield']:
            prophet_df[metric] = prophet_df[metric] * 100  # Convert to percentage
            plotting_df[metric] = plotting_df[metric] * 100  # Convert to percentage


        if not use_all_data:
            prophet_df = prophet_df.iloc[:-1, :]

        prophet_df.rename(columns={'date': 'ds', metric: 'y'}, inplace=True)

        if prophet_df.shape[0] < 2:
            logging.warning(f"Skipping {metric} because it has less than 2 non-NaN rows.")
            continue

        best_mae = float('inf')
        best_forecast = None
        best_mode = None

        for mode in ['additive', 'multiplicative']:
            model = Prophet(
                seasonality_mode=mode,
                yearly_seasonality=True,
                weekly_seasonality=False,
                daily_seasonality=False)
            model.fit(prophet_df)
            future = model.make_future_dataframe(periods=periods, freq='Q')
            forecast = model.predict(future)

            mae = calculate_mae(prophet_df['y'], forecast.loc[:len(prophet_df) - 1, 'yhat'])

            if mae < best_mae:
                best_mae = mae
                best_forecast = forecast
                best_mode = mode

        logging.info(f"Best seasonality mode for {metric} is {best_mode} with MAE {best_mae}")


        hover_format = '%{x}: %{y:.2f}%' if metric in ['ROE', 'Earnings_Yield', 'Dividend_Yield'] else '%{x}: %{y:,}'

        fig.add_trace(
            go.Scatter(x=plotting_df['date'], y=plotting_df[metric],
                       mode='lines+markers',
                       name=f'Actual {metric}',
                       legendgroup='Actual',
                       line=dict(color='purple'),
                       showlegend=(i == 0),
                       hovertemplate=hover_format),
            row=row, col=1
        )

        # Plot the forecasted metric data
        fig.add_trace(
            go.Scatter(x=best_forecast['ds'],
                       y=best_forecast['yhat'],
                       mode='lines+markers',
                       name=f'Forecasted {metric}',
                       legendgroup='Forecast',
                       line=dict(color='green'),
                       showlegend=(i == 0),
                       hovertemplate=hover_format),
            row=row, col=1
        )

        # Plot the upper forecast interval
        fig.add_trace(
            go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat_upper'],
                       mode='lines',
                       name=f'Upper Bound {metric}',
                       legendgroup='Upper Forecast',
                       line=dict(color='blue', dash='dash'),
                       showlegend=(i == 0),
                       hovertemplate=hover_format),
            row=row, col=1
        )

        # Plot the lower forecast interval
        fig.add_trace(
            go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat_lower'],
                       mode='lines',
                       name=f'Lower Bound {metric}',
                       legendgroup='Lower Forecast',
                       line=dict(color='blue', dash='dash'),
                       showlegend=(i == 0),
                       hovertemplate=hover_format),
            row=row, col=1
        )


    fig.update_layout(
        height=350 * len(metrics),
        width=1800,
        title_font_size=16  # You can change this value as needed
    )

    plot_file_path = os.path.join(save_dir, f"{symbol}_quarterly.html")

    fig.write_html(plot_file_path)
    logging.info(f"Saved Prophet plot for {company_name} to {plot_file_path}")

    add_content_before_plot(symbol)

def get_symbols(h5_file_path, key='US'):
    """
    Open an HDF5 file and populate the global dictionary symbol_exchange_map
    where the symbol is the key and the exchange is the value.

    Parameters:
        h5_file_path (str): The path to the HDF5 file.
        key (str): The key to use when reading the HDF5 file. Default is 'US'.

    Returns:
        None
    """

    h5_file_path = Path(h5_file_path)

    # Check if the file exists
    if not h5_file_path.exists():
        logging.info(f"The file {h5_file_path} does not exist.")
        return

    try:
        # Read the DataFrame from the HDF5 file
        df = pd.read_hdf(h5_file_path, key=key)

        # Check if 'Code' and 'Exchange' columns exist
        if 'Code' not in df.columns or 'Exchange' not in df.columns:
            logging.info(f"The 'Code' or 'Exchange' column does not exist in the DataFrame.")
            return

        # Populate the global symbol_exchange_map
        global symbol_exchange_map
        symbol_exchange_map = dict(zip(df['Code'], df['Exchange']))
        return list(symbol_exchange_map.keys())
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        return

symbols = get_symbols(data_dir + 'symbols.h5', key='US')  # also populates symbol_exchange_map

# Narrow the symbol list for this example (earlier experiments left commented out):
# symbols = ['UBER', 'LYFT', 'WE', 'IEP', 'AAPL'] + sp500 + largest_banks + companies_with_treasuries + homebuilders
# symbols = ['GDHG', 'IEP']
symbols = ['GOOG']



h5_files = set()  # Use a set to automatically handle duplicates

for symbol in symbols:
    exchange = symbol_exchange_map.get(symbol, 'Other')  # Default to 'Other' if not found
    current_h5_files = [
        f"{data_dir}/{exchange}_Cash_Flow_quarterly.h5",
        f"{data_dir}/{exchange}_Balance_Sheet_quarterly.h5",
        f"{data_dir}/{exchange}_Income_Statement_quarterly.h5",
        f"{data_dir}/{exchange}_Ratios.h5"
    ]
    h5_files.update(current_h5_files)  # Update the set with the new file paths

    logging.info(f'Processing {symbol}')
    try:
        # Pass only the HDF5 files corresponding to the current symbol's exchange
        df = fetch_data_for_symbol_from_multiple_h5(list(current_h5_files), symbol, max_retries=3, sleep_duration=5)


        if df is None:
            logging.warning(f"No data found for symbol {symbol}. Skipping to the next symbol.")
            continue

        logging.info(df.head(20))
        forecast_with_multiple_metrics(df)
    except Exception as e:
        logging.error(f"An unexpected error occurred while processing symbol {symbol}: {e}")

What is an HDF5 File? A Deep Dive into Hierarchical Data Format

Introduction

If you’ve spent any time working with large datasets, especially in scientific computing, machine learning, or finance, you’ve probably come across HDF5 files. But what are they, and why are they so popular for storing complex data structures? In this blog post, we’ll explore the HDF5 file format, its architecture, and its various use-cases.

What is HDF5?

HDF5 stands for Hierarchical Data Format version 5. It is a file format that allows for a flexible and efficient way of storing complex data relationships, including metadata. Developed by the HDF Group, it is designed to store and organize large amounts of data, and to facilitate the sharing of that data with others.

Key Features

Hierarchical Structure

The “Hierarchical” in HDF5 refers to its tree-like structure, much like a file system. This allows for a more organized way to store datasets and metadata. In an HDF5 file, you can have groups that contain other groups or datasets, akin to folders and files in a computer’s file system.
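
To make that concrete, here is a small sketch in h5py (the file and dataset names are invented):

import h5py

with h5py.File('portfolio.h5', 'w') as f:
    prices = f.create_group('prices')             # a group, like a folder
    tech = prices.create_group('tech')            # groups nest arbitrarily
    tech.create_dataset('AAPL', data=[189.9, 191.2, 190.5])
    print(list(f['prices/tech']))                 # path-style access: ['AAPL']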

High Performance

HDF5 is designed with performance in mind, allowing for fast read and write operations. This is crucial when working with large datasets that need to be accessed or modified frequently.

Extensible

You can add new data to an existing HDF5 file without disturbing its structure, making it a highly extensible format. This is particularly useful in scientific research and other evolving projects.
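
As a sketch, continuing the hypothetical file from above, opening in append mode adds new data without disturbing what is already stored:

import h5py

with h5py.File('portfolio.h5', 'a') as f:         # 'a': read/write, create if needed
    f['prices/tech'].create_dataset('MSFT', data=[410.3, 412.8])
    # existing datasets such as 'AAPL' are left untouched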

Portability

HDF5 files are easily shareable and are compatible across various platforms and languages. Libraries for interacting with HDF5 files are available in languages like Python, C, C++, and Java, among others.

Compression

HDF5 files support on-the-fly compression, saving valuable disk space. This is particularly useful when dealing with very large datasets.
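
For example, h5py applies gzip compression transparently when a dataset is created with the compression flag (a sketch; the filename and numbers are illustrative):

import h5py
import numpy as np

with h5py.File('big_data.h5', 'w') as f:
    f.create_dataset('readings',
                     data=np.random.rand(1_000_000),
                     compression='gzip',          # compressed on write, decompressed on read
                     compression_opts=4)          # 0-9: higher = smaller but slower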

Use Cases

Scientific Computing

In fields like physics, astronomy, and bioinformatics, HDF5 is often the go-to solution for handling complex data relationships and large datasets.

Financial Data

In finance, HDF5 files are commonly used to store time-series data, like stock prices, and complex financial models, making it easier for data analysts and algorithmic traders to backtest strategies.
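
The scripts in this post use the pandas layer on top of HDF5. A minimal sketch of that pattern (the prices are made up) stores one DataFrame per symbol key:

import pandas as pd

prices = pd.DataFrame(
    {'Adjusted_close': [138.2, 139.7, 137.9]},
    index=pd.to_datetime(['2023-09-01', '2023-09-05', '2023-09-06']))
prices.to_hdf('eod_price_data.h5', key='GOOG', mode='a')   # one key per symbol
df = pd.read_hdf('eod_price_data.h5', key='GOOG')          # read it back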

Machine Learning

When dealing with large and complex datasets for training machine learning models, the efficiency and flexibility of HDF5 make it an ideal choice.
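
A common pattern, sketched here with invented names and sizes, is streaming mini-batches straight off disk rather than loading the whole array into memory:

import h5py

with h5py.File('training_data.h5', 'r') as f:
    X = f['features']                        # e.g., shape (1_000_000, 128)
    for start in range(0, X.shape[0], 4096):
        batch = X[start:start + 4096]        # only this slice is read from disk
        # ... feed the batch to your model ...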

How to Interact with HDF5 Files

Python

In Python, one of the most commonly used libraries for interacting with HDF5 files is h5py. Here’s a quick example of how to create an HDF5 file and add a dataset:

import h5py

with h5py.File('example.h5', 'w') as f:
    dset = f.create_dataset("my_dataset", (100,), dtype='i')
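
Reading the data back is just as direct; this continues the example above:

with h5py.File('example.h5', 'r') as f:
    dset = f['my_dataset']
    print(dset.shape, dset.dtype)            # (100,) int32
    first_ten = dset[:10]                    # slicing reads only part of the file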

Conclusion

HDF5 files offer a robust, efficient, and flexible way to store complex data structures. Whether you are a researcher, a data scientist, or a financial analyst, understanding how to use HDF5 files can be a significant advantage in your work.

So the next time you need to store large amounts of structured data, consider using HDF5. It’s a powerful tool that can simplify your data management tasks and improve the efficiency of your data operations.

Using Python to save corporate financial data locally from EODHD

In a previous post, I showed how to store symbol data from EODHD. The purpose of this code is to iterate through all those symbols and grab the corporate financial data from EODHD using their API. If you’re interested in downloading Open, High, Low, Close, Adjusted Close, and Volume data, you can find that in this blog post.

Script Overview

The Python script is primarily designed to perform the following tasks:

  1. Fetch financial fundamentals for a list of stock symbols.
  2. Store this information in an HDF5 file for optimized data storage.
  3. Handle various categories of financial data including ‘General’, ‘Earnings’, ‘Financials’, and more.
  4. Log activities for better debugging and monitoring.

The script relies on the EOD Historical Data API and uses Python libraries like Pandas, Requests, and h5py.

Utility Functions

  • save_dataframe_to_h5(): Saves a Pandas DataFrame to an HDF5 file.
  • key_exists_in_h5(): Checks if a key exists in an HDF5 file.
  • print_all_keys_in_h5(): Prints all keys in an HDF5 file.
  • symbol_exists_in_h5(): Checks if a symbol exists in an HDF5 file.
  • convert_columns_to_numeric(): Converts DataFrame columns to numeric types, if applicable.
  • update_dataframe(): Updates the DataFrame with new rows of data.

Main Function: fetch_and_store_fundamentals()

This function performs the core operation of fetching and storing data. It takes an API token, a list of stock symbols, a data directory, and some optional parameters as arguments.

The function goes through the following steps for each symbol:

  1. Check If Data Exists: If the skip_existing flag is true, it checks whether the data already exists in the HDF5 storage.
  2. Fetch Data: Downloads JSON data for the stock symbol from the EOD Historical Data API.
  3. Data Processing: Processes different categories of data (General, Financials, Earnings, etc.) and stores them in separate HDF5 files.
  4. Log Update: Updates a log file with the timestamp of the last fetch for each symbol.

Helper Function: get_symbols()

This function populates a global dictionary, symbol_exchange_map, mapping stock symbols to their respective exchanges. It reads this information from an existing HDF5 file.

Code Execution

Finally, the script fetches a list of stock symbols using get_symbols() and then calls the fetch_and_store_fundamentals() function to perform the data fetching and storing.

Conclusion

Automating the process of financial data collection and storage is a crucial step in building robust trading algorithms or investment strategies. This script serves as a foundational block for such endeavors, allowing you to focus more on data analysis rather than data collection.

Code

from keys import api_token
import time
import h5py


data_dir = '/home/shared/algos/data/'
logs_dir = '/home/shared/algos/eodhd_data/logs/'

import logging
import pandas as pd
# Configure Pandas to display all columns
pd.set_option('display.max_columns', None)
import requests
import io
from pathlib import Path
from tqdm import tqdm
import json
import os
from datetime import datetime, timedelta

from dictionaries_and_lists import homebuilders, sp500, companies_with_treasuries, largest_banks


logging.basicConfig(level=logging.INFO)
pd.set_option('display.expand_frame_repr', False)


symbol_exchange_map = {}

def save_dataframe_to_h5(df, h5_path, key, drop_columns=None):
    convert_columns_to_numeric(df)  # Assuming this function converts numeric columns

    column_type_mapping = {
        'Symbol': 'object',
        'Country': 'object',
        'reportDate': 'datetime',
        'filing_date': 'datetime',
        'date': 'datetime',
        'Date': 'datetime',
        'currency_symbol': 'object',
        'currency': 'object',
        'beforeAfterMarket': 'object'
    }

    for column, dtype in column_type_mapping.items():
        if column in df.columns:
            if dtype == 'datetime':
                df[column] = pd.to_datetime(df[column], errors='coerce')
            else:
                df[column] = df[column].astype(dtype)

    if drop_columns:
        df.drop(columns=drop_columns, errors='ignore', inplace=True)

    try:
        df.to_hdf(h5_path, key=key, mode='a')
    except Exception as e:
        logging.error(f"Failed to save DataFrame to HDF5 file {h5_path} with key {key}. Error: {e}")

def key_exists_in_h5(h5_filename, key):
    with pd.HDFStore(h5_filename, 'r') as store:
        return key in store

def print_all_keys_in_h5(h5_filename):
    with pd.HDFStore(h5_filename, 'r') as store:
        print("Keys in HDF5 file:")
        for key in store.keys():
            print(key)

def symbol_exists_in_h5(h5_filepath, symbol):
    try:
        with pd.HDFStore(h5_filepath, 'r') as store:
            return f'/{symbol}' in store.keys()
    except Exception:
        # 'HDF5Error' is not a defined name; catch broadly (e.g., missing or corrupt file)
        return False

def convert_columns_to_numeric(df):
    for col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='ignore')


def update_dataframe(df, symbol, sub_category, sub_category_data, time_frame=None):
    row_data = {
        'Symbol': symbol,
        'SubCategory': sub_category,
    }

    if time_frame is not None:
        row_data['TimeFrame'] = time_frame

    if isinstance(sub_category_data, dict):
        row_data.update(sub_category_data)
    else:
        row_data['Data'] = str(sub_category_data)

    new_row = pd.DataFrame([row_data])
    df = pd.concat([df, new_row], ignore_index=True)
    return df


def fetch_and_store_fundamentals(api_token, symbols, data_dir, skip_existing=False, hours_to_skip=72):
    log_file = logs_dir + 'symbol_time_log.json'
    symbol_time_dict = {}

    try:
        with open(log_file, 'r') as f:
            content = f.read()
        symbol_time_dict = json.loads(content)
    except json.JSONDecodeError as e:
        logging.error(
            f"JSON Decode Error at line {e.lineno}, column {e.colno}. Content around error position: {content[e.pos - 10:e.pos + 10]}")
    except FileNotFoundError:
        logging.info(f"File {log_file} not found. An empty dictionary will be used.")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")


    for symbol in tqdm(symbols, desc='Fetching and storing fundamentals'):
        last_downloaded_time = symbol_time_dict.get(symbol, None)
        if last_downloaded_time:
            last_downloaded_time = datetime.fromisoformat(last_downloaded_time)
            time_since_last_download = datetime.now() - last_downloaded_time
            if time_since_last_download < timedelta(hours=hours_to_skip):
                logging.info(f"Data for symbol {symbol} was downloaded recently. Skipping...")
                continue

        if skip_existing:
            # Assuming the symbol is known at this point in your code
            exchange = symbol_exchange_map.get(symbol, '')  # Get exchange name from global dict

            # Create the new h5 path with the exchange name prepended
            h5_path_check = Path(data_dir, f"{exchange}_General.h5") if exchange else Path(data_dir, "General.h5")

            if h5_path_check.exists() and key_exists_in_h5(h5_path_check, f'/{symbol}'):
                logging.info(f"Data for symbol {symbol} already exists. Skipping...")
                continue

        logging.info(f"\n{symbol}: Downloading from EODHD...")

        try:
            url = f"https://eodhd.com/api/fundamentals/{symbol}.US?api_token={api_token}"
            response = requests.get(url)
        except requests.exceptions.ConnectionError:
            # requests raises its own ConnectionError (which also covers refused
            # connections), not the builtin ConnectionError / ConnectionRefusedError
            logging.error(f"ConnectionError occurred while fetching data for symbol {symbol}. Retrying in 60 seconds.")
            time.sleep(60)
            continue
        except Exception as e:
            logging.error(f"An unexpected error occurred: {e}")
            continue

        if response.status_code != 200:
            logging.error(f"Failed to fetch data for symbol {symbol}. HTTP Status Code: {response.status_code} \n Sleeping for 60 seconds")
            time.sleep(60)  # Sleep for 60 seconds
            continue  # Continue to next iteration

        json_data = response.json()

        logging.info(f"\n{symbol}: Finished downloading from EODHD...")

        # Check if the logging level is set to DEBUG
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            json_dump_file = logs_dir + 'api_response_output.txt'

            with open(json_dump_file, 'w') as f:
                f.write("JSON Data:\n")
                f.write(json.dumps(json_data, indent=4))
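
        # Each top-level category in the response gets its own branch below;
        # 'ESGScores' is skipped entirely, and anything unrecognized falls
        # through to the generic handler at the end.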

        for category, category_data in json_data.items():
            if category_data is None:
                logging.warning(f"Data for category {category} is None.")
                continue

            exchange = symbol_exchange_map.get(symbol, 'Other')
            h5_path = Path(data_dir, f"{exchange}_{category}.h5")
            df = pd.DataFrame()

            if category == 'ESGScores':
                continue

            elif category == 'Holders':
                # holder_type is e.g. 'Institutions' or 'Funds'; each gets its own file
                for holder_type, holder_data in category_data.items():
                    h5_path = Path(data_dir, f"{exchange}_Holders_{holder_type}.h5")
                    df = pd.DataFrame()  # reset per holder type so rows don't leak between files

                    for holder_id, holder_info in holder_data.items():
                        df = update_dataframe(df, symbol, holder_type, holder_info)
                    save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
                logging.info(f"{symbol} finished processing category Holders")

            elif category == 'SplitsDividends':
                logging.debug(f"Processing 'SplitsDividends' category. Data: {category_data}")

                for sub_key, sub_data in category_data.items():
                    logging.debug(f"Processing key: {sub_key}")
                    logging.debug(f"Data for key {sub_key}: {sub_data}")

                    if sub_key == 'NumberDividendsByYear':
                        nested_h5_path = Path(data_dir, f"{exchange}_SplitsDividends_{sub_key}.h5")

                        nested_df = pd.DataFrame()

                        for item_key, item_data in sub_data.items():
                            logging.debug(f"Item data: {item_key}")
                            nested_df = update_dataframe(nested_df, symbol, sub_key, item_data)

                        # Sort and remove duplicates based on the Year column
                        # Before sorting, check if 'Year' exists
                        if 'Year' in nested_df.columns:
                            nested_df = nested_df.sort_values(by=['Year'])
                            nested_df.drop_duplicates(subset=['Year'], keep='last', inplace=True)

                        save_dataframe_to_h5(nested_df, nested_h5_path, key=symbol, drop_columns=['SubCategory'])
                    else:
                        df = update_dataframe(df, symbol, sub_key, sub_data)

                save_dataframe_to_h5(df, h5_path, key=symbol)
                logging.info(f"{symbol} finished processing category SplitsDividends")


            elif category == 'General':
                logging.debug(f"Processing 'General' category. Data: {category_data}")
                for sub_key, sub_data in category_data.items():
                    logging.debug(f"Processing key: {sub_key}")
                    logging.debug(f"Data for key {sub_key}: {sub_data}")

                    if sub_key in ['Listings', 'Officers']:
                        continue  # Skip 'Listings' and 'Officers'

                    df = update_dataframe(df, symbol, sub_key, sub_data)


                save_dataframe_to_h5(df, h5_path, key=symbol)
                logging.info(f"{symbol} finished processing category General")


            elif category == 'Financials':
                # Iterate through report types like 'Balance Sheet', 'Cash Flow', 'Income Statements'
                for report_type, report_data in category_data.items():
                    # Iterate through time frames like 'annual' or 'quarterly'
                    for time_frame, time_frame_data in report_data.items():
                        if time_frame == 'currency_symbol':
                            continue  # Skip the 'currency_symbol'

                        # Create a specific .h5 path for each combination of report_type and time_frame
                        h5_path = Path(data_dir, f"{exchange}_{report_type}_{time_frame}.h5")

                        df = pd.DataFrame()

                        # Update the DataFrame with financial data
                        for sub_category, sub_category_data in time_frame_data.items():
                            df = update_dataframe(df, symbol, sub_category, sub_category_data)

                        # Save this specific DataFrame to its respective .h5 file
                        save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])

                logging.info(f"{symbol} finished processing category Financials")


            elif category == 'Earnings':
                for sub_category, sub_category_data in category_data.items():
                    # Create a specific .h5 path for each sub-category
                    h5_path = Path(data_dir, f"{exchange}_{category}_{sub_category}.h5")

                    df = pd.DataFrame()


                    for date_entry, date_entry_data in sub_category_data.items():  # Iterate through each date entry in the subcategory
                        if date_entry == 'currency_symbol':
                            continue

                        # Adding a date field to each entry
                        date_entry_data['Date'] = date_entry

                        # Update the dataframe with new row
                        df = update_dataframe(df, symbol, sub_category, date_entry_data)

                    # Save this specific DataFrame to its respective .h5 file
                    if 'SubCategory' in df.columns:
                        save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])  # Drop the 'SubCategory' column
                    else:
                        save_dataframe_to_h5(df, h5_path, key=symbol)
                    logging.info(f"{symbol} finished processing category Earnings")


            elif category == 'outstandingShares':
                for time_frame, time_frame_data in category_data.items():  # time_frame can be 'annual' or 'quarterly'
                    # Create a specific .h5 path for each time frame
                    h5_path = Path(data_dir, f"{exchange}_{category}_{time_frame}.h5")

                    df = pd.DataFrame()


                    for entry_id, entry_data in time_frame_data.items():
                        df = update_dataframe(df, symbol, time_frame, entry_data)

                    # Save this specific DataFrame to its respective .h5 file
                    save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
                    logging.info(f"{symbol} finished processing category outstandingShares")


            elif category == 'ETF_Data':
                # DataFrame for the top-level ETF_Data, excluding subcategories that will be handled separately
                top_level_df = pd.DataFrame()

                # Dictionary to hold top-level data for this stock symbol
                top_level_data = {'Symbol': symbol}

                for sub_category, sub_category_data in category_data.items():
                    if sub_category in ['Asset_Allocation', 'World_Regions', 'Sector_Weights',
                                        'Fixed_Income', 'Top_10_Holdings', 'Holdings',
                                        'Valuations_Growth', 'Market_Capitalisation',
                                        'MorningStar', 'Performance']:
                        # Create a specific .h5 path for each sub-category
                        h5_path = Path(data_dir, f"{exchange}_ETF_{sub_category}.h5")

                        df = pd.DataFrame()

                        # Update the DataFrame with sub-category data
                        for item_key, item_data in sub_category_data.items():
                            df = update_dataframe(df, symbol, item_key, item_data)

                        # Save this specific DataFrame to its respective .h5 file
                        save_dataframe_to_h5(df, h5_path, key=symbol)

                    else:
                        # Populate the top-level data dictionary
                        top_level_data[sub_category] = sub_category_data

                # Convert the top-level data dictionary to a DataFrame and append it to top_level_df
                new_row = pd.DataFrame([top_level_data])
                top_level_df = pd.concat([top_level_df, new_row], ignore_index=True)

                # Save the top-level ETF_Data DataFrame to its respective .h5 file
                top_level_h5_path = Path(data_dir, f"{exchange}_ETF_Data.h5")  # prepend the exchange for consistency with the other category files
                save_dataframe_to_h5(top_level_df, top_level_h5_path, key=symbol)
                logging.info(f"{symbol} finished processing category ETF_Data")

            else:
                logging.debug(f'Processing other category {category}')

                if h5_path.exists() and symbol_exists_in_h5(h5_path, symbol):
                    df = pd.read_hdf(h5_path, key=symbol)
                else:
                    df = pd.DataFrame()

                if isinstance(category_data, dict):
                    for sub_category, sub_category_data in category_data.items():
                        df = update_dataframe(df, symbol, sub_category, sub_category_data)
                else:
                    df = update_dataframe(df, symbol, category, category_data)

                save_dataframe_to_h5(df, h5_path, key=symbol)
                logging.info(f"{symbol} finished processing category {category}")

        logging.info(f"{symbol} updating symbol status log file.")
        symbol_time_dict[symbol] = datetime.now().isoformat()
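        # Rewriting the log after every symbol means progress survives an
        # interruption; recently fetched symbols are skipped on the next run.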
        with open(log_file, 'w') as f:
            json.dump(symbol_time_dict, f)

        logging.info(f"{symbol} finished processing")


def get_symbols(h5_file_path, key='US'):
    """
    Open an HDF5 file and populate the global dictionary symbol_exchange_map
    where the symbol is the key and the exchange is the value.

    Parameters:
        h5_file_path (str): The path to the HDF5 file.
        key (str): The key to use when reading the HDF5 file. Default is 'US'.

    Returns:
        None
    """

    h5_file_path = Path(h5_file_path)

    # Check if the file exists
    if not h5_file_path.exists():
        logging.info(f"The file {h5_file_path} does not exist.")
        return

    try:
        # Read the DataFrame from the HDF5 file
        df = pd.read_hdf(h5_file_path, key=key)

        # Check if 'Code' and 'Exchange' columns exist
        if 'Code' not in df.columns or 'Exchange' not in df.columns:
            logging.info(f"The 'Code' or 'Exchange' column does not exist in the DataFrame.")
            return

        # Populate the global symbol_exchange_map
        global symbol_exchange_map
        symbol_exchange_map = dict(zip(df['Code'], df['Exchange']))
        return list(symbol_exchange_map.keys())
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        return


symbols = get_symbols(data_dir + 'symbols.h5', key='US')

fetch_and_store_fundamentals(api_token, symbols, data_dir, skip_existing=True, hours_to_skip=72)
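
# To spot-check what was saved (illustrative names; the actual files and keys
# depend on your symbol list and exchanges):
#   check_df = pd.read_hdf(Path(data_dir, 'NASDAQ_General.h5'), key='AAPL')
#   print(check_df.head())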