In a previous post, I showed how to store symbol data from EODHD. The code in this post iterates through all of those symbols and downloads each company's corporate financial (fundamental) data from EODHD using their API. If you're interested in downloading Open, High, Low, Close, Adjusted Close, and Volume data, you can find that in this blog post.
Output
Script Overview
The Python script is primarily designed to perform the following tasks:
- Fetch financial fundamentals for a list of stock symbols.
- Store this information in an HDF5 file for optimized data storage.
- Handle various categories of financial data including ‘General’, ‘Earnings’, ‘Financials’, and more.
- Log activities for better debugging and monitoring.
The script relies on the EOD Historical Data API and uses Python libraries like Pandas, Requests, and h5py.
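For context, the fundamentals endpoint the script calls for each symbol looks like the snippet below (a minimal sketch; the api_token placeholder normally comes from the local keys module, and the .US suffix assumes US-listed tickers):
import requests

api_token = 'YOUR_EODHD_API_TOKEN'  # placeholder; the full script imports this from keys.py
symbol = 'AAPL'

# Same endpoint used in the full script below; returns one nested JSON document per symbol
url = f"https://eodhd.com/api/fundamentals/{symbol}.US?api_token={api_token}"
response = requests.get(url)
response.raise_for_status()
fundamentals = response.json()

# Top-level categories include 'General', 'Financials', 'Earnings', 'Holders', and so on
print(list(fundamentals.keys()))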
Utility Functions
The script defines a handful of helper functions (a short usage sketch follows this list):
- save_dataframe_to_h5(): Saves a Pandas DataFrame to an HDF5 file.
- key_exists_in_h5(): Checks if a key exists in an HDF5 file.
- print_all_keys_in_h5(): Prints all keys in an HDF5 file.
- symbol_exists_in_h5(): Checks if a symbol exists in an HDF5 file.
- convert_columns_to_numeric(): Converts DataFrame columns to numeric types, where applicable.
- update_dataframe(): Appends a new row of data to a DataFrame and returns the updated DataFrame.
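As a rough illustration of how the helpers fit together once they are defined (a minimal sketch; the file path and the data are invented for the example):
import pandas as pd

# Build a one-row DataFrame the same way the script does, then store it and check for it
df = pd.DataFrame()
df = update_dataframe(df, 'AAPL', 'Highlights', {'MarketCapitalization': '2900000000000'})

h5_path = '/home/shared/algos/data/example_General.h5'  # hypothetical file, for illustration only
save_dataframe_to_h5(df, h5_path, key='AAPL', drop_columns=['SubCategory'])  # converts numeric columns internally
print(key_exists_in_h5(h5_path, '/AAPL'))  # True once the save succeeds
print_all_keys_in_h5(h5_path)              # lists every symbol stored in the file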
Main Function: fetch_and_store_fundamentals()
This function performs the core operation of fetching and storing data. It takes an API token, a list of stock symbols, a data directory, and some optional parameters as arguments.
The function goes through the following steps for each symbol (a simplified sketch of the timestamp bookkeeping follows this list):
- Check If Data Exists: If the skip_existing flag is true, it checks whether the data already exists in the HDF5 storage.
- Fetch Data: Downloads JSON data for the stock symbol from the EOD Historical Data API.
- Data Processing: Processes the different categories of data (General, Financials, Earnings, etc.) and stores them in separate HDF5 files.
- Log Update: Updates a log file with the timestamp of the last fetch for each symbol.
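The timestamp bookkeeping boils down to comparing the current time against timestamps stored in a small JSON file; the fetch itself is elided here and the log path is shortened (a simplified sketch of the logic in the full code below):
import json
from datetime import datetime, timedelta

log_file = 'logs/symbol_time_log.json'  # the script builds this path from logs_dir
hours_to_skip = 72

try:
    with open(log_file) as f:
        symbol_time_dict = json.load(f)
except FileNotFoundError:
    symbol_time_dict = {}

symbol = 'AAPL'
last_fetch = symbol_time_dict.get(symbol)
if last_fetch and datetime.now() - datetime.fromisoformat(last_fetch) < timedelta(hours=hours_to_skip):
    print(f"{symbol} was fetched within the last {hours_to_skip} hours; skipping")
else:
    # ... download and store the fundamentals here ...
    symbol_time_dict[symbol] = datetime.now().isoformat()
    with open(log_file, 'w') as f:
        json.dump(symbol_time_dict, f)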
Helper Function: get_symbols()
This function populates a global dictionary, symbol_exchange_map, mapping stock symbols to their respective exchanges. It reads this information from an existing HDF5 file.
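In essence it boils down to a couple of lines (a minimal sketch, assuming the symbols.h5 file from the earlier post contains 'Code' and 'Exchange' columns):
import pandas as pd

# Read the symbol table saved by the previous post's script
df = pd.read_hdf('/home/shared/algos/data/symbols.h5', key='US')

# Map each ticker to its exchange, e.g. {'AAPL': 'NASDAQ', ...}
symbol_exchange_map = dict(zip(df['Code'], df['Exchange']))
symbols = list(symbol_exchange_map.keys())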
Code Execution
Finally, the script fetches a list of stock symbols using get_symbols() and then calls fetch_and_store_fundamentals() to perform the data fetching and storing.
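Once a run completes, every category (and exchange) ends up in its own HDF5 file keyed by symbol, so pulling data back out for analysis is a one-liner. A minimal sketch, with a hypothetical file name following the exchange_category naming used in the script:
import pandas as pd

# e.g. the quarterly balance sheets for a NASDAQ-listed symbol
df = pd.read_hdf('/home/shared/algos/data/NASDAQ_Balance_Sheet_quarterly.h5', key='AAPL')
print(df.head())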
Conclusion
Automating the collection and storage of financial data is a crucial step in building robust trading algorithms or investment strategies. This script serves as a foundational block for that work, letting you focus on data analysis rather than data collection.
Code
import io
import json
import logging
import os
import time
from datetime import datetime, timedelta
from pathlib import Path

import h5py
import pandas as pd
import requests
from tqdm import tqdm

from keys import api_token
from dictionaries_and_lists import homebuilders, sp500, companies_with_treasuries, largest_banks

data_dir = '/home/shared/algos/data/'
logs_dir = '/home/shared/algos/eodhd_data/logs/'

logging.basicConfig(level=logging.INFO)

# Configure Pandas to display all columns without wrapping
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)

symbol_exchange_map = {}
def save_dataframe_to_h5(df, h5_path, key, drop_columns=None):
    convert_columns_to_numeric(df)  # coerce numeric-looking columns to numeric dtypes before saving
column_type_mapping = {
'Symbol': 'object',
'Country': 'object',
'reportDate': 'datetime',
'filing_date': 'datetime',
'date': 'datetime',
'Date': 'datetime',
'currency_symbol': 'object',
'currency': 'object',
'beforeAfterMarket': 'object'
}
for column, dtype in column_type_mapping.items():
if column in df.columns:
if dtype == 'datetime':
df[column] = pd.to_datetime(df[column], errors='coerce')
else:
df[column] = df[column].astype(dtype)
if drop_columns:
df.drop(columns=drop_columns, errors='ignore', inplace=True)
try:
df.to_hdf(h5_path, key=key, mode='a')
except Exception as e:
logging.error(f"Failed to save DataFrame to HDF5 file {h5_path} with key {key}. Error: {e}")
def key_exists_in_h5(h5_filename, key):
with pd.HDFStore(h5_filename, 'r') as store:
return key in store
def print_all_keys_in_h5(h5_filename):
with pd.HDFStore(h5_filename, 'r') as store:
print("Keys in HDF5 file:")
for key in store.keys():
print(key)
def symbol_exists_in_h5(h5_filepath, symbol):
try:
with pd.HDFStore(h5_filepath, 'r') as store:
return f'/{symbol}' in store.keys()
    except (OSError, RuntimeError):
        # File is missing, locked, or unreadable (PyTables raises HDF5ExtError, a RuntimeError subclass)
        return False
def convert_columns_to_numeric(df):
    for col in df.columns:
        try:
            df[col] = pd.to_numeric(df[col])  # errors='ignore' is deprecated in newer pandas
        except (ValueError, TypeError):
            pass  # leave non-numeric columns unchanged
def update_dataframe(df, symbol, sub_category, sub_category_data, time_frame=None):
row_data = {
'Symbol': symbol,
'SubCategory': sub_category,
}
if time_frame is not None:
row_data['TimeFrame'] = time_frame
if isinstance(sub_category_data, dict):
row_data.update(sub_category_data)
else:
row_data['Data'] = str(sub_category_data)
new_row = pd.DataFrame([row_data])
df = pd.concat([df, new_row], ignore_index=True)
return df
def fetch_and_store_fundamentals(api_token, symbols, data_dir, skip_existing=False, hours_to_skip=72):
log_file = logs_dir + 'symbol_time_log.json'
symbol_time_dict = {}
try:
with open(log_file, 'r') as f:
content = f.read()
symbol_time_dict = json.loads(content)
except json.JSONDecodeError as e:
logging.error(
f"JSON Decode Error at line {e.lineno}, column {e.colno}. Content around error position: {content[e.pos - 10:e.pos + 10]}")
except FileNotFoundError:
logging.info(f"File {log_file} not found. An empty dictionary will be used.")
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
for symbol in tqdm(symbols, desc='Fetching and storing fundamentals'):
last_downloaded_time = symbol_time_dict.get(symbol, None)
if last_downloaded_time:
last_downloaded_time = datetime.fromisoformat(last_downloaded_time)
time_since_last_download = datetime.now() - last_downloaded_time
if time_since_last_download < timedelta(hours=hours_to_skip):
logging.info(f"Data for symbol {symbol} was downloaded recently. Skipping...")
continue
if skip_existing:
# Assuming the symbol is known at this point in your code
exchange = symbol_exchange_map.get(symbol, '') # Get exchange name from global dict
# Create the new h5 path with the exchange name prepended
h5_path_check = Path(data_dir, f"{exchange}_General.h5") if exchange else Path(data_dir, "General.h5")
if h5_path_check.exists() and key_exists_in_h5(h5_path_check, f'/{symbol}'):
logging.info(f"Data for symbol {symbol} already exists. Skipping...")
continue
logging.info(f"\n{symbol}: Downloading from EODHD...")
try:
url = f"https://eodhd.com/api/fundamentals/{symbol}.US?api_token={api_token}"
response = requests.get(url)
        except requests.exceptions.ConnectionError:
            # Note: requests raises requests.exceptions.ConnectionError, not the built-in ConnectionError
            logging.error(f"ConnectionError occurred while fetching data for symbol {symbol}. Sleeping 60 seconds before moving on.")
            time.sleep(60)
            continue
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
continue
if response.status_code != 200:
logging.error(f"Failed to fetch data for symbol {symbol}. HTTP Status Code: {response.status_code} \n Sleeping for 60 seconds")
time.sleep(60) # Sleep for 60 seconds
continue # Continue to next iteration
json_data = response.json()
logging.info(f"\n{symbol}: Finished downloading from EODHD...")
# Check if the logging level is set to DEBUG
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
json_dump_file = logs_dir + 'api_response_output.txt'
with open(json_dump_file, 'w') as f:
f.write("JSON Data:\n")
f.write(json.dumps(json_data, indent=4))
for category, category_data in json_data.items():
if category_data is None:
logging.warning(f"Data for category {category} is None.")
continue
exchange = symbol_exchange_map.get(symbol, 'Other')
h5_path = Path(data_dir, f"{exchange}_{category}.h5")
df = pd.DataFrame()
if category == 'ESGScores':
continue
elif category == 'Holders':
for holder_type, holder_data in category_data.items():
h5_path = Path(data_dir, f"{exchange}_Holders_{holder_type}.h5")
for holder_id, holder_info in holder_data.items():
df = update_dataframe(df, symbol, holder_type, holder_info)
save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
logging.info(f"{symbol} finished processing category Holders")
elif category == 'SplitsDividends':
logging.debug(f"Processing 'SplitsDividends' category. Data: {category_data}")
# h5_path = Path(data_dir, f"{category}.h5")
for sub_key, sub_data in category_data.items():
logging.debug(f"Processing key: {sub_key}")
logging.debug(f"Data for key {sub_key}: {sub_data}")
if sub_key == 'NumberDividendsByYear':
nested_h5_path = Path(data_dir, f"{exchange}_SplitsDividends_{sub_key}.h5")
nested_df = pd.DataFrame()
for item_key, item_data in sub_data.items():
logging.debug(f"Item data: {item_key}")
nested_df = update_dataframe(nested_df, symbol, sub_key, item_data)
# Sort and remove duplicates based on the Year column
# Before sorting, check if 'Year' exists
if 'Year' in nested_df.columns:
nested_df = nested_df.sort_values(by=['Year'])
nested_df.drop_duplicates(subset=['Year'], keep='last', inplace=True)
save_dataframe_to_h5(nested_df, nested_h5_path, key=symbol, drop_columns=['SubCategory'])
else:
df = update_dataframe(df, symbol, sub_key, sub_data)
save_dataframe_to_h5(df, h5_path, key=symbol)
logging.info(f"{symbol} finished processing category SplitsDividends")
elif category == 'General':
logging.debug(f"Processing 'General' category. Data: {category_data}")
for sub_key, sub_data in category_data.items():
logging.debug(f"Processing key: {sub_key}")
logging.debug(f"Data for key {sub_key}: {sub_data}")
if sub_key in ['Listings', 'Officers']:
continue # Skip 'Listings' and 'Officers'
df = update_dataframe(df, symbol, sub_key, sub_data)
save_dataframe_to_h5(df, h5_path, key=symbol)
logging.info(f"{symbol} finished processing category General")
elif category == 'Financials':
# Iterate through report types like 'Balance Sheet', 'Cash Flow', 'Income Statements'
for report_type, report_data in category_data.items():
# Iterate through time frames like 'annual' or 'quarterly'
for time_frame, time_frame_data in report_data.items():
if time_frame == 'currency_symbol':
continue # Skip the 'currency_symbol'
# Create a specific .h5 path for each combination of report_type and time_frame
h5_path = Path(data_dir, f"{exchange}_{report_type}_{time_frame}.h5")
df = pd.DataFrame()
# Update the DataFrame with financial data
for sub_category, sub_category_data in time_frame_data.items():
df = update_dataframe(df, symbol, sub_category, sub_category_data)
# Save this specific DataFrame to its respective .h5 file
save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
logging.info(f"{symbol} finished processing category Financials")
elif category == 'Earnings':
for sub_category, sub_category_data in category_data.items():
# Create a specific .h5 path for each sub-category
h5_path = Path(data_dir, f"{exchange}_{category}_{sub_category}.h5")
df = pd.DataFrame()
for date_entry, date_entry_data in sub_category_data.items(): # Iterate through each date entry in the subcategory
if date_entry == 'currency_symbol':
continue
# Adding a date field to each entry
date_entry_data['Date'] = date_entry
# Update the dataframe with new row
df = update_dataframe(df, symbol, sub_category, date_entry_data)
# Save this specific DataFrame to its respective .h5 file
if 'SubCategory' in df.columns:
save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory']) # Drop the 'SubCategory' column
else:
save_dataframe_to_h5(df, h5_path, key=symbol)
logging.info(f"{symbol} finished processing category Earnings")
elif category == 'outstandingShares':
for time_frame, time_frame_data in category_data.items(): # time_frame can be 'annual' or 'quarterly'
# Create a specific .h5 path for each time frame
h5_path = Path(data_dir, f"{exchange}_{category}_{time_frame}.h5")
df = pd.DataFrame()
for entry_id, entry_data in time_frame_data.items():
df = update_dataframe(df, symbol, time_frame, entry_data)
# Save this specific DataFrame to its respective .h5 file
save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
logging.info(f"{symbol} finished processing category outstandingShares")
elif category == 'ETF_Data':
# DataFrame for the top-level ETF_Data, excluding subcategories that will be handled separately
top_level_df = pd.DataFrame()
# Dictionary to hold top-level data for this stock symbol
top_level_data = {'Symbol': symbol}
for sub_category, sub_category_data in category_data.items():
if sub_category in ['Asset_Allocation', 'World_Regions', 'Sector_Weights', 'Fixed_Income', 'Top_10_Holdings', 'Holdings', 'Valuations_Growth', 'Market_Capitalisation', 'MorningStar', 'Performance', ]:
# Create a specific .h5 path for each sub-category
h5_path = Path(data_dir, f"{exchange}_ETF_{sub_category}.h5")
df = pd.DataFrame()
# Update the DataFrame with sub-category data
for item_key, item_data in sub_category_data.items():
df = update_dataframe(df, symbol, item_key, item_data)
# Save this specific DataFrame to its respective .h5 file
save_dataframe_to_h5(df, h5_path, key=symbol)
else:
# Populate the top-level data dictionary
top_level_data[sub_category] = sub_category_data
# Convert the top-level data dictionary to a DataFrame and append it to top_level_df
new_row = pd.DataFrame([top_level_data])
top_level_df = pd.concat([top_level_df, new_row], ignore_index=True)
# Save the top-level ETF_Data DataFrame to its respective .h5 file
top_level_h5_path = Path(data_dir, "ETF_Data.h5")
save_dataframe_to_h5(top_level_df, top_level_h5_path, key=symbol)
logging.info(f"{symbol} finished processing category ETF_Data")
else:
logging.debug(f'Processing other category {category}')
if h5_path.exists() and symbol_exists_in_h5(h5_path, symbol):
df = pd.read_hdf(h5_path, key=symbol)
else:
df = pd.DataFrame()
if isinstance(category_data, dict):
for sub_category, sub_category_data in category_data.items():
df = update_dataframe(df, symbol, sub_category, sub_category_data)
else:
df = update_dataframe(df, symbol, category, category_data)
save_dataframe_to_h5(df, h5_path, key=symbol)
logging.info(f"{symbol} finished processing category {category}")
logging.info(f"{symbol} updating symbol status log file.")
symbol_time_dict[symbol] = datetime.now().isoformat()
with open(log_file, 'w') as f:
json.dump(symbol_time_dict, f)
logging.info(f"{symbol} finished processing")
def get_symbols(h5_file_path, key='US'):
"""
Open an HDF5 file and populate the global dictionary symbol_exchange_map
where the symbol is the key and the exchange is the value.
Parameters:
h5_file_path (str): The path to the HDF5 file.
key (str): The key to use when reading the HDF5 file. Default is 'US'.
Returns:
None
"""
h5_file_path = Path(h5_file_path)
# Check if the file exists
if not h5_file_path.exists():
logging.info(f"The file {h5_file_path} does not exist.")
return
try:
# Read the DataFrame from the HDF5 file
df = pd.read_hdf(h5_file_path, key=key)
# Check if 'Code' and 'Exchange' columns exist
if 'Code' not in df.columns or 'Exchange' not in df.columns:
logging.info(f"The 'Code' or 'Exchange' column does not exist in the DataFrame.")
return
# Populate the global symbol_exchange_map
global symbol_exchange_map
symbol_exchange_map = dict(zip(df['Code'], df['Exchange']))
return list(symbol_exchange_map.keys())
except Exception as e:
logging.error(f"An error occurred: {e}")
return
symbols = get_symbols(data_dir + 'symbols.h5', key='US')
fetch_and_store_fundamentals(api_token, symbols, data_dir, skip_existing=True, hours_to_skip=72)