Recently, I encountered an unexpected challenge: a water leak beneath the slab of my house. The ordeal had me up until 1 AM, rerouting the line through my attic with PEX piping. Amidst this late-night task, a thought occurred to me: could machine learning and forecasting have helped me detect this leak earlier, based on the consumption history from my water bills?
I wrote some Python code, outlined below, that uses statsmodels and SARIMAX to predict consumption.
I now wonder why municipalities aren’t incorporating machine learning into data like this to send notices to customers in advance of potential leaks. I imagine this could save millions of gallons of water each year. Full code and explanation follows.
Data Upload and Preparation:
The program starts by loading a CSV file containing water usage data (in gallons) and the corresponding dates. The CSV must have two columns titled “date” and “gallons” for this to work. This data is then processed to ensure it’s in the correct format: dates are sorted, and any missing values are filled to maintain continuity.
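Here’s a minimal sketch of that preparation step (the file name is made up, and I’m assuming monthly billing data; this isn’t the exact code from my script):
import pandas as pd
# Load the CSV, sort chronologically, and fill gaps so the series is continuous
df = pd.read_csv('water_usage.csv', parse_dates=['date'])
df = df.sort_values('date').set_index('date')
df = df.asfreq('MS')  # assume one reading per month
df['gallons'] = df['gallons'].interpolate()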
Creating a Predictive Model:
I used the SARIMAX model from the statsmodels library, a powerful tool for time series forecasting. The model considers both the seasonal nature of water usage and any underlying trends or cycles.
Making Predictions and Comparisons:
The program forecasts future water usage and compares it with actual data. By analyzing past consumption, it can predict what typical usage should look like and flag any significant deviations.
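A hedged sketch of the modeling and forecasting steps (the model orders below are illustrative placeholders, not tuned values; df comes from the preparation sketch above):
from statsmodels.tsa.statespace.sarimax import SARIMAX
# Seasonal model with a 12-month cycle
model = SARIMAX(df['gallons'], order=(1, 1, 1), seasonal_order=(1, 1, 1, 12))
result = model.fit(disp=False)
forecast = result.get_forecast(steps=12)  # predict the next 12 months
predicted = forecast.predicted_mean
conf_int = forecast.conf_int()  # bounds used to flag unusual consumption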
Visualizing the Data:
The real power of this program lies in its visualization capabilities. Using Plotly, a versatile graphing library, the program generates an interactive chart. It not only shows actual water usage but also plots predicted values and their confidence intervals.
Highlighting Historical Data:
To provide context, the chart also includes historical data as reference points. These are shown as small horizontal lines, representing the same month in previous years.
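The chart itself can be assembled roughly like this (a sketch reusing the variables from the snippets above):
import plotly.graph_objects as go
fig = go.Figure()
fig.add_trace(go.Scatter(x=df.index, y=df['gallons'], mode='lines+markers', name='Actual'))
fig.add_trace(go.Scatter(x=predicted.index, y=predicted, mode='lines', name='Predicted'))
fig.add_trace(go.Scatter(x=conf_int.index, y=conf_int.iloc[:, 1], mode='lines', line=dict(dash='dash'), name='Upper bound'))
fig.add_trace(go.Scatter(x=conf_int.index, y=conf_int.iloc[:, 0], mode='lines', line=dict(dash='dash'), name='Lower bound'))
fig.show()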
I started off wanting to analyze the sustainability of mortgage buydowns by home builders. But as it happens, my ADHD had other plans. Three weeks later, here we are with code that looks for trends and anomalies in corporate financials. The code generates detailed HTML profiles for in-depth financial analysis of stocks.
In a series of previous posts, I’ve explained the extraction of corporate financial data and stock price data. Now, let’s go a step further by utilizing Plotly and Facebook’s Prophet library to predict these financial metrics and stock prices. It’s crucial to note that the goal here isn’t necessarily to predict future values but to model the data in such a way that we can quickly spot any anomalies. In this tutorial, we’ll be extracting data from HDF5 files, which were created in previous posts, so make sure to check those out first.
Generated HTML Output: A Preview of What the Code Produces
The code generates an in-depth profile of any stock specified. In this example, we take a look at Google. The output is a structured HTML file that includes a variety of information sections. First, it provides basic details such as the company’s name, ticker symbol, and the date of the most recent update. Next, it lists essential contact information, including the company’s web URL, phone number, and physical address. The sector and industry in which the company operates, as well as the number of full-time employees, are also displayed. The output then delves into valuation metrics like P/E ratios, enterprise value, and other financial ratios. Finally, it highlights key financial data points, such as market capitalization, EBITDA, and various earnings and revenue estimates. Overall, the code produces a multi-faceted profile that serves as a valuable resource for anyone looking to understand Alphabet Inc’s business and financial standing in great detail.
Forecasting Future Adjusted Close Prices with Facebook Prophet: A One-Year Outlook
This plot visualizes the historical and predicted adjusted close prices for a particular asset, utilizing the Facebook Prophet algorithm for the forecast. The x-axis represents the timeline, extending from the earliest available historical data to one year into the future. The y-axis shows the adjusted close prices. Historical prices are plotted as a solid purple line. The prediction is a solid green line, with blue dashed lines marking the high and low bounds. This predictive model leverages the power of Facebook Prophet to analyze seasonal and trend components in the historical data, providing a one-year outlook on potential price movements.
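For readers unfamiliar with Prophet, the core workflow is compact. A minimal sketch (the ‘prices’ DataFrame and its column names are assumptions for illustration; the full code appears later in this post):
from prophet import Prophet
# Prophet expects a two-column frame: 'ds' (date) and 'y' (value)
prophet_df = prices.reset_index().rename(columns={'Date': 'ds', 'Adjusted_close': 'y'})
model = Prophet(yearly_seasonality=True, weekly_seasonality=False, daily_seasonality=False)
model.fit(prophet_df)
future = model.make_future_dataframe(periods=365)  # one-year outlook
forecast = model.predict(future)  # yhat, plus yhat_lower / yhat_upper bounds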
Forecasting Future Financial Ratios with Facebook Prophet: A One-Year Outlook
Financial ratios are key indicators of a company’s financial health and performance. They offer crucial insights into various aspects like profitability, liquidity, and valuation. In this section, we’ll leverage the predictive power of Facebook Prophet to forecast these ratios over the next year.
Forecasting Future Earnings and Profitability Metrics from the Income Statement with Facebook Prophet: A One-Year Outlook
These plots present a comprehensive visualization of key earnings and profitability metrics extracted from the income statement, including Net Income, Operating Income, Total Revenue, Gross Profit, and Free Cash Flow. These figures are then projected forward using Facebook Prophet. The predictions not only display the expected trends for the next year but also encompass upper and lower bounds, providing a range of possible outcomes. This integration of historical data with advanced forecasting techniques offers a nuanced understanding of the company’s financial trajectory.
Forecasting Future Balance Sheet Metrics with Facebook Prophet: A One-Year Outlook
These plots offer an in-depth analysis of crucial balance sheet metrics such as Total Assets, Total Stockholder Equity, Retained Earnings, Long-Term Debt, and Total Liabilities. Leveraging the predictive power of Facebook Prophet, these key financial indicators are forecasted into the future. The projections not only illustrate the expected financial posture for the forthcoming year but also include upper and lower prediction intervals, giving a full spectrum of potential financial scenarios. By melding past balance sheet data with sophisticated predictive modeling, the plots provide a multi-dimensional view of the company’s expected financial stability and risk profile.
Forecasting Future Cash Flow Metrics with Facebook Prophet: A One-Year Outlook
These plots dive into essential cash flow metrics, specifically Total Cash from Operating Activities, Capital Expenditures, and Dividends Paid. These key figures are also extended into the future using Facebook Prophet’s forecasting capabilities. The resulting predictions not only map out the anticipated cash flow movements for the next year but are also bracketed by upper and lower confidence intervals, presenting a comprehensive range of financial possibilities. These plots offer a well-rounded perspective on the company’s future liquidity and capital allocation strategies.
Other Financial Metrics
In addition to the key financial metrics, the analysis delves into a diverse set of other metrics, plotting each meticulously. These metrics are categorized as follows:
Efficiency and Activity Metrics from Income Statement:
Research Development
Selling General Administrative
Stockholder’s Equity and Capital Structure:
Common Stock Shares Outstanding
Additional Important Metrics:
Net Debt
Net Receivables
Inventory
Accounts Payable
Total Current Assets
Total Current Liabilities
Less Critical Metrics:
Income Before Tax
Cost of Revenue
Intangible Assets
Earning Assets
Other Current Assets
Deferred Long-Term Liabilities
Other Current Liabilities
Common Stock
Capital Stock
Other Liabilities
Goodwill
Other Assets
Cash
Cash and Equivalents
Current Deferred Revenue
Short-Term Debt
Short/Long-Term Debt
Short/Long-Term Debt Total
Other Stockholder Equity
Property Plant Equipment
Long-Term Investments
Net Tangible Assets
Short-Term Investments
Other Metrics:
Effect of Accounting Charges
Income Tax Expense
Non-Operating Income Net Other
Selling and Marketing Expenses
Common Stock Total Equity
Preferred Stock Total Equity
Retained Earnings Total Equity
Treasury Stock
Accumulated Amortization
Non-Current Assets Other
Deferred Long-Term Asset Charges
Non-Current Assets Total
Capital Lease Obligations
Long-Term Debt Total
Non-Current Liabilities Other
Non-Current Liabilities Total
Negative Goodwill
Warrants
Preferred Stock Redeemable
Capital Surplus
Liabilities And Stockholders Equity
Cash And Short-Term Investments
Property Plant And Equipment Gross
Property Plant And Equipment Net
Accumulated Depreciation
Total Cash Flows From Investing Activities
Total Cash From Financing Activities
Net Borrowings
Issuance Of Capital Stock
Investments
Change To Liabilities
Change To Operating Activities
Change In Cash
Begin Period Cash Flow
End Period Cash Flow
Depreciation
Other Cash Flows From Investing Activities
Change To Inventory
Change To Account Receivables
Sale Purchase Of Stock
Other Cash Flows From Financing Activities
Change To Net Income
Change Receivables
Cash Flows Other Operating
Exchange Rate Changes
Cash And Cash Equivalents Changes
Change In Working Capital
Stock Based Compensation
Other Non-Cash Items
Code Overview
Prerequisites
Python 3.x
pandas
Plotly
Prophet
beautifulsoup4 (bs4)
scikit-learn
Standard-library modules: logging, pathlib, os, collections, time
Helper Functions
The code includes several helper functions, such as:
calculate_mae: Calculates the Mean Absolute Error between the actual and predicted data.
access_hdf5_with_retries: Attempts to read an HDF5 file, retrying up to a specified number of times.
read_general_info: Reads general company information from an HDF5 file.
infer_dtype: Infers the data type of a Pandas Series.
fetch_data_for_symbol_from_multiple_h5: Fetches data for a specific symbol from multiple HDF5 files.
get_company_name: Gets the company name for a given symbol from an HDF5 file.
add_content_before_plot: Adds additional content before the plot in an HTML file.
Forecasting Function
The core of this code is the forecast_with_multiple_metrics function, which:
Accepts a DataFrame of financial metrics.
Performs time-series forecasting on each metric using Facebook’s Prophet.
Plots the actual and forecasted metrics using Plotly.
import pandas as pd
from pathlib import Path
import logging
import os
from plotly.subplots import make_subplots
from prophet import Prophet
import plotly.graph_objects as go
from collections import Counter
from pandas.api.types import is_numeric_dtype
from bs4 import BeautifulSoup
import time
from sklearn.metrics import mean_absolute_error
from math import log10
from dictionaries_and_lists import metric_definitions, reordered_columns, homebuilders, sp500, companies_with_treasuries, largest_banks, percentage_metrics
data_dir = '/home/shared/algos/data/'
plots_dir = '/home/shared/algos/eodhd_data/plots/'
# Note: logging.basicConfig only takes effect on the first call per process
logging.basicConfig(level=logging.INFO)
# Configure Pandas to display all columns
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)
symbol_exchange_map = {}
def wrap_text(text, max_length):
"""
Wraps text to a new line at the nearest whitespace of the max_length.
"""
wrapped_lines = []
while len(text) > max_length:
# Find nearest whitespace of the max_length
split_index = text.rfind(' ', 0, max_length + 1)
if split_index == -1: # No whitespace found, force split
split_index = max_length
wrapped_lines.append(text[:split_index])
text = text[split_index:].lstrip()
wrapped_lines.append(text)
return '<br>'.join(wrapped_lines)
def format_large_number(num):
    print('Attempting to format large number')
    # Handle negatives explicitly so values like -5000 are still abbreviated
    sign = '-' if num < 0 else ''
    num = abs(num)
    if num < 1_000:
        return f"{sign}{num}"
    magnitude = int(log10(num) // 3)
    value = num / (10 ** (3 * magnitude))
    return f"{sign}{value:.2f}{' KMBT'[magnitude]}"
def format_percentage(data):
try:
# Remove any non-numeric characters like commas and percentage signs
if isinstance(data, str):
data = data.replace(',', '').replace('%', '').strip()
# Convert to float and format as a percentage
return "{:.2%}".format(float(data))
except ValueError as e:
print(f"ValueError: Could not convert {data} to a percentage.")
return data # Return the original data if it cannot be converted
def calculate_mae(actual, predicted):
return mean_absolute_error(actual, predicted)
def access_hdf5_with_retries(hdf5_file_path, mode, max_retries=3, sleep_duration=5):
retries = 0
while retries < max_retries:
try:
with pd.HDFStore(hdf5_file_path, mode) as store:
return store.keys()  # You can customize this part to return what you need
except Exception as e: # Replace Exception with a more specific exception if possible
if retries < max_retries - 1:
logging.info(f"An exception occurred while reading {hdf5_file_path}: {e}")
logging.info(f"Retrying in {sleep_duration} seconds...")
time.sleep(sleep_duration)
retries += 1
else:
logging.error(f"Max retries reached. Could not read {hdf5_file_path}. Exiting...")
raise
def read_general_info(symbol, h5_general_path, max_retries=3, sleep_duration=5):
retries = 0
while retries < max_retries:
try:
with pd.HDFStore(h5_general_path, 'r') as store:
key = f'/{symbol}'
if key in store.keys():
general_info = store.get(key)
info_dict = dict(zip(general_info['SubCategory'], general_info['Data']))
sector = info_dict.get('Sector', 'N/A')
industry = info_dict.get('Industry', 'N/A')
description = info_dict.get('Description', 'N/A')
full_time_employees = info_dict.get('FullTimeEmployees', 'N/A')
updated_at = info_dict.get('UpdatedAt', 'N/A')
web_url = info_dict.get('WebURL', 'N/A')
phone = info_dict.get('Phone', 'N/A')
address = info_dict.get('Address', 'N/A')
name = info_dict.get('Name', 'N/A')
exchange = info_dict.get('Exchange', 'N/A')
return sector, industry, description, full_time_employees, updated_at, web_url, phone, address, name, exchange
else:
return ['N/A']*10
except Exception as e:
if retries < max_retries - 1:
print(f"An exception occurred while reading {h5_general_path}: {e}")
print(f"Retrying in {sleep_duration} seconds...")
time.sleep(sleep_duration)
retries += 1
else:
print(f"Max retries reached. Could not read {h5_general_path}. Exiting...")
raise
def infer_dtype(series):
sample = series.dropna().head(100) # Sample the first 100 non-null rows
# Debugging line
print(f"Sample for {series.name}: {sample}")
if sample.empty:
return None
if all(sample.apply(lambda x: isinstance(x, str))):
return 'object'
if is_numeric_dtype(sample) and all(sample.apply(lambda x: x == int(x))):
return 'int64'
if is_numeric_dtype(sample):
return 'float64'
return None
def fetch_data_for_symbol_from_multiple_h5(h5_filepaths, symbol, max_retries=3, sleep_duration=5):
combined_data = None
common_keys = ['Symbol', 'date', 'filing_date', 'currency_symbol']
for h5_filepath in h5_filepaths:
h5_path = Path(h5_filepath)
if not h5_path.exists():
logging.info(f"The file {h5_filepath} does not exist.")
continue
retries = 0
while retries < max_retries:
try:
with pd.HDFStore(h5_filepath, 'r') as store:
if f"/{symbol}" in store.keys():
logging.info(f"Symbol {symbol} found in {h5_filepath}.")
symbol_data = store.select(symbol)
if 'date' not in symbol_data.columns and 'Date' not in symbol_data.columns:
if isinstance(symbol_data.index, pd.DatetimeIndex):
symbol_data = symbol_data.reset_index()
# reset_index names the new column after the index ('index' if the index is unnamed)
symbol_data.rename(columns={'index': 'date', 'Date': 'date'}, inplace=True)
logging.info(
    f"'date' and 'Date' columns not found, but datetime index exists in {h5_filepath} for symbol {symbol}.")
else:
logging.warning(
f"'date' and 'Date' columns and datetime index not found in {h5_filepath} for symbol {symbol}.")
break
else:
# Ensure the date column is standardized to 'date'
if 'Date' in symbol_data.columns:
symbol_data.rename(columns={'Date': 'date'}, inplace=True)
# Rename 'netIncome' if it's present in the dataframe
if 'netIncome' in symbol_data.columns:
# Extract the type of financial statement from the filename
parts = h5_path.stem.split('_')
statement_type = parts[1]
metric_name = parts[2]
statement_readable = {
"Cash": "Cash Flow from Operating Activities",
"Income": "netIncome",
}.get(statement_type, statement_type)
metric_title = f"{metric_name} ({statement_readable})"
# Ensure statement_type is one of the reports where 'netIncome' appears before renaming
if statement_type in ["Income", "Cash"]:
symbol_data.rename(columns={'netIncome': metric_title}, inplace=True)
if combined_data is None:
combined_data = symbol_data
else:
# Dynamically adjust common keys based on available columns
keys_for_merge = [key for key in common_keys if
key in symbol_data.columns and key in combined_data.columns]
combined_data = pd.merge(combined_data, symbol_data, on=keys_for_merge, how='outer')
break
else:
logging.info(f"Symbol {symbol} not found in {h5_filepath}.")
break
except Exception as e:
if retries < max_retries - 1:
logging.info(f"An exception occurred while reading {h5_filepath}: {e}")
logging.info(f"Retrying in {sleep_duration} seconds...")
time.sleep(sleep_duration)
retries += 1
else:
logging.info(f"Max retries reached. Could not read {h5_filepath}. Exiting...")
raise
if combined_data is None:
logging.info(f"Symbol {symbol} not found in any of the HDF5 files.")
return None
else:
for col in combined_data.select_dtypes(include=['object']).columns:
combined_data[col] = pd.to_numeric(combined_data[col], errors='ignore')
if 'date' in combined_data.columns:
combined_data.sort_values(by='date', inplace=True)
else:
logging.info(f"'date' column not found in combined data for symbol {symbol}. Sorting by index instead.")
combined_data.sort_index(inplace=True)
logging.info(f"{symbol}: successfully combined all HDF5 files.")
return combined_data
def get_company_name(symbol, h5_file_path, country_code='US'):
try:
# Read the DataFrame for the entire country from the HDF5 file
symbols_df = pd.read_hdf(h5_file_path, key=f'/{country_code}')
# Filter by the specific symbol to get the company name
company_name_row = symbols_df[symbols_df['Code'] == symbol]
if not company_name_row.empty:
return company_name_row['Name'].iloc[0]
else:
logging.error(f"No data for symbol {symbol} in the DataFrame.")
return "Unknown"
except KeyError:
logging.error(f"No object named {country_code} in the file {h5_file_path}")
return "Unknown"
def add_content_before_plot(symbol, max_retries=3, sleep_duration=5):
target_filename = f"{symbol}_quarterly.html"
html_file_path = os.path.join(plots_dir, target_filename)
if not os.path.exists(html_file_path):
print(f"HTML file for symbol {symbol} not found.")
return
# Fetch additional info from General.h5
exchange = symbol_exchange_map.get(symbol, 'Other') # Default to 'Other' if exchange is not found
h5_general_path = os.path.join(data_dir, f"{exchange}_General.h5")
sector, industry, description, full_time_employees, updated_at, web_url, phone, address, name, exchange = read_general_info(symbol, h5_general_path, max_retries, sleep_duration)
# Prepare new HTML content
new_html_content = f"""
<h1>{symbol} - {name} - {exchange}</h1>
<p>Updated At: {updated_at}</p>
<p><strong style='font-size: larger;'>WebURL:</strong> <a href='{web_url}' target='_blank'>{web_url}</a>
<strong style='font-size: larger;'>Phone:</strong> {phone}
<strong style='font-size: larger;'>Address:</strong> {address}<br>
<strong style='font-size: larger;'>Sector:</strong> {sector}
<strong style='font-size: larger;'>Industry:</strong> {industry}<br>
<strong style='font-size: larger;'>Full Time Employees:</strong> {full_time_employees}</p><br>
<strong style='font-size: larger;'>Description:</strong> {description}<br><br>
"""
# Read valuation.h5 data for the symbol
h5_valuation_path = os.path.join(data_dir, f"{exchange}_Valuation.h5")
retries = 0
valuation_data = None
while retries < max_retries:
try:
with pd.HDFStore(h5_valuation_path, 'r') as store:
if symbol in store:
valuation_data = store.get(symbol)
else:
valuation_data = pd.DataFrame(columns=['SubCategory', 'Data'])
print(f"The symbol {symbol} valuation data does not exist in {h5_valuation_path}")
break
except Exception as e:
if retries < max_retries - 1:
print(f"An exception occurred while reading {h5_valuation_path}: {e}")
print(f"Retrying in {sleep_duration} seconds...")
time.sleep(sleep_duration)
retries += 1
else:
print(f"Max retries reached. Could not read {h5_valuation_path}. Exiting...")
raise
# Add valuation_data to new_html_content
valuation_html = "<h2>Valuation</h2><div style='display: flex; flex-wrap: wrap;'>"
if valuation_data is not None:
print(f"valuation_data: {valuation_data}")
valuation_data['Data'] = pd.to_numeric(valuation_data['Data'], errors='coerce')
for index, row in valuation_data.iterrows():
data = row['Data']
if pd.notnull(data): # Check if 'Data' is not NaN
if row['SubCategory'] in percentage_metrics:  # match on the metric name, not the row index
formatted_data = "{:.2%}".format(float(data))
elif isinstance(data, (int, float)) and abs(data) >= 1_000: # Large numbers
formatted_data = format_large_number(data)
elif isinstance(data, (int, float)):
print('formatting numbers with commas in Valuation')
print(f"data: {data}, type: {type(data)}")
formatted_data = f"{data:,.2f}" # For other numbers, just format with commas
else:
formatted_data = data # For non-numeric data, leave as is
else:
# If data is NaN or non-numeric, leave as is
formatted_data = row['Data']
valuation_html += f"<div style='flex: 0 0 calc(33.333% - 10px); margin-right: 10px; margin-bottom: 10px;'>"
valuation_html += f"<strong>{row['SubCategory']}</strong>: {formatted_data}</div>"
valuation_html += "</div>"
new_html_content += valuation_html # Append valuation data to new_html_content
# Read highlights.h5 data for the symbol
h5_highlights_path = os.path.join(data_dir, f"{exchange}_Highlights.h5")
retries = 0
while retries < max_retries:
try:
with pd.HDFStore(h5_highlights_path, 'r') as store:
if symbol in store:
highlights_data = store.get(symbol)
else:
highlights_data = pd.DataFrame(columns=['SubCategory', 'Data'])
print(f"The symbol {symbol} highlights data does not exist in {h5_highlights_path}")
break
except Exception as e:
if retries < max_retries - 1:
print(f"An exception occurred while reading {h5_highlights_path}: {e}")
print(f"Retrying in {sleep_duration} seconds...")
time.sleep(sleep_duration)
retries += 1
else:
print(f"Max retries reached. Could not read {h5_highlights_path}. Exiting...")
raise
# Add title before highlights
new_html_content += "<h2>Highlights</h2>"
# Add highlights_data to new_html_content
highlights_html = "<div style='display: flex; flex-wrap: wrap;'>"
highlights_data['Data'] = highlights_data['Data'].apply(pd.to_numeric, errors='ignore')
for index, row in highlights_data.iterrows():
subcategory = row['SubCategory']
data = row['Data']
if subcategory in percentage_metrics:
formatted_data = format_percentage(data)
elif pd.notnull(data) and isinstance(data, (int, float)):
if abs(data) >= 1_000: # Large numbers
print(f"{symbol}: {subcategory} is a large number. Formatting {data} with commas...")
formatted_data = format_large_number(data)
else: # Other numeric data that is not a large number
print(f"{symbol}: {subcategory} is a numeric value. Formatting {data} with commas...")
formatted_data = f"{data:,.2f}"
else:
print(f'{symbol}: {subcategory} is a non-numeric value. Data: {data}')
formatted_data = data # For non-numeric data, leave as is
highlights_html += f"""<div style='flex: 0 0 calc(33.333% - 10px); margin-right: 10px; margin-bottom: 10px;'>
<strong>{subcategory}</strong>: {formatted_data}
</div>"""
highlights_html += "</div>"
new_html_content += highlights_html  # Append highlights data to new_html_content
# Add the disclaimer to the end of new_html_content
disclaimer = "<br><br><p><strong>Disclaimer:</strong> The last data point has been excluded from the Prophet prediction.</p>"
new_html_content += disclaimer
with open(html_file_path, 'r', encoding='utf-8') as f:
html_content = f.read()
# Parse the HTML content with BeautifulSoup
soup = BeautifulSoup(html_content, 'html.parser')
# Find the div that contains the Plotly plot
plot_div = soup.find('div', {'class': 'plotly-graph-div'})
if plot_div:
# Create a BeautifulSoup object from the new_content string
new_content_soup = BeautifulSoup(new_html_content, 'html.parser')
# Insert the new content before the Plotly plot
plot_div.insert_before(new_content_soup)
# Save the modified HTML back to disk
with open(html_file_path, 'w', encoding='utf-8') as f:
f.write(str(soup))
else:
print(f"Plotly plot not found in the HTML file {html_file_path}.")
def forecast_with_multiple_metrics(df: pd.DataFrame, periods: int = 4, save_dir: str = "plots/", use_all_data: bool = True, eod_price_data: str = 'eod_price_data.h5'):
def plot_adjusted_close_from_h5(fig, symbol, hdf5_file_path, max_retries=3, sleep_duration=5):
hdf5_file_path = data_dir + hdf5_file_path
keys = access_hdf5_with_retries(hdf5_file_path, 'r', max_retries, sleep_duration)
if f'/{symbol}' in keys:
with pd.HDFStore(hdf5_file_path, 'r') as store:
stock_data = store.get(symbol)
# Add actual adjusted_close to the plot
fig.add_trace(
go.Scatter(x=stock_data.index, y=stock_data['Adjusted_close'],
mode='lines+markers',
name='Actual Adjusted_close',
legendgroup='Actual',
line=dict(color='purple'),
showlegend=True),
row=1, col=1
)
prophet_df = stock_data.reset_index()[['Date', 'Adjusted_close']].rename(
columns={'Date': 'ds', 'Adjusted_close': 'y'})
best_mae = float('inf')
best_forecast = None
best_mode = None
for mode in ['additive', 'multiplicative']:
model = Prophet(
seasonality_mode=mode,
yearly_seasonality=True,
weekly_seasonality=False,
daily_seasonality=False)
model.fit(prophet_df)
future = model.make_future_dataframe(periods=365) # 1-year prediction
forecast = model.predict(future)
mae = calculate_mae(prophet_df['y'], forecast.loc[:len(prophet_df) - 1, 'yhat'])
if mae < best_mae:
best_mae = mae
best_forecast = forecast
best_mode = mode
# Add the best forecast to the plot
fig.add_trace(
go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat'],
mode='lines',
name='Adjusted_close Forecast',
legendgroup='Forecast',
line=dict(color='green'),
showlegend=True),
row=1, col=1
)
# Add yhat_upper and yhat_lower to the plot
fig.add_trace(
go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat_upper'],
mode='lines',
name='Upper Forecast',
legendgroup='Upper Forecast',
line=dict(color='blue', dash='dash'),
showlegend=True),
row=1, col=1
)
fig.add_trace(
go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat_lower'],
mode='lines',
name='Lower Forecast',
legendgroup='Lower Forecast',
line=dict(color='blue', dash='dash'),
showlegend=True),
row=1, col=1
)
else:
logging.warning(f"No Adjusted_close data found for symbol: {symbol}")
pd.set_option('display.max_columns', None)
logging.debug(df.columns.tolist())
# Get the columns that are both in df.columns and reordered_columns
common_columns = [col for col in reordered_columns if col in df.columns]
# Check if common_columns is not empty
if common_columns:
# Reorder the DataFrame using the common columns
df = df[common_columns + [col for col in df.columns if col not in common_columns]]
else:
logging.warning("No common columns between df and reordered_columns.")
metrics = [col for col in df.columns if df[col].dtype in ['int64', 'float64']]
logging.info(metrics)
subplot_titles = []
for metric in metrics:
definition = metric_definitions.get(metric, 'No definition available')
wrapped_definition = wrap_text(definition, 160) # Adjust the max_length as needed
title = f"<b>{metric}</b><br><span style='font-size: smaller;'>{wrapped_definition}</span>"
subplot_titles.append(title)
fig = make_subplots(rows=len(subplot_titles) + 1, cols=1, subplot_titles=['Adjusted_close'] + subplot_titles)
symbol = df['Symbol'].iloc[0]
plot_adjusted_close_from_h5(fig, symbol, eod_price_data)
# Fetch the company_name
h5_symbols = os.path.join(data_dir, "symbols.h5")
company_name = get_company_name(symbol, h5_symbols)
if not os.path.exists(save_dir):
os.makedirs(save_dir)
if not use_all_data:
disclaimer = "Disclaimer: The last data point has been excluded from the Prophet prediction. \n "
disclaimer += "There may not be enough data points for Prophet to make accurate predictions. \n"
fig.add_annotation(
dict(
x=0,
y=1.1,
xref="paper",
yref="paper",
text=disclaimer,
showarrow=False,
font=dict(size=16)
)
)
for i, metric in enumerate(metrics):
row = i + 2
plotting_df = df[['date', metric]].copy().dropna()
prophet_df = plotting_df.copy()
# Convert the specified metrics to percentages by multiplying by 100
if metric in ['ROE', 'Earnings_Yield', 'Dividend_Yield']:
prophet_df[metric] = prophet_df[metric] * 100 # Convert to percentage
plotting_df[metric] = plotting_df[metric] * 100 # Convert to percentage
if not use_all_data:
prophet_df = prophet_df.iloc[:-1, :]
prophet_df.rename(columns={'date': 'ds', metric: 'y'}, inplace=True)
if prophet_df.shape[0] < 2:
logging.warning(f"Skipping {metric} because it has less than 2 non-NaN rows.")
continue
best_mae = float('inf')
best_forecast = None
best_mode = None
for mode in ['additive', 'multiplicative']:
model = Prophet(
seasonality_mode=mode,
yearly_seasonality=True,
weekly_seasonality=False,
daily_seasonality=False)
model.fit(prophet_df)
future = model.make_future_dataframe(periods=periods, freq='Q')
forecast = model.predict(future)
mae = calculate_mae(prophet_df['y'], forecast.loc[:len(prophet_df) - 1, 'yhat'])
if mae < best_mae:
best_mae = mae
best_forecast = forecast
best_mode = mode
logging.info(f"Best seasonality mode for {metric} is {best_mode} with MAE {best_mae}")
hover_format = '%{x}: %{y:.2f}%' if metric in ['ROE', 'Earnings_Yield', 'Dividend_Yield'] else '%{x}: %{y:,}'
fig.add_trace(
go.Scatter(x=plotting_df['date'], y=plotting_df[metric],
mode='lines+markers',
name=f'Actual {metric}',
legendgroup='Actual',
line=dict(color='purple'),
showlegend=(i == 0),
hovertemplate=hover_format),
row=row, col=1
)
# Plot the forecasted metric data
fig.add_trace(
go.Scatter(x=best_forecast['ds'],
y=best_forecast['yhat'],
mode='lines+markers',
name=f'Forecasted {metric}',
legendgroup='Forecast',
line=dict(color='green'),
showlegend=(i == 0),
hovertemplate=hover_format),
row=row, col=1
)
# Plot the upper forecast interval
fig.add_trace(
go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat_upper'],
mode='lines',
name=f'Upper Bound {metric}',
legendgroup='Upper Forecast',
line=dict(color='blue', dash='dash'),
showlegend=(i == 0),
hovertemplate=hover_format),
row=row, col=1
)
# Plot the lower forecast interval
fig.add_trace(
go.Scatter(x=best_forecast['ds'], y=best_forecast['yhat_lower'],
mode='lines',
name=f'Lower Bound {metric}',
legendgroup='Lower Forecast',
line=dict(color='blue', dash='dash'),
showlegend=(i == 0),
hovertemplate=hover_format),
row=row, col=1
)
fig.update_layout(
height=350 * len(metrics),
width=1800,
title_font_size=16 # You can change this value as needed
)
plot_file_path = os.path.join(save_dir, f"{symbol}_quarterly.html")
fig.write_html(plot_file_path)
logging.info(f"Saved Prophet plot for {company_name} to {plot_file_path}")
add_content_before_plot(symbol)
def get_symbols(h5_file_path, key='US'):
"""
Open an HDF5 file and populate the global dictionary symbol_exchange_map
where the symbol is the key and the exchange is the value.
Parameters:
h5_file_path (str): The path to the HDF5 file.
key (str): The key to use when reading the HDF5 file. Default is 'US'.
Returns:
None
"""
h5_file_path = Path(h5_file_path)
# Check if the file exists
if not h5_file_path.exists():
logging.info(f"The file {h5_file_path} does not exist.")
return
try:
# Read the DataFrame from the HDF5 file
df = pd.read_hdf(h5_file_path, key=key)
# Check if 'Code' and 'Exchange' columns exist
if 'Code' not in df.columns or 'Exchange' not in df.columns:
logging.info(f"The 'Code' or 'Exchange' column does not exist in the DataFrame.")
return
# Populate the global symbol_exchange_map
global symbol_exchange_map
symbol_exchange_map = dict(zip(df['Code'], df['Exchange']))
return list(symbol_exchange_map.keys())
except Exception as e:
logging.error(f"An error occurred: {e}")
return
symbols = get_symbols(data_dir + 'symbols.h5', key='US')
# Earlier runs used broader symbol lists; the final run profiles Google only
# symbols = ['UBER', 'LYFT', 'WE', 'IEP', 'AAPL'] + sp500 + largest_banks + companies_with_treasuries + homebuilders
# symbols = ['GDHG', 'IEP']
symbols = ['GOOG']
h5_files = set() # Use a set to automatically handle duplicates
for symbol in symbols:
exchange = symbol_exchange_map.get(symbol, 'Other') # Default to 'Other' if not found
current_h5_files = [
f"{data_dir}/{exchange}_Cash_Flow_quarterly.h5",
f"{data_dir}/{exchange}_Balance_Sheet_quarterly.h5",
f"{data_dir}/{exchange}_Income_Statement_quarterly.h5",
f"{data_dir}/{exchange}_Ratios.h5"
]
h5_files.update(current_h5_files) # Update the set with the new file paths
logging.info(f'Processing {symbol}')
try:
# Pass only the HDF5 files corresponding to the current symbol's exchange
df = fetch_data_for_symbol_from_multiple_h5(list(current_h5_files), symbol, max_retries=3, sleep_duration=5)
if df is None:
logging.warning(f"No data found for symbol {symbol}. Skipping to the next symbol.")
continue  # Skip to the next symbol
logging.info(df.head(20))
forecast_with_multiple_metrics(df)
except Exception as e:
logging.error(f"An unexpected error occurred while processing symbol {symbol}: {e}")
If you’ve spent any time working with large datasets, especially in scientific computing, machine learning, or finance, you’ve probably come across HDF5 files. But what are they, and why are they so popular for storing complex data structures? In this blog post, we’ll explore the HDF5 file format, its architecture, and its various use-cases.
What is HDF5?
HDF5 stands for Hierarchical Data Format version 5. It is a file format that allows for a flexible and efficient way of storing complex data relationships, including metadata. Developed by the HDF Group, it is designed to store and organize large amounts of data, and to facilitate the sharing of that data with others.
Key Features
Hierarchical Structure
The “Hierarchical” in HDF5 refers to its tree-like structure, much like a file system. This allows for a more organized way to store datasets and metadata. In an HDF5 file, you can have groups that contain other groups or datasets, akin to folders and files in a computer’s file system.
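For example, groups nest like folders (a quick h5py sketch; the file, group, and dataset names here are made up):
import h5py
with h5py.File('tree.h5', 'w') as f:
    grp = f.create_group('experiments/run_01')  # intermediate groups are created automatically
    grp.create_dataset('temperatures', data=[20.5, 21.0, 19.8])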
High Performance
HDF5 is designed with performance in mind, allowing for fast read and write operations. This is crucial when working with large datasets that need to be accessed or modified frequently.
Extensible
You can add new data to an existing HDF5 file without disturbing its structure, making it a highly extensible format. This is particularly useful in scientific research and other evolving projects.
Portability
HDF5 files are easily shareable and are compatible across various platforms and languages. Libraries for interacting with HDF5 files are available in languages like Python, C, C++, and Java, among others.
Compression
HDF5 files support on-the-fly compression, saving valuable disk space. This is particularly useful when dealing with very large datasets.
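Enabling compression is a one-argument change in h5py (a sketch; the gzip level here is arbitrary):
import h5py
with h5py.File('compressed.h5', 'w') as f:
    f.create_dataset('big', shape=(1_000_000,), dtype='f8',
                     compression='gzip', compression_opts=4)  # chunked + gzip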
Use Cases
Scientific Computing
In fields like physics, astronomy, and bioinformatics, HDF5 is often the go-to solution for handling complex data relationships and large datasets.
Financial Data
In finance, HDF5 files are commonly used to store time-series data, like stock prices, and complex financial models, making it easier for data analysts and algorithmic traders to backtest strategies.
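With pandas, storing and reloading a time series takes two calls (a sketch; it requires the PyTables package, which pandas uses as its HDF5 backend):
import pandas as pd
prices = pd.DataFrame({'close': [101.2, 102.8, 103.1]},
                      index=pd.to_datetime(['2023-01-03', '2023-01-04', '2023-01-05']))
prices.to_hdf('prices.h5', key='AAPL', mode='a')
loaded = pd.read_hdf('prices.h5', key='AAPL')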
Machine Learning
When dealing with large and complex datasets for training machine learning models, the efficiency and flexibility of HDF5 make it an ideal choice.
How to Interact with HDF5 Files
Python
In Python, one of the most commonly used libraries for interacting with HDF5 files is h5py. Here’s a quick example of how to create an HDF5 file and add a dataset:
import h5py
with h5py.File('example.h5', 'w') as f:
dset = f.create_dataset("my_dataset", (100,), dtype='i')
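Reading the data back is just as direct:
with h5py.File('example.h5', 'r') as f:
    data = f['my_dataset'][:]  # load the full dataset into a NumPy array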
Conclusion
HDF5 files offer a robust, efficient, and flexible way to store complex data structures. Whether you are a researcher, a data scientist, or a financial analyst, understanding how to use HDF5 files can be a significant advantage in your work.
So the next time you need to store large amounts of structured data, consider using HDF5. It’s a powerful tool that can simplify your data management tasks and improve the efficiency of your data operations.
In a previous post, I showed how to store symbol data from EODHD. The purpose of this code is to now iterate through all those symbols and grab the corporate financial data from EODHD using their API. If you’re interested in downloading Open, High, Low, Close, Adjusted Close, and Volume data, you can find that in this blog post.
Output
Script Overview
The Python script is primarily designed to perform the following tasks:
Fetch fundamental data for each stock symbol from the EOD Historical Data API.
Store this information in an HDF5 file for optimized data storage.
Handle various categories of financial data including ‘General’, ‘Earnings’, ‘Financials’, and more.
Log activities for better debugging and monitoring.
The script relies on the EOD Historical Data API and uses Python libraries like Pandas, Requests, and h5py.
Utility Functions
save_dataframe_to_h5(): Saves a Pandas DataFrame to an HDF5 file.
key_exists_in_h5(): Checks if a key exists in an HDF5 file.
print_all_keys_in_h5(): Prints all keys in an HDF5 file.
symbol_exists_in_h5(): Checks if a symbol exists in an HDF5 file.
convert_columns_to_numeric(): Converts DataFrame columns to numeric types, if applicable.
update_dataframe(): Updates the DataFrame with new rows of data.
Main Function: fetch_and_store_fundamentals()
This function performs the core operation of fetching and storing data. It takes an API token, a list of stock symbols, a data directory, and some optional parameters as arguments.
The function goes through the following steps for each symbol:
Check If Data Exists: If the skip_existing flag is true, it checks whether the data already exists in the HDF5 storage.
Fetch Data: Downloads JSON data for the stock symbol from the EOD Historical Data API.
Data Processing: Processes different categories of data (General, Financials, Earnings, etc.) and stores them in separate HDF5 files.
Log Update: Updates a log file with the timestamp of the last fetch for each symbol.
Helper Function: get_symbols()
This function populates a global dictionary, symbol_exchange_map, mapping stock symbols to their respective exchanges. It reads this information from an existing HDF5 file.
Code Execution
Finally, the script fetches a list of stock symbols using get_symbols() and then calls the fetch_and_store_fundamentals() function to perform the data fetching and storing.
Conclusion
Automating the process of financial data collection and storage is a crucial step in building robust trading algorithms or investment strategies. This script serves as a foundational block for such endeavors, allowing you to focus more on data analysis rather than data collection.
Code
from keys import api_token
import time
import h5py
data_dir = '/home/shared/algos/data/'
logs_dir = '/home/shared/algos/eodhd_data/logs/'
import logging
import pandas as pd
# Configure Pandas to display all columns
pd.set_option('display.max_columns', None)
import requests
import io
from pathlib import Path
from tqdm import tqdm
import json
import os
from datetime import datetime, timedelta
from dictionaries_and_lists import homebuilders, sp500, companies_with_treasuries, largest_banks
logging.basicConfig(level=logging.INFO)
pd.set_option('display.expand_frame_repr', False)
symbol_exchange_map = {}
def save_dataframe_to_h5(df, h5_path, key, drop_columns=None):
convert_columns_to_numeric(df) # Assuming this function converts numeric columns
column_type_mapping = {
'Symbol': 'object',
'Country': 'object',
'reportDate': 'datetime',
'filing_date': 'datetime',
'date': 'datetime',
'Date': 'datetime',
'currency_symbol': 'object',
'currency': 'object',
'beforeAfterMarket': 'object'
}
for column, dtype in column_type_mapping.items():
if column in df.columns:
if dtype == 'datetime':
df[column] = pd.to_datetime(df[column], errors='coerce')
else:
df[column] = df[column].astype(dtype)
if drop_columns:
df.drop(columns=drop_columns, errors='ignore', inplace=True)
try:
df.to_hdf(h5_path, key=key, mode='a')
except Exception as e:
logging.error(f"Failed to save DataFrame to HDF5 file {h5_path} with key {key}. Error: {e}")
def key_exists_in_h5(h5_filename, key):
with pd.HDFStore(h5_filename, 'r') as store:
return key in store
def print_all_keys_in_h5(h5_filename):
with pd.HDFStore(h5_filename, 'r') as store:
print("Keys in HDF5 file:")
for key in store.keys():
print(key)
def symbol_exists_in_h5(h5_filepath, symbol):
try:
with pd.HDFStore(h5_filepath, 'r') as store:
return f'/{symbol}' in store.keys()
except Exception:  # e.g. a corrupt or unreadable HDF5 file
return False
def convert_columns_to_numeric(df):
for col in df.columns:
df[col] = pd.to_numeric(df[col], errors='ignore')
def update_dataframe(df, symbol, sub_category, sub_category_data, time_frame=None):
row_data = {
'Symbol': symbol,
'SubCategory': sub_category,
}
if time_frame is not None:
row_data['TimeFrame'] = time_frame
if isinstance(sub_category_data, dict):
row_data.update(sub_category_data)
else:
row_data['Data'] = str(sub_category_data)
new_row = pd.DataFrame([row_data])
df = pd.concat([df, new_row], ignore_index=True)
return df
def fetch_and_store_fundamentals(api_token, symbols, data_dir, skip_existing=False, hours_to_skip=72):
log_file = logs_dir + 'symbol_time_log.json'
symbol_time_dict = {}
try:
with open(log_file, 'r') as f:
content = f.read()
symbol_time_dict = json.loads(content)
except json.JSONDecodeError as e:
logging.error(
f"JSON Decode Error at line {e.lineno}, column {e.colno}. Content around error position: {content[e.pos - 10:e.pos + 10]}")
except FileNotFoundError:
logging.info(f"File {log_file} not found. An empty dictionary will be used.")
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
for symbol in tqdm(symbols, desc='Fetching and storing fundamentals'):
last_downloaded_time = symbol_time_dict.get(symbol, None)
if last_downloaded_time:
last_downloaded_time = datetime.fromisoformat(last_downloaded_time)
time_since_last_download = datetime.now() - last_downloaded_time
if time_since_last_download < timedelta(hours=hours_to_skip):
logging.info(f"Data for symbol {symbol} was downloaded recently. Skipping...")
continue
if skip_existing:
# Assuming the symbol is known at this point in your code
exchange = symbol_exchange_map.get(symbol, '') # Get exchange name from global dict
# Create the new h5 path with the exchange name prepended
h5_path_check = Path(data_dir, f"{exchange}_General.h5") if exchange else Path(data_dir, "General.h5")
if h5_path_check.exists() and key_exists_in_h5(h5_path_check, f'/{symbol}'):
logging.info(f"Data for symbol {symbol} already exists. Skipping...")
continue
logging.info(f"\n{symbol}: Downloading from EODHD...")
try:
url = f"https://eodhd.com/api/fundamentals/{symbol}.US?api_token={api_token}"
response = requests.get(url)
except requests.exceptions.ConnectionError:
    # requests raises its own ConnectionError (which also covers refused
    # connections); it does not inherit from the builtin ConnectionError
    logging.error(f"ConnectionError occurred while fetching data for symbol {symbol}. Retrying in 60 seconds.")
    time.sleep(60)
    continue
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
continue
if response.status_code != 200:
logging.error(f"Failed to fetch data for symbol {symbol}. HTTP Status Code: {response.status_code} \n Sleeping for 60 seconds")
time.sleep(60) # Sleep for 60 seconds
continue # Continue to next iteration
json_data = response.json()
logging.info(f"\n{symbol}: Finished downloading from EODHD...")
# Check if the logging level is set to DEBUG
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
json_dump_file = logs_dir + 'api_response_output.txt'
with open(json_dump_file, 'w') as f:
f.write("JSON Data:\n")
f.write(json.dumps(json_data, indent=4))
for category, category_data in json_data.items():
if category_data is None:
logging.warning(f"Data for category {category} is None.")
continue
exchange = symbol_exchange_map.get(symbol, 'Other')
h5_path = Path(data_dir, f"{exchange}_{category}.h5")
df = pd.DataFrame()
if category == 'ESGScores':
continue
elif category == 'Holders':
for holder_type, holder_data in category_data.items():
h5_path = Path(data_dir, f"{exchange}_Holders_{holder_type}.h5")
for holder_id, holder_info in holder_data.items():
df = update_dataframe(df, symbol, holder_type, holder_info)
save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
logging.info(f"{symbol} finished processing category Holders")
elif category == 'SplitsDividends':
logging.debug(f"Processing 'SplitsDividends' category. Data: {category_data}")
# h5_path = Path(data_dir, f"{category}.h5")
for sub_key, sub_data in category_data.items():
logging.debug(f"Processing key: {sub_key}")
logging.debug(f"Data for key {sub_key}: {sub_data}")
if sub_key == 'NumberDividendsByYear':
nested_h5_path = Path(data_dir, f"{exchange}_SplitsDividends_{sub_key}.h5")
nested_df = pd.DataFrame()
for item_key, item_data in sub_data.items():
logging.debug(f"Item data: {item_key}")
nested_df = update_dataframe(nested_df, symbol, sub_key, item_data)
# Sort and remove duplicates based on the Year column
# Before sorting, check if 'Year' exists
if 'Year' in nested_df.columns:
nested_df = nested_df.sort_values(by=['Year'])
nested_df.drop_duplicates(subset=['Year'], keep='last', inplace=True)
save_dataframe_to_h5(nested_df, nested_h5_path, key=symbol, drop_columns=['SubCategory'])
else:
df = update_dataframe(df, symbol, sub_key, sub_data)
save_dataframe_to_h5(df, h5_path, key=symbol)
logging.info(f"{symbol} finished processing category SplitsDividends")
elif category == 'General':
logging.debug(f"Processing 'General' category. Data: {category_data}")
for sub_key, sub_data in category_data.items():
logging.debug(f"Processing key: {sub_key}")
logging.debug(f"Data for key {sub_key}: {sub_data}")
if sub_key in ['Listings', 'Officers']:
continue # Skip 'Listings' and 'Officers'
df = update_dataframe(df, symbol, sub_key, sub_data)
save_dataframe_to_h5(df, h5_path, key=symbol)
logging.info(f"{symbol} finished processing category General")
elif category == 'Financials':
# Iterate through report types like 'Balance Sheet', 'Cash Flow', 'Income Statements'
for report_type, report_data in category_data.items():
# Iterate through time frames like 'annual' or 'quarterly'
for time_frame, time_frame_data in report_data.items():
if time_frame == 'currency_symbol':
continue # Skip the 'currency_symbol'
# Create a specific .h5 path for each combination of report_type and time_frame
h5_path = Path(data_dir, f"{exchange}_{report_type}_{time_frame}.h5")
df = pd.DataFrame()
# Update the DataFrame with financial data
for sub_category, sub_category_data in time_frame_data.items():
df = update_dataframe(df, symbol, sub_category, sub_category_data)
# Save this specific DataFrame to its respective .h5 file
save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
logging.info(f"{symbol} finished processing category Financials")
elif category == 'Earnings':
for sub_category, sub_category_data in category_data.items():
# Create a specific .h5 path for each sub-category
h5_path = Path(data_dir, f"{exchange}_{category}_{sub_category}.h5")
df = pd.DataFrame()
for date_entry, date_entry_data in sub_category_data.items(): # Iterate through each date entry in the subcategory
if date_entry == 'currency_symbol':
continue
# Adding a date field to each entry
date_entry_data['Date'] = date_entry
# Update the dataframe with new row
df = update_dataframe(df, symbol, sub_category, date_entry_data)
# Save this specific DataFrame to its respective .h5 file
if 'SubCategory' in df.columns:
save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory']) # Drop the 'SubCategory' column
else:
save_dataframe_to_h5(df, h5_path, key=symbol)
logging.info(f"{symbol} finished processing category Earnings")
elif category == 'outstandingShares':
for time_frame, time_frame_data in category_data.items(): # time_frame can be 'annual' or 'quarterly'
# Create a specific .h5 path for each time frame
h5_path = Path(data_dir, f"{exchange}_{category}_{time_frame}.h5")
df = pd.DataFrame()
for entry_id, entry_data in time_frame_data.items():
df = update_dataframe(df, symbol, time_frame, entry_data)
# Save this specific DataFrame to its respective .h5 file
save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
logging.info(f"{symbol} finished processing category outstandingShares")
elif category == 'ETF_Data':
# DataFrame for the top-level ETF_Data, excluding subcategories that will be handled separately
top_level_df = pd.DataFrame()
# Dictionary to hold top-level data for this stock symbol
top_level_data = {'Symbol': symbol}
for sub_category, sub_category_data in category_data.items():
if sub_category in ['Asset_Allocation', 'World_Regions', 'Sector_Weights', 'Fixed_Income', 'Top_10_Holdings', 'Holdings', 'Valuations_Growth', 'Market_Capitalisation', 'MorningStar', 'Performance', ]:
# Create a specific .h5 path for each sub-category
h5_path = Path(data_dir, f"{exchange}_ETF_{sub_category}.h5")
df = pd.DataFrame()
# Update the DataFrame with sub-category data
for item_key, item_data in sub_category_data.items():
df = update_dataframe(df, symbol, item_key, item_data)
# Save this specific DataFrame to its respective .h5 file
save_dataframe_to_h5(df, h5_path, key=symbol)
else:
# Populate the top-level data dictionary
top_level_data[sub_category] = sub_category_data
# Convert the top-level data dictionary to a DataFrame and append it to top_level_df
new_row = pd.DataFrame([top_level_data])
top_level_df = pd.concat([top_level_df, new_row], ignore_index=True)
# Save the top-level ETF_Data DataFrame to its respective .h5 file
top_level_h5_path = Path(data_dir, "ETF_Data.h5")
save_dataframe_to_h5(top_level_df, top_level_h5_path, key=symbol)
logging.info(f"{symbol} finished processing category ETF_Data")
else:
logging.debug(f'Processing other category {category}')
if h5_path.exists() and symbol_exists_in_h5(h5_path, symbol):
df = pd.read_hdf(h5_path, key=symbol)
else:
df = pd.DataFrame()
if isinstance(category_data, dict):
for sub_category, sub_category_data in category_data.items():
df = update_dataframe(df, symbol, sub_category, sub_category_data)
else:
df = update_dataframe(df, symbol, category, category_data)
save_dataframe_to_h5(df, h5_path, key=symbol)
logging.info(f"{symbol} finished processing category {category}")
logging.info(f"{symbol} updating symbol status log file.")
symbol_time_dict[symbol] = datetime.now().isoformat()
with open(log_file, 'w') as f:
json.dump(symbol_time_dict, f)
logging.info(f"{symbol} finished processing")
def get_symbols(h5_file_path, key='US'):
"""
Open an HDF5 file and populate the global dictionary symbol_exchange_map
where the symbol is the key and the exchange is the value.
Parameters:
h5_file_path (str): The path to the HDF5 file.
key (str): The key to use when reading the HDF5 file. Default is 'US'.
Returns:
None
"""
h5_file_path = Path(h5_file_path)
# Check if the file exists
if not h5_file_path.exists():
logging.info(f"The file {h5_file_path} does not exist.")
return
try:
# Read the DataFrame from the HDF5 file
df = pd.read_hdf(h5_file_path, key=key)
# Check if 'Code' and 'Exchange' columns exist
if 'Code' not in df.columns or 'Exchange' not in df.columns:
logging.info(f"The 'Code' or 'Exchange' column does not exist in the DataFrame.")
return
# Populate the global symbol_exchange_map
global symbol_exchange_map
symbol_exchange_map = dict(zip(df['Code'], df['Exchange']))
return list(symbol_exchange_map.keys())
except Exception as e:
logging.error(f"An error occurred: {e}")
return
symbols = get_symbols(data_dir + 'symbols.h5', key='US')
fetch_and_store_fundamentals(api_token, symbols, data_dir, skip_existing=True, hours_to_skip=72)