Using Python to save corporate financial data locally from EODHD

In a previous post, I showed how to store symbol data from EODHD. This script builds on that work: it iterates through all of those symbols and downloads each company's fundamental financial data from EODHD using their API. If you're interested in downloading Open, High, Low, Close, Adjusted Close, and Volume data instead, you can find that in this blog post.

Script Overview

The Python script is primarily designed to perform the following tasks:

  1. Fetch financial fundamentals for a list of stock symbols.
  2. Store this information in an HDF5 file for optimized data storage.
  3. Handle various categories of financial data including ‘General’, ‘Earnings’, ‘Financials’, and more.
  4. Log activities for better debugging and monitoring.

The script relies on the EOD Historical Data API and uses Python libraries including Pandas, Requests, and tqdm; Pandas' HDF5 support is backed by PyTables.
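
To get a feel for what the API returns before running the full script, here is a minimal sketch of the fundamentals call it wraps. The 'demo' token is the sample key from EODHD's documentation and only covers a handful of tickers such as AAPL.US; substitute your own token for anything else.

import requests

url = "https://eodhd.com/api/fundamentals/AAPL.US?api_token=demo"
response = requests.get(url)
response.raise_for_status()
json_data = response.json()

# Top-level categories the script iterates over, e.g. 'General', 'Financials', 'Earnings'
print(list(json_data.keys()))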

Utility Functions

  • save_dataframe_to_h5(): Saves a Pandas DataFrame to an HDF5 file.
  • key_exists_in_h5(): Checks if a key exists in an HDF5 file.
  • print_all_keys_in_h5(): Prints all keys in an HDF5 file.
  • symbol_exists_in_h5(): Checks if a symbol exists in an HDF5 file.
  • convert_columns_to_numeric(): Converts DataFrame columns to numeric types, if applicable.
  • update_dataframe(): Updates the DataFrame with new rows of data.
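
Together, these helpers implement a simple storage layout: one HDF5 file per data category, with one key per symbol. Here is a minimal sketch of that round trip, assuming PyTables is installed (pandas' HDF5 backend) and using an illustrative file name:

import pandas as pd

# One file per category, one key per symbol (illustrative data)
df = pd.DataFrame({'Symbol': ['AAPL'], 'MarketCapitalization': [3.0e12]})
df.to_hdf('US_General.h5', key='AAPL', mode='a')  # the core of save_dataframe_to_h5()

with pd.HDFStore('US_General.h5', 'r') as store:  # the core of key_exists_in_h5()
    print('/AAPL' in store)  # True

print(pd.read_hdf('US_General.h5', key='AAPL'))  # read the stored frame back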

Main Function: fetch_and_store_fundamentals()

This function performs the core operation of fetching and storing data. It takes an API token, a list of stock symbols, a data directory, and two optional parameters (skip_existing and hours_to_skip) as arguments.

The function goes through the following steps for each symbol:

  1. Check If Data Exists: If the skip_existing flag is true, it checks whether the data already exists in the HDF5 storage.
  2. Fetch Data: Downloads JSON data for the stock symbol from the EOD Historical Data API.
  3. Data Processing: Processes different categories of data (General, Financials, Earnings, etc.) and stores them in separate HDF5 files.
  4. Log Update: Updates a log file with the timestamp of the last fetch for each symbol.
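
In addition to the skip_existing check, the function skips any symbol fetched within the last hours_to_skip hours, using the same JSON log it writes in step 4. A condensed sketch of that freshness check (the timestamp is made up):

from datetime import datetime, timedelta

symbol_time_dict = {'AAPL': '2024-01-02T09:30:00'}  # loaded from symbol_time_log.json
last_downloaded = datetime.fromisoformat(symbol_time_dict['AAPL'])
if datetime.now() - last_downloaded < timedelta(hours=72):
    print('AAPL was downloaded recently; skipping')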

Helper Function: get_symbols()

This function populates a global dictionary, symbol_exchange_map, mapping stock symbols to their respective exchanges. It reads this information from an existing HDF5 file.
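
The mapping itself is just a dictionary built from two columns of the stored DataFrame. With illustrative data:

import pandas as pd

# symbols.h5 holds a DataFrame with 'Code' and 'Exchange' columns
df = pd.DataFrame({'Code': ['AAPL', 'MSFT'], 'Exchange': ['NASDAQ', 'NASDAQ']})
symbol_exchange_map = dict(zip(df['Code'], df['Exchange']))
print(symbol_exchange_map)  # {'AAPL': 'NASDAQ', 'MSFT': 'NASDAQ'}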

Code Execution

Finally, the script builds the symbol list with get_symbols() and passes it to fetch_and_store_fundamentals(), which downloads and stores the data.
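
The script also imports several curated symbol lists (sp500, homebuilders, and others) that it doesn't use by default. To fetch only a subset rather than the full symbol universe, you could pass one of those lists instead (hypothetical usage):

# Hypothetical: fetch fundamentals only for the imported S&P 500 list
fetch_and_store_fundamentals(api_token, sp500, data_dir, skip_existing=True, hours_to_skip=72)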

Conclusion

Automating the process of financial data collection and storage is a crucial step in building robust trading algorithms or investment strategies. This script serves as a foundational block for such endeavors, allowing you to focus more on data analysis rather than data collection.

Code

import json
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
import requests
from tqdm import tqdm

from keys import api_token
from dictionaries_and_lists import homebuilders, sp500, companies_with_treasuries, largest_banks

data_dir = '/home/shared/algos/data/'
logs_dir = '/home/shared/algos/eodhd_data/logs/'

logging.basicConfig(level=logging.INFO)

# Configure Pandas to display all columns and avoid wrapping wide frames
pd.set_option('display.max_columns', None)
pd.set_option('display.expand_frame_repr', False)


symbol_exchange_map = {}

def save_dataframe_to_h5(df, h5_path, key, drop_columns=None):
    convert_columns_to_numeric(df)  # Convert numeric-looking columns in place

    column_type_mapping = {
        'Symbol': 'object',
        'Country': 'object',
        'reportDate': 'datetime',
        'filing_date': 'datetime',
        'date': 'datetime',
        'Date': 'datetime',
        'currency_symbol': 'object',
        'currency': 'object',
        'beforeAfterMarket': 'object'
    }

    for column, dtype in column_type_mapping.items():
        if column in df.columns:
            if dtype == 'datetime':
                df[column] = pd.to_datetime(df[column], errors='coerce')
            else:
                df[column] = df[column].astype(dtype)

    if drop_columns:
        df.drop(columns=drop_columns, errors='ignore', inplace=True)

    try:
        df.to_hdf(h5_path, key=key, mode='a')
    except Exception as e:
        logging.error(f"Failed to save DataFrame to HDF5 file {h5_path} with key {key}. Error: {e}")

def key_exists_in_h5(h5_filename, key):
    with pd.HDFStore(h5_filename, 'r') as store:
        return key in store

def print_all_keys_in_h5(h5_filename):
    with pd.HDFStore(h5_filename, 'r') as store:
        print("Keys in HDF5 file:")
        for key in store.keys():
            print(key)

def symbol_exists_in_h5(h5_filepath, symbol):
    try:
        with pd.HDFStore(h5_filepath, 'r') as store:
            return f'/{symbol}' in store.keys()
    except OSError:
        # The file is missing or is not a readable HDF5 file
        return False

def convert_columns_to_numeric(df):
    # Convert each column to a numeric dtype where possible, leaving the
    # rest untouched (pd.to_numeric's errors='ignore' is deprecated)
    for col in df.columns:
        try:
            df[col] = pd.to_numeric(df[col])
        except (ValueError, TypeError):
            pass


def update_dataframe(df, symbol, sub_category, sub_category_data, time_frame=None):
    row_data = {
        'Symbol': symbol,
        'SubCategory': sub_category,
    }

    if time_frame is not None:
        row_data['TimeFrame'] = time_frame

    if isinstance(sub_category_data, dict):
        row_data.update(sub_category_data)
    else:
        row_data['Data'] = str(sub_category_data)

    new_row = pd.DataFrame([row_data])
    df = pd.concat([df, new_row], ignore_index=True)
    return df


def fetch_and_store_fundamentals(api_token, symbols, data_dir, skip_existing=False, hours_to_skip=72):
    log_file = logs_dir + 'symbol_time_log.json'
    symbol_time_dict = {}

    try:
        with open(log_file, 'r') as f:
            content = f.read()
        symbol_time_dict = json.loads(content)
    except json.JSONDecodeError as e:
        logging.error(
            f"JSON Decode Error at line {e.lineno}, column {e.colno}. Content around error position: {content[e.pos - 10:e.pos + 10]}")
    except FileNotFoundError:
        logging.info(f"File {log_file} not found. An empty dictionary will be used.")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")


    for symbol in tqdm(symbols, desc='Fetching and storing fundamentals'):
        last_downloaded_time = symbol_time_dict.get(symbol, None)
        if last_downloaded_time:
            last_downloaded_time = datetime.fromisoformat(last_downloaded_time)
            time_since_last_download = datetime.now() - last_downloaded_time
            if time_since_last_download < timedelta(hours=hours_to_skip):
                logging.info(f"Data for symbol {symbol} was downloaded recently. Skipping...")
                continue

        if skip_existing:
            exchange = symbol_exchange_map.get(symbol, '')  # Exchange name from the global map

            # Create the new h5 path with the exchange name prepended
            h5_path_check = Path(data_dir, f"{exchange}_General.h5") if exchange else Path(data_dir, "General.h5")

            if h5_path_check.exists() and key_exists_in_h5(h5_path_check, f'/{symbol}'):
                logging.info(f"Data for symbol {symbol} already exists. Skipping...")
                continue

        logging.info(f"\n{symbol}: Downloading from EODHD...")

        try:
            url = f"https://eodhd.com/api/fundamentals/{symbol}.US?api_token={api_token}"
            response = requests.get(url)
        except requests.exceptions.ConnectionError:
            # requests raises its own ConnectionError (which also covers refused
            # connections), not the builtin one
            logging.error(f"ConnectionError occurred while fetching data for symbol {symbol}. Sleeping for 60 seconds.")
            time.sleep(60)
            continue
        except Exception as e:
            logging.error(f"An unexpected error occurred: {e}")
            continue

        if response.status_code != 200:
            logging.error(f"Failed to fetch data for symbol {symbol}. HTTP Status Code: {response.status_code} \n Sleeping for 60 seconds")
            time.sleep(60)  # Sleep for 60 seconds
            continue  # Continue to next iteration

        json_data = response.json()

        logging.info(f"\n{symbol}: Finished downloading from EODHD...")

        # Check if the logging level is set to DEBUG
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            json_dump_file = logs_dir + 'api_response_output.txt'

            with open(json_dump_file, 'w') as f:
                f.write("JSON Data:\n")
                f.write(json.dumps(json_data, indent=4))

        for category, category_data in json_data.items():
            if category_data is None:
                logging.warning(f"Data for category {category} is None.")
                continue

            exchange = symbol_exchange_map.get(symbol, 'Other')
            h5_path = Path(data_dir, f"{exchange}_{category}.h5")
            df = pd.DataFrame()

            if category == 'ESGScores':
                continue

            elif category == 'Holders':
                for holder_type, holder_data in category_data.items():
                    h5_path = Path(data_dir, f"{exchange}_Holders_{holder_type}.h5")

                    for holder_id, holder_info in holder_data.items():
                        df = update_dataframe(df, symbol, holder_type, holder_info)
                    save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
                    logging.info(f"{symbol} finished processing category Holders")

            elif category == 'SplitsDividends':
                logging.debug(f"Processing 'SplitsDividends' category. Data: {category_data}")

                # h5_path = Path(data_dir, f"{category}.h5")
                for sub_key, sub_data in category_data.items():
                    logging.debug(f"Processing key: {sub_key}")
                    logging.debug(f"Data for key {sub_key}: {sub_data}")

                    if sub_key == 'NumberDividendsByYear':
                        nested_h5_path = Path(data_dir, f"{exchange}_SplitsDividends_{sub_key}.h5")

                        nested_df = pd.DataFrame()

                        for item_key, item_data in sub_data.items():
                            logging.debug(f"Item data: {item_key}")
                            nested_df = update_dataframe(nested_df, symbol, sub_key, item_data)

                        # Sort and remove duplicates based on the Year column
                        # Before sorting, check if 'Year' exists
                        if 'Year' in nested_df.columns:
                            nested_df = nested_df.sort_values(by=['Year'])
                            nested_df.drop_duplicates(subset=['Year'], keep='last', inplace=True)

                        save_dataframe_to_h5(nested_df, nested_h5_path, key=symbol, drop_columns=['SubCategory'])
                    else:
                        df = update_dataframe(df, symbol, sub_key, sub_data)

                save_dataframe_to_h5(df, h5_path, key=symbol)
                logging.info(f"{symbol} finished processing category SplitsDividends")


            elif category == 'General':
                logging.debug(f"Processing 'General' category. Data: {category_data}")
                for sub_key, sub_data in category_data.items():
                    logging.debug(f"Processing key: {sub_key}")
                    logging.debug(f"Data for key {sub_key}: {sub_data}")

                    if sub_key in ['Listings', 'Officers']:
                        continue  # Skip 'Listings' and 'Officers'

                    df = update_dataframe(df, symbol, sub_key, sub_data)


                save_dataframe_to_h5(df, h5_path, key=symbol)
                logging.info(f"{symbol} finished processing category General")


            elif category == 'Financials':
                # Iterate through report types like 'Balance Sheet', 'Cash Flow', 'Income Statements'
                for report_type, report_data in category_data.items():
                    # Iterate through time frames like 'annual' or 'quarterly'
                    for time_frame, time_frame_data in report_data.items():
                        if time_frame == 'currency_symbol':
                            continue  # Skip the 'currency_symbol'

                        # Create a specific .h5 path for each combination of report_type and time_frame
                        h5_path = Path(data_dir, f"{exchange}_{report_type}_{time_frame}.h5")

                        df = pd.DataFrame()

                        # Update the DataFrame with financial data
                        for sub_category, sub_category_data in time_frame_data.items():
                            df = update_dataframe(df, symbol, sub_category, sub_category_data)

                        # Save this specific DataFrame to its respective .h5 file
                        save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
                        logging.info(f"{symbol} finished processing category Financials")


            elif category == 'Earnings':
                for sub_category, sub_category_data in category_data.items():
                    # Create a specific .h5 path for each sub-category
                    h5_path = Path(data_dir, f"{exchange}_{category}_{sub_category}.h5")

                    df = pd.DataFrame()


                    for date_entry, date_entry_data in sub_category_data.items():  # Iterate through each date entry in the subcategory
                        if date_entry == 'currency_symbol':
                            continue

                        # Adding a date field to each entry
                        date_entry_data['Date'] = date_entry

                        # Update the dataframe with new row
                        df = update_dataframe(df, symbol, sub_category, date_entry_data)

                    # Save this specific DataFrame to its respective .h5 file
                    if 'SubCategory' in df.columns:
                        save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])  # Drop the 'SubCategory' column
                    else:
                        save_dataframe_to_h5(df, h5_path, key=symbol)
                    logging.info(f"{symbol} finished processing category Earnings")


            elif category == 'outstandingShares':
                for time_frame, time_frame_data in category_data.items():  # time_frame can be 'annual' or 'quarterly'
                    # Create a specific .h5 path for each time frame
                    h5_path = Path(data_dir, f"{exchange}_{category}_{time_frame}.h5")

                    df = pd.DataFrame()


                    for entry_id, entry_data in time_frame_data.items():
                        df = update_dataframe(df, symbol, time_frame, entry_data)

                    # Save this specific DataFrame to its respective .h5 file
                    save_dataframe_to_h5(df, h5_path, key=symbol, drop_columns=['SubCategory'])
                    logging.info(f"{symbol} finished processing category outstandingShares")


            elif category == 'ETF_Data':
                # DataFrame for the top-level ETF_Data, excluding subcategories that will be handled separately
                top_level_df = pd.DataFrame()

                # Dictionary to hold top-level data for this stock symbol
                top_level_data = {'Symbol': symbol}

                for sub_category, sub_category_data in category_data.items():
                    if sub_category in ['Asset_Allocation', 'World_Regions', 'Sector_Weights', 'Fixed_Income', 'Top_10_Holdings', 'Holdings', 'Valuations_Growth', 'Market_Capitalisation', 'MorningStar', 'Performance', ]:
                        # Create a specific .h5 path for each sub-category
                        h5_path = Path(data_dir, f"{exchange}_ETF_{sub_category}.h5")

                        df = pd.DataFrame()

                        # Update the DataFrame with sub-category data
                        for item_key, item_data in sub_category_data.items():
                            df = update_dataframe(df, symbol, item_key, item_data)

                        # Save this specific DataFrame to its respective .h5 file
                        save_dataframe_to_h5(df, h5_path, key=symbol)

                    else:
                        # Populate the top-level data dictionary
                        top_level_data[sub_category] = sub_category_data

                # Convert the top-level data dictionary to a DataFrame and append it to top_level_df
                new_row = pd.DataFrame([top_level_data])
                top_level_df = pd.concat([top_level_df, new_row], ignore_index=True)

                # Save the top-level ETF_Data DataFrame to its respective .h5 file
                top_level_h5_path = Path(data_dir, "ETF_Data.h5")
                save_dataframe_to_h5(top_level_df, top_level_h5_path, key=symbol)
                logging.info(f"{symbol} finished processing category ETF_Data")

            else:
                logging.debug(f'Processing other category {category}')

                if h5_path.exists() and symbol_exists_in_h5(h5_path, symbol):
                    df = pd.read_hdf(h5_path, key=symbol)
                else:
                    df = pd.DataFrame()

                if isinstance(category_data, dict):
                    for sub_category, sub_category_data in category_data.items():
                        df = update_dataframe(df, symbol, sub_category, sub_category_data)
                else:
                    df = update_dataframe(df, symbol, category, category_data)

                save_dataframe_to_h5(df, h5_path, key=symbol)
                logging.info(f"{symbol} finished processing category {category}")

        logging.info(f"{symbol} updating symbol status log file.")
        symbol_time_dict[symbol] = datetime.now().isoformat()
        with open(log_file, 'w') as f:
            json.dump(symbol_time_dict, f)

        logging.info(f"{symbol} finished processing")


def get_symbols(h5_file_path, key='US'):
    """
    Open an HDF5 file and populate the global dictionary symbol_exchange_map
    where the symbol is the key and the exchange is the value.

    Parameters:
        h5_file_path (str): The path to the HDF5 file.
        key (str): The key to use when reading the HDF5 file. Default is 'US'.

    Returns:
        list: The stock symbols found in the file, or None if the file or
        the required columns are missing.
    """

    h5_file_path = Path(h5_file_path)

    # Check if the file exists
    if not h5_file_path.exists():
        logging.info(f"The file {h5_file_path} does not exist.")
        return

    try:
        # Read the DataFrame from the HDF5 file
        df = pd.read_hdf(h5_file_path, key=key)

        # Check if 'Code' and 'Exchange' columns exist
        if 'Code' not in df.columns or 'Exchange' not in df.columns:
            logging.info(f"The 'Code' or 'Exchange' column does not exist in the DataFrame.")
            return

        # Populate the global symbol_exchange_map
        global symbol_exchange_map
        symbol_exchange_map = dict(zip(df['Code'], df['Exchange']))
        return list(symbol_exchange_map.keys())
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        return


symbols = get_symbols(data_dir + 'symbols.h5', key='US')

fetch_and_store_fundamentals(api_token, symbols, data_dir, skip_existing=True, hours_to_skip=72)
