• Building a DIY Powerwall to offset peak demand cost PT2, sizing the battery

    In the first part of this series, we discussed why building a DIY Powerwall is a smart move to offset peak demand costs. In this part, we’ll focus on how to determine the appropriate battery size for your needs using historical usage data from your SRP bills. The Python program below will output these charts and recommendations.

    Output

    The max you used during any 2-8 PM period is 58.90 kWh on 2021-09-05. Based on historical usage, this battery size would eliminate your electricity usage during peak hours.

    The average you used during all 2-8 PM periods is 10.92 kWh. Based on historical usage, a battery of this size would eliminate your peak electricity usage on average.

    If you did not have solar, you would need a battery with a size of 87.20 kWh to eliminate the maximum demand during peak hours. This maximum was recorded on 2021-09-05.

    If you did not have solar, you would need a battery with a size of 39.04 kWh to eliminate the average demand during peak hours.

    If you did not have solar, you would need a battery with a size of 232.80 kWh to power your house for a full day based on the maximum daily consumption. This maximum was recorded on 2021-08-03.

    If you did not have solar, you would need a battery with a size of 116.08 kWh to power your house for a full day based on the average daily consumption.

    Because you have solar, this reduces your battery size by 32.45% to eliminate all your peak energy.

    Because you have solar, this reduces your battery size by 72.02% to eliminate on average all of your peak energy.
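
    Those two percentages follow directly from the numbers above: with solar, the worst 2-8 PM window drops from 87.20 kWh to 58.90 kWh, a reduction of 1 - 58.90/87.20 ≈ 32.45%, and the average window drops from 39.04 kWh to 10.92 kWh, a reduction of 1 - 10.92/39.04 ≈ 72.02%.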

    Collecting your data

    To use this program, you need to download your hourly usage data from SRP:

    1. Log in to Your SRP Account:
      • Navigate to the usage section.
      • Select hourly usage and choose your date range.
      • Export the data to Excel for the categories: net energy, generation, and usage. If you do not have solar, you may only have usage data, which is fine.
    2. Data Export:
      • Download the data for May through October to focus on summer consumption months, as winter usage without AC is significantly lower.
      • Export data for multiple past years to allow the program to determine maximum and average values accurately.



    You should now have some files that look like this.

    With your data files ready, you can now run the Python program:
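
    The full program isn’t reproduced here, but the core of the calculation is simple enough to sketch. The snippet below is a minimal, hypothetical version of that analysis: it assumes your exports live in ./data/ as Excel files and that each row has 'Date', 'Hour', and 'kWh' columns, so adjust the file pattern and column names to match what SRP actually gives you. Run it once against the usage export and once against the net energy export to get the without-solar and with-solar numbers.

    # Minimal sketch of the peak-window math (not the full program).
    # File pattern and column names ('Date', 'Hour', 'kWh') are assumptions.
    import glob
    import pandas as pd

    frames = [pd.read_excel(f) for f in glob.glob('./data/*.xlsx')]
    df = pd.concat(frames, ignore_index=True)
    df['Date'] = pd.to_datetime(df['Date'])

    peak = df[(df['Hour'] >= 14) & (df['Hour'] < 20)]  # 2 PM - 8 PM
    peak_daily = peak.groupby('Date')['kWh'].sum()
    daily = df.groupby('Date')['kWh'].sum()

    print(f"Max 2-8 PM usage: {peak_daily.max():.2f} kWh on {peak_daily.idxmax():%Y-%m-%d}")
    print(f"Average 2-8 PM usage: {peak_daily.mean():.2f} kWh")
    print(f"Max daily usage: {daily.max():.2f} kWh on {daily.idxmax():%Y-%m-%d}")
    print(f"Average daily usage: {daily.mean():.2f} kWh")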

    Time to buy the equipment

    Now that you know your required battery size, you can begin shopping for components and building your DIY Powerwall. Stay tuned for the next part, where we’ll dive into the construction process!

  • Building a DIY Powerwall to offset peak demand cost PT1, why?

    Recently, I decided to embark on a DIY Powerwall project for my house. The first challenge I faced was determining the appropriate size for the system. Living in Arizona, where APS and SRP are the primary utilities, my goal was to eliminate or shift peak demand. Specifically, I aimed to build a battery that discharges between the peak hours of 2 PM to 8 PM and recharges at night when energy is less expensive.

    Why Build a DIY Powerwall?


    The primary reason for this project is that demand charges and peak energy costs are incredibly expensive. SRP’s peak demand charges are calculated by breaking down your entire month into 30-minute increments. They then determine the highest 30-minute increment of kW usage for that month and apply it to the chart below to determine your peak demand cost. This cost is a one-time charge applied to your monthly bill but can be significant.
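
    As a rough, hypothetical sketch (the interval file name and the 'timestamp'/'kWh' column names are assumptions), finding that highest 30-minute increment for each month looks something like this:

    # Sketch: the demand charge is based on the single highest 30-minute average kW in the month.
    import pandas as pd

    df = pd.read_csv('interval_data.csv', parse_dates=['timestamp'])
    df['kW'] = df['kWh'] / 0.5  # energy over a 30-minute window -> average power
    monthly_peak_kw = df.set_index('timestamp')['kW'].resample('M').max()
    print(monthly_peak_kw)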

    In addition to peak demand costs, you still have to pay per kWh for on-peak and off-peak usage. Here is the breakdown of SRP’s E27 plan:


    As you can see, the prices above are a lot cheaper than SRP’s traditional plan:

    • Winter 42% cheaper
    • Summer 58% cheaper
    • Summer peak 56% cheaper

    Understanding the Costs

    The prices on the E27 plan are significantly lower than SRP’s traditional plan. However, to achieve these savings, you need to shift your electricity usage to off-peak hours. If you don’t offset your 2 PM to 8 PM usage on this plan, you will incur substantial demand charges, which can easily exceed $1,000 in a single month. This highlights the importance of avoiding peak demand charges to benefit from cheaper electricity rates.

    The Goal

    Our objective is to eliminate electricity usage from 2 PM to 8 PM by discharging a battery and then recharging it at night. The next step is to determine how much energy I have historically used during these hours.

    In part two of this article, I will share a Python program I wrote that analyzes all your SRP hourly data to recommend an appropriate battery size.


  • Automating notification updates on court cases for Maricopa County Superior Court

    Recently, I needed real-time updates for a court case affecting a house I was purchasing. Instead of constantly checking the case manually, I wrote a Python script to automate this process. The script monitors court case information, party details, case documents, and the case calendar. Whenever it detects an update, it sends an email notification.

    How It Works

    The script downloads the latest court case information and saves it to the ./data/ directory in the following CSV formats:

    • {case_number}_case_information.csv
    • {case_number}_party_information.csv
    • {case_number}_case_documents.csv
    • {case_number}_case_calendar.csv

    If there are new updates, it emails the specific rows with the changes.



    Gmail JSON file


    To send emails, you need a JSON file to access your Gmail account. Follow these steps:

    1. Go to the Google Cloud Console:
    2. Create a New Project (if you don’t already have one):
      • Click on the project dropdown at the top and select “New Project.”
      • Enter a name for your project and click “Create.”
    3. Enable the Gmail API:
      • In the Google Cloud Console, go to the “APIs & Services” dashboard.
      • Click on “Enable APIs and Services.”
      • Search for “Gmail API” and click on it.
      • Click “Enable” to enable the Gmail API for your project.
    4. Create Credentials:
      • Go to the “Credentials” tab on the left sidebar.
      • Click on “Create Credentials” and select “OAuth 2.0 Client ID.”
      • You may be prompted to configure the OAuth consent screen. If so, follow the instructions to set up the consent screen (you can fill in minimal information for internal use).
      • Choose “Desktop app” for the application type and give it a name.
      • Click “Create.”
    5. Download the JSON File:
      • After creating the credentials, you will see a section with your newly created OAuth 2.0 Client ID.
      • Click the “Download” button to download the JSON file containing your client secrets.
      • Rename this file to credentials.json and place it in the same directory as your script or specify the correct path in your script.

    Code

    import os
    import pickle
    import base64
    from google_auth_oauthlib.flow import InstalledAppFlow
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError
    from google.auth.transport.requests import Request
    import requests
    from bs4 import BeautifulSoup
    import logging
    import csv
    import pandas as pd
    from email.mime.text import MIMEText
    
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
    
    # If modifying these SCOPES, delete the file token.pickle.
    SCOPES = ['https://www.googleapis.com/auth/gmail.send']
    
    def get_credentials():
        creds = None
        if os.path.exists('/home/shared/algos/monitor_court_case/token.pickle'):
            with open('/home/shared/algos/monitor_court_case/token.pickle', 'rb') as token:
                creds = pickle.load(token)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                try:
                    creds.refresh(Request())
                except Exception as e:
                    if os.path.exists('/home/shared/algos/monitor_court_case/token.pickle'):
                        os.remove('/home/shared/algos/monitor_court_case/token.pickle')
                    flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
                    creds = flow.run_local_server(port=0)
            else:
                flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
            with open('/home/shared/algos/monitor_court_case/token.pickle', 'wb') as token:
                pickle.dump(creds, token)
        return creds
    
    
    def send_email(subject, body, recipients):
        creds = get_credentials()
        service = build('gmail', 'v1', credentials=creds)
    
        to_addresses = ", ".join(recipients)
        message = {
            'raw': base64.urlsafe_b64encode(
                f"From: me\nTo: {to_addresses}\nSubject: {subject}\nContent-Type: text/html; charset=UTF-8\n\n{body}".encode(
                    "utf-8")).decode("utf-8")
        }
    
        try:
            message = (service.users().messages().send(userId="me", body=message).execute())
            logging.info(f"Message Id: {message['id']}")
            return message
        except HttpError as error:
            logging.error(f'An error occurred: {error}')
            return None
    
    def create_html_table(df):
        return df.to_html(index=False)
    
    def fetch_webpage(url):
        response = requests.get(url)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        return soup
    
    def parse_case_information(soup):
        div = soup.find('div', id='tblForms')
        rows = []
        if div:
            headers = ["Case Number", "Judge", "File Date", "Location", "Case Type"]
            rows.append(headers)
            case_info = {}
            data_divs = div.find_all('div', class_='row g-0 py-0')
            for data_div in data_divs:
                key_elements = data_div.find_all('div', class_='col-4 col-lg-3 bold-font')
                value_elements = data_div.find_all('div', class_='col-8 col-lg-3')
                for key_elem, value_elem in zip(key_elements, value_elements):
                    key = key_elem.get_text(strip=True).replace(':', '')
                    value = value_elem.get_text(strip=True)
                    case_info[key] = value
            rows.append([
                case_info.get("Case Number", ""),
                case_info.get("Judge", ""),
                case_info.get("File Date", ""),
                case_info.get("Location", ""),
                case_info.get("Case Type", "")
            ])
        return rows
    
    def parse_party_information(soup):
        div = soup.find('div', class_='zebraRowTable grid-um')
        rows = []
        if div:
            headers = ["Party Name", "Relationship", "Sex", "Attorney"]
            rows.append(headers)
            data_divs = div.find_all('div', class_='row g-0 pt-3 pb-3', id='tblForms2')
            for data_div in data_divs:
                party_name = data_div.find('div', class_='col-8 col-lg-4').get_text(strip=True)
                relationship = data_div.find('div', class_='col-8 col-lg-3').get_text(strip=True)
                sex = data_div.find('div', class_='col-8 col-lg-2').get_text(strip=True)
                attorney = data_div.find('div', class_='col-8 col-lg-3').get_text(strip=True)
                cols = [party_name, relationship, sex, attorney]
                logging.debug(f"Parsed row: {cols}")
                rows.append(cols)
        return rows
    
    def parse_case_documents(soup):
        div = soup.find('div', id='tblForms3')
        rows = []
        if div:
            headers = ["Filing Date", "Description", "Docket Date", "Filing Party", "Note"]
            rows.append(headers)
            data_divs = div.find_all('div', class_='row g-0')
            for data_div in data_divs:
                cols = [col.get_text(strip=True) for col in
                        data_div.find_all('div', class_=['col-lg-2', 'col-lg-5', 'col-lg-3'])]
    
                note_text = ""
                note_div = data_div.find('div', class_='col-8 pl-3 emphasis')
                if note_div:
                    note_text = note_div.get_text(strip=True).replace("NOTE:", "").strip()
    
                if len(cols) >= 5:
                    cols[4] = note_text  # Update the existing 5th column (Note)
                else:
                    cols.append(note_text)  # Append note text if columns are less than 5
    
                if any(cols):
                    rows.append(cols)
        return rows
    
    def parse_case_calendar(soup):
        div = soup.find('div', id='tblForms4')
        rows = []
        if div:
            headers = ["Date", "Time", "Event"]
            rows.append(headers)
            data_divs = div.find_all('div', class_='row g-0')
            current_row = []
            for data_div in data_divs:
                cols = [col.get_text(strip=True) for col in data_div.find_all('div', class_=['col-lg-2', 'col-lg-8'])]
                if cols:
                    current_row.extend(cols)
                    if len(current_row) == 3:
                        rows.append(current_row)
                        current_row = []
        return rows
    
    def save_table_to_csv(table, csv_filename):
        if table:
            with open(csv_filename, 'w', newline='') as csvfile:
                writer = csv.writer(csvfile)
                for row in table:
                    writer.writerow(row)
            logging.info(f"Table saved to {csv_filename}")
        else:
            logging.warning(f"No data found to save in {csv_filename}")
    
    
    def compare_dates_and_save_table(new_table, csv_filename, date_column, recipients):
        if not os.path.exists(csv_filename):
            logging.info(f"{csv_filename} does not exist. Creating new file.")
            save_table_to_csv(new_table, csv_filename)
            return
    
        old_table_df = pd.read_csv(csv_filename)
        new_table_df = pd.DataFrame(new_table[1:], columns=new_table[0])
    
        old_dates = set(old_table_df[date_column])
        new_dates = set(new_table_df[date_column])
    
        new_rows = new_table_df[new_table_df[date_column].isin(new_dates - old_dates)]
    
        if not new_rows.empty:
            logging.info(f"Changes detected in {csv_filename}. Updating file.")
            logging.info(f'Here are the new rows \n {new_rows} \n Here is the new table \n {new_table_df}')
            logging.info(f"New rows:\n{new_rows.to_string(index=False)}")
            html_table = create_html_table(new_rows)
            email_body = f"""
            Changes detected in court case {case_number}

            {html_table}
            """
            send_email(f"Changes detected in court case {case_number}", email_body, recipients)
            save_table_to_csv(new_table, csv_filename)
        else:
            logging.info(f"No changes detected in {csv_filename}")


    if __name__ == "__main__":
        case_number = 'FN2020-096853'
        url = f"https://www.superiorcourt.maricopa.gov/docket/FamilyCourtCases/caseInfo.asp?caseNumber={case_number}"

        data_dir = './data'
        os.makedirs(data_dir, exist_ok=True)

        recipients = ['email@email.com', 'email@email.com', 'email@email.com']  # Add the recipient email addresses here

        soup = fetch_webpage(url)

        case_information_table = parse_case_information(soup)
        party_information_table = parse_party_information(soup)
        case_documents_table = parse_case_documents(soup)
        case_calendar_table = parse_case_calendar(soup)

        save_table_to_csv(case_information_table, os.path.join(data_dir, f"{case_number}_case_information.csv"))
        save_table_to_csv(party_information_table, os.path.join(data_dir, f"{case_number}_party_information.csv"))
        compare_dates_and_save_table(case_documents_table, os.path.join(data_dir, f"{case_number}_case_documents.csv"), "Filing Date", recipients)
        compare_dates_and_save_table(case_calendar_table, os.path.join(data_dir, f"{case_number}_case_calendar.csv"), "Date", recipients)

  • Automating FinCEN Beneficial Ownership Information Reporting for Multiple Organizations

    The Financial Crimes Enforcement Network (FinCEN) now requires most businesses to report their beneficial owners, a mandate stemming from the U.S. Corporate Transparency Act. The penalties for non-compliance are exceptionally harsh. Anyone willfully violating the reporting requirements could face penalties of up to $500 for each day of continued violation, with criminal penalties including up to two years of imprisonment and fines up to $10,000. All the FAQs for BOIR reporting can be found here.

    Fortunately, if you only have a few companies, filling out this form should take no longer than 15 minutes. You can access the online filing system here. However, if you have hundreds of LLCs, this process can quickly become overwhelming, potentially taking days to complete multiple forms. To address this, I’ve written a script to automate the process. If all your companies have the same beneficial owners, this script will streamline the reporting; however, you will still need to complete the CAPTCHA at the end and verify the information that was filled in.

    Using Selenium in Python to Navigate FinCEN

    These lines need to be modified with your information

    last_name_field.send_keys("Last Name")
    first_name_field.send_keys("First Name")
    dob_field.send_keys("01/01/1982")
    address_field.send_keys("Address field")
    city_field.send_keys("City")
    fill_dropdown(driver, country_field, "United States of America")
    fill_dropdown(driver, state_field, "Arizona")
    zip_field.send_keys("00000")
    id_number_field.send_keys("Drivers license number")
    fill_dropdown(driver, id_country_field, "United States of America")
    fill_dropdown(driver, id_state_field, "Arizona")
    id_file_field.send_keys("/full/path/of/ID.jpeg")
    last_name_field_2.send_keys("Beneficial Owner #2 Last Name")
    first_name_field_2.send_keys("Beneficial Owner #2 First Name")
    dob_field_2.send_keys("Beneficial Owner #2 DOB")
    address_field_2.send_keys("Beneficial Owner #2 Address")
    city_field_2.send_keys("Beneficial Owner #2 City")
    fill_dropdown(driver, state_field_2, "Beneficial Owner #2 State")
    zip_field_2.send_keys("Beneficial Owner #2 ZIP")
    fill_dropdown(driver, id_type_field_2, "State/local/tribe-issued ID")
    id_number_field_2.send_keys("Beneficial Owner #2 ID number")
    fill_dropdown(driver, id_country_field_2, "United States of America")
    fill_dropdown(driver, id_state_field_2, "Arizona")
    id_file_field_2.send_keys("beneficial/owner2/full/path/of/ID.jpeg")
    email_field.send_keys("submitter email")
    confirm_email_field.send_keys("submitter email confirm")
    first_name_field.send_keys("submitter first name")
    last_name_field.send_keys("submitter last name")
    fill_field(driver, By.ID, 'rc.jurisdiction', 'reporting company country')
    fill_field(driver, By.ID, 'rc.domesticState', 'reporting company state')
    fill_field(driver, By.ID, 'rc.address.value', 'reporting company address')
    fill_field(driver, By.ID, 'rc.city.value', 'reporting company city')
    fill_field(driver, By.ID, 'rc.country.value', 'reporting company country')
    fill_field(driver, By.ID, 'rc.state.value', 'reporting company state')
    fill_field(driver, By.ID, 'rc.zip.value', 'reporting company zip')
    #should only be true if company was established before 2024    
    click_element(driver, By.CSS_SELECTOR, 'label[for="rc.isExistingReportingCompany"]', use_js=True)
    
    
    

    Code

    import logging
    import time
    from selenium import webdriver
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By
    from selenium.webdriver.common.keys import Keys
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from webdriver_manager.chrome import ChromeDriverManager
    from selenium.common.exceptions import WebDriverException, UnexpectedAlertPresentException, NoAlertPresentException
    import os
    import signal
    
    import csv
    import pandas as pd
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    
    def read_companies_from_csv(file_path):
        df = pd.read_csv(file_path, dtype={'EIN': str, 'SSN/ITIN': str})
        return df
    
    def update_csv(file_path, df):
        df.to_csv(file_path, index=False)
    
    def mark_company_complete(df, company_name):
        df.loc[df['LLC'] == company_name, 'complete'] = 'Yes'
    
    def wait_for_element(driver, by, value, timeout=10):
        try:
            element = WebDriverWait(driver, timeout).until(
                EC.presence_of_element_located((by, value))
            )
            return element
        except Exception as e:
            logger.error(f"Error waiting for element {value}: {e}")
            return None
    
    
    def click_element(driver, by, value, use_js=False):
        element = wait_for_element(driver, by, value)
        if element:
            try:
                if use_js:
                    driver.execute_script("arguments[0].click();", element)
                else:
                    element.click()
                logger.info(f"Clicked element: {value}")
            except Exception as e:
                logger.error(f"Error clicking element {value}: {e}")
        else:
            logger.error(f"Element not found to click: {value}")
    
    
    def fill_field(driver, by, value, text):
        element = wait_for_element(driver, by, value)
        if element:
            try:
                element.clear()
                element.send_keys(text)
                element.send_keys(Keys.RETURN)
                logger.info(f"Filled field {value} with text: {text}")
            except Exception as e:
                logger.error(f"Error filling field {value}: {e}")
        else:
            logger.error(f"Field not found to fill: {value}")
    
    
    def click_yes_button(driver):
        try:
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'button[data-testid="modal-confirm-button"]'))
            )
            actions = webdriver.ActionChains(driver)
            actions.send_keys(Keys.TAB * 3)  # Adjust the number of TAB presses as needed
            actions.send_keys(Keys.ENTER)
            actions.perform()
            logger.info("Yes button on popup clicked using TAB and ENTER keys.")
        except Exception as e:
            logger.error(f"Error clicking the Yes button: {e}")
    
    def fill_dropdown(driver, element, value):
        driver.execute_script("arguments[0].scrollIntoView(true);", element)
        driver.execute_script("arguments[0].click();", element)
        element.send_keys(value)
        element.send_keys(Keys.ENTER)
    
    def fill_beneficial_owners(driver):
        wait = WebDriverWait(driver, 10)
    
        last_name_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].lastName.value"]')))
        last_name_field.send_keys("Last Name")
        logger.info("Last name field filled.")
    
        first_name_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].firstName.value"]')))
        first_name_field.send_keys("First Name")
        logger.info("First name field filled.")
    
        dob_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].dob.value"]')))
        dob_field.send_keys("01/01/1982")
        logger.info("Date of birth field filled.")
    
        address_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].address.value"]')))
        address_field.send_keys("Address field")
        logger.info("Address field filled.")
    
        city_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].city.value"]')))
        city_field.send_keys("City")
        logger.info("City field filled.")
    
        country_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].country.value"]')))
        fill_dropdown(driver, country_field, "United States of America")
        logger.info("Country field filled.")
    
        state_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].state.value"]')))
        fill_dropdown(driver, state_field, "Arizona")
        logger.info("State field filled.")
    
        zip_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].zip.value"]')))
        zip_field.send_keys("00000")
        logger.info("ZIP code field filled.")
    
        id_type_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].identification.type.value"]')))
        fill_dropdown(driver, id_type_field, "State/local/tribe-issued ID")
        logger.info("ID type field filled.")
    
        id_number_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].identification.id.value"]')))
        id_number_field.send_keys("Drivers license number")
        logger.info("ID number field filled.")
    
        id_country_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].identification.jurisdiction.value"]')))
        fill_dropdown(driver, id_country_field, "United States of America")
        logger.info("ID country field filled.")
    
        id_state_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].identification.state.value"]')))
        fill_dropdown(driver, id_state_field, "Arizona")
        logger.info("ID state field filled.")
    
        id_file_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[0].identification.image.value"]')))
        id_file_field.send_keys("/full/path/of/ID.jpeg")
        logger.info("ID file uploaded.")
    
        add_bo_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button[data-testid="bo-add-button"]')))
        driver.execute_script("arguments[0].scrollIntoView(true);", add_bo_button)
        driver.execute_script("arguments[0].click();", add_bo_button)
        logger.info("Add Beneficial Owner button clicked.")
    
        # Fill out the details for the second beneficial owner
        last_name_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].lastName.value"]')))
        last_name_field_2.send_keys("Beneficial Owner #2 Last Name")
        logger.info("Last name field for second beneficial owner filled.")
    
        first_name_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].firstName.value"]')))
        first_name_field_2.send_keys("Beneficial Owner #2 First Name")
        logger.info("First name field for second beneficial owner filled.")
    
        dob_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].dob.value"]')))
        dob_field_2.send_keys("Beneficial Owner #2 DOB")
        logger.info("Date of birth field for second beneficial owner filled.")
    
        address_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].address.value"]')))
        address_field_2.send_keys("Beneficial Owner #2 Address")
        logger.info("Address field for second beneficial owner filled.")
    
        city_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].city.value"]')))
        city_field_2.send_keys("Beneficial Owner #2 City")
        logger.info("City field for second beneficial owner filled.")
    
        country_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].country.value"]')))
        fill_dropdown(driver, country_field_2, "United States of America")
        logger.info("Country field for second beneficial owner filled.")
    
        state_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].state.value"]')))
        fill_dropdown(driver, state_field_2, "Beneficial Owner #2 State")
        logger.info("State field for second beneficial owner filled.")
    
        zip_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].zip.value"]')))
        zip_field_2.send_keys("Beneficial Owner #2 ZIP")
        logger.info("ZIP code field for second beneficial owner filled.")
    
        id_type_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].identification.type.value"]')))
        fill_dropdown(driver, id_type_field_2, "State/local/tribe-issued ID")
        logger.info("ID type field for second beneficial owner filled.")
    
        id_number_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].identification.id.value"]')))
        id_number_field_2.send_keys("Beneficial Owner #2 ID number")
        logger.info("ID number field for second beneficial owner filled.")
    
        id_country_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].identification.jurisdiction.value"]')))
        fill_dropdown(driver, id_country_field_2, "United States of America")
        logger.info("ID country field for second beneficial owner filled.")
    
        id_state_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].identification.state.value"]')))
        fill_dropdown(driver, id_state_field_2, "Arizona")
        logger.info("ID state field for second beneficial owner filled.")
    
        id_file_field_2 = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="bo[1].identification.image.value"]')))
        id_file_field_2.send_keys("beneficial/owner2/full/path/of/ID.jpeg")
        logger.info("ID file uploaded.")
    
        # Click the Next button to proceed to the submission page
        next_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button[data-testid="bottom-next-button"]')))
        driver.execute_script("arguments[0].scrollIntoView(true);", next_button)
        driver.execute_script("arguments[0].click();", next_button)
        logger.info("Next button clicked to proceed to submission page.")
    
        logger.info("Beneficial owners' information filled out successfully.")
    
    
    
    def fill_submit_page(driver):
        wait = WebDriverWait(driver, 10)
    
    
        # Fill out the email field
        email_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="email"]')))
        email_field.send_keys("submitter email")
        logger.info("Email field filled.")
    
        # Fill out the confirm email field
        confirm_email_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="confirmEmail"]')))
        confirm_email_field.send_keys("submitter email confirm")
        logger.info("Confirm email field filled.")
    
        # Fill out the first name field
        first_name_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="firstName"]')))
        first_name_field.send_keys("submitter first name")
        logger.info("First name field filled.")
    
        # Fill out the last name field
        last_name_field = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '[id="lastName"]')))
        last_name_field.send_keys("submitter last name")
        logger.info("Last name field filled.")
    
        # Check the "I agree" checkbox
        agree_checkbox_label = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'label[for="certifyCB"]')))
        driver.execute_script("arguments[0].scrollIntoView(true);", agree_checkbox_label)
        driver.execute_script("arguments[0].click();", agree_checkbox_label)
        logger.info("I agree checkbox selected.")
    
    
    
    def fill_filing_information(driver):
        # Agree to the terms
        click_element(driver, By.CSS_SELECTOR, 'button[data-testid="agree-button"]')
        time.sleep(1)
    
        # Initial report radio button
        click_element(driver, By.CSS_SELECTOR, 'label[for="fi.filingType.value1"]')
    
        # Next button
        click_element(driver, By.CSS_SELECTOR, 'button[data-testid="bottom-next-button"]', use_js=True)
    
    
    
    def fill_reporting_company(driver, llc, ein, ssn_itin):
        wait = WebDriverWait(driver, 10)
    
        # Fill out the reporting company information
        fill_field(driver, By.ID, 'rc.legalName', llc)
    
        if pd.notna(ein):
            fill_field(driver, By.ID, 'rc.taxType', 'EIN')
            fill_field(driver, By.ID, 'rc.taxId', ein)
        elif pd.notna(ssn_itin):
            fill_field(driver, By.ID, 'rc.taxType', 'SSN/ITIN')
            fill_field(driver, By.ID, 'rc.taxId', ssn_itin)
    
    
        fill_field(driver, By.ID, 'rc.jurisdiction', 'reporting company country')
        fill_field(driver, By.ID, 'rc.domesticState', 'reporting company state')
        fill_field(driver, By.ID, 'rc.address.value', 'reporting company address')
        fill_field(driver, By.ID, 'rc.city.value', 'reporting company city')
        fill_field(driver, By.ID, 'rc.country.value', 'reporting company country')
        fill_field(driver, By.ID, 'rc.state.value', 'reporting company state')
        fill_field(driver, By.ID, 'rc.zip.value', 'reporting company zip')
    
        # Request to receive FinCEN ID
        click_element(driver, By.CSS_SELECTOR, 'label[for="rc.isRequestingId"]', use_js=True)
        logger.info("Request to receive FinCEN ID checkbox selected.")
    
        # Final Next button
        click_element(driver, By.CSS_SELECTOR, 'button[data-testid="bottom-next-button"]', use_js=True)
    
    
    def fill_company_applicants(driver):
        # Select existing reporting company
        click_element(driver, By.CSS_SELECTOR, 'label[for="rc.isExistingReportingCompany"]', use_js=True)
    
        # Click the "Yes" button on the popup using TAB and ENTER keys
        click_yes_button(driver)
    
        # Click the next button to go to the beneficial owners page
        click_element(driver, By.CSS_SELECTOR, 'button[data-testid="bottom-next-button"]', use_js=True)
    
    
    def is_browser_open(driver):
        try:
            driver.title  # Attempt to get the browser's title to check if it's still open
            return True
        except UnexpectedAlertPresentException:
            try:
                alert = driver.switch_to.alert
                alert.accept()
                return True
            except NoAlertPresentException:
                return True
        except WebDriverException:
            return False
    
    def close_browser(driver):
        try:
            driver.quit()
        except WebDriverException:
            # Forcefully kill the process if quit() fails
            browser_pid = driver.service.process.pid
            os.kill(browser_pid, signal.SIGTERM)
    
    
    def main():
        # Load companies from CSV
        companies_df = read_companies_from_csv('./data/companies.csv')
    
        for index, row in companies_df.iterrows():
            if pd.notna(row['complete']):
                continue
    
            llc = row['LLC']
            ein = row['EIN']
            ssn_itin = row['SSN/ITIN']
    
            # Setup Chrome driver
            service = Service(ChromeDriverManager().install())
            options = webdriver.ChromeOptions()
            options.add_argument("--start-maximized")  # Add this line to open Chrome in a maximized window
            driver = webdriver.Chrome(service=service, options=options)
    
            try:
                # Open the website
                driver.get("https://boiefiling.fincen.gov/boir/html")
                logger.info(f"Website opened successfully for {llc}.")
                # Fill filing information
                fill_filing_information(driver)
    
                # Fill reporting company information
                fill_reporting_company(driver, llc, ein, ssn_itin)
    
                # Fill company applicants
                fill_company_applicants(driver)
    
                # Fill beneficial owners
                fill_beneficial_owners(driver)
    
                # Fill submission page
                fill_submit_page(driver)
    
                # Mark the company as complete in the DataFrame
                companies_df.at[index, 'complete'] = 'yes'
    
                # Save the updated CSV
                update_csv('./data/companies.csv', companies_df)
                logger.info(f"Form completed for {llc} and marked as complete in CSV.")
    
                input('Press enter for next company')
    
            finally:
                close_browser(driver)
    
    if __name__ == "__main__":
        main()

    Supporting CSV

    You’ll need to create a subdirectory labeled ./data wherever you run this program. In this folder, put the JPEGs of the IDs that must be uploaded, along with a CSV file listing each of your company names, their EIN or SSN/ITIN (depending on how they’re set up), and a column labeled ‘complete’. The script will step through every company, reusing the same owner information and changing only these fields for each organization you own. Here is a sample of the CSV file.
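
    The column names below are the ones the script actually reads ('LLC', 'EIN', 'SSN/ITIN', 'complete'); the company names and numbers are placeholders:

    LLC,EIN,SSN/ITIN,complete
    Example Holdings LLC,12-3456789,,
    Example Properties LLC,,123-45-6789,Yes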

    BOIR E-filing API FAILED

    I originally attempted to have this code use the e-filing API provided by FinCEN (you can request access here), but I could not get the submission to format correctly based on their requirements.

    THIS CODE IS NOT COMPLETE

    import requests
    import pandas as pd
    import base64
    import logging
    import xml.etree.ElementTree as ET
    from xml.dom import minidom
    import os
    import re
    import api_keys
    import csv
    import mimetypes
    from lxml import etree
    import time
    import xmlschema
    
    # Setup logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # API endpoints
    AUTH_URL = 'https://iam.fincen.gov/am/oauth2/realms/root/realms/Finance/access_token'
    SANDBOX_URL = 'https://boiefiling-api.user-test.fincen.gov/preprod'
    PRODUCTION_URL = 'https://boiefiling-api.fincen.gov/prod'
    
    
    def initialize_company_status_file(csv_path):
        with open('company_status.csv', 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Company Name', 'Process ID', 'Status'])
    
        logger.info("Company status file initialized.")
    
    
    def check_file_exists(file_path):
        if not os.path.exists(file_path):
            logger.error(f"File not found: {file_path}")
            return False
        return True
    
    
    def get_access_token(environment):
        """Retrieve an access token."""
        logger.info("Getting access token...")
        if environment == 'sandbox':
            client_id = api_keys.SANDBOX_CLIENT_ID
            client_secret = api_keys.SANDBOX_CLIENT_SECRET
            scope = 'BOSS-EFILE-SANDBOX'
        else:
            client_id = api_keys.PRODUCTION_CLIENT_ID
            client_secret = api_keys.PRODUCTION_CLIENT_SECRET
            scope = 'BOSS-EFILE'
    
        credentials = f"{client_id}:{client_secret}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()
    
        headers = {
            'Authorization': f'Basic {encoded_credentials}',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        data = {
            'grant_type': 'client_credentials',
            'scope': scope
        }
    
        response = requests.post(AUTH_URL, headers=headers, data=data)
        response.raise_for_status()
    
        return response.json().get('access_token')
    
    
    def validate_attachment(file_path):
        """Validate the identifying document image attachment."""
        valid_extensions = ['.jpeg', '.jpg', '.png', '.pdf']
        file_size = os.path.getsize(file_path)
        file_extension = os.path.splitext(file_path)[1].lower()
    
        if file_extension not in valid_extensions:
            raise ValueError(f"Invalid file format: {file_extension}. Supported formats are JPEG, JPG, PNG, PDF.")
    
        if file_size > 4 * 1024 * 1024:
            raise ValueError(f"File size exceeds the 4MB limit: {file_size} bytes.")
    
        filename = os.path.basename(file_path)
        if re.search(r'[^\w!@#$%()_\-\.=+\[\]{}|;~]', filename):
            raise ValueError(f"Invalid characters in file name: {filename}")
    
        logger.info(f"Attachment {filename} is valid.")
    
    
    
    
    def save_xml_file(xml_string, filename):
        """Save XML string to a file."""
        try:
            with open(filename, 'w') as file:
                file.write(xml_string)
            logger.info(f"XML saved to {filename}")
        except Exception as e:
            logger.error(f"Error saving XML to file: {str(e)}")
    
    
    def initiate_submission(token, base_url):
        """Initiate BOIR submission and get process ID."""
        logger.info("Initiating submission...")
        headers = {
            'Authorization': f'Bearer {token}'
        }
    
        response = requests.get(f'{base_url}/processId', headers=headers)
        response.raise_for_status()
    
        process_id = response.json().get('processId')
        logger.info(f"Obtained process ID: {process_id}")
        return process_id
    
    
    def upload_attachment(token, process_id, document_path, base_url):
        try:
            if not check_file_exists(document_path):
                logger.error(f"Attachment file does not exist: {document_path}")
                return 'file_not_found'
    
            validate_attachment(document_path)
    
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": mimetypes.guess_type(document_path)[0]
            }
    
            with open(document_path, 'rb') as file_data:
                files = {
                    'file': (os.path.basename(document_path), file_data, mimetypes.guess_type(document_path)[0])
                }
                url = f"{base_url}/attachments/{process_id}/{os.path.basename(document_path)}"
                response = requests.post(url, headers=headers, files=files)
    
            if response.status_code != 200:
                logger.error(f"Failed to upload file {os.path.basename(document_path)} for process ID {process_id}. Status Code: {response.status_code}, Response: {response.text}")
                return 'upload_failed'
    
            response.raise_for_status()
            return 'upload_success'
        except Exception as e:
            logger.error(f"Exception occurred while uploading attachment {os.path.basename(document_path)} for process ID {process_id}: {str(e)}")
            return 'exception_occurred'
    
    
    def upload_boir_xml(token, process_id, xml_path, base_url):
        try:
            if not check_file_exists(xml_path):
                logger.error(f"BOIR XML file does not exist: {xml_path}")
                return 'file_not_found'
    
            logger.info(f"Uploading BOIR XML for process ID: {process_id}")
            headers = {
                'Authorization': f'Bearer {token}',
                'Content-Type': 'application/xml'
            }
    
            xml_file_name = os.path.basename(xml_path)
            with open(xml_path, 'rb') as xml_data:
                response = requests.post(f'{base_url}/upload/BOIR/{process_id}/{xml_file_name}', headers=headers, data=xml_data)
    
            if response.status_code != 200:
                logger.error(f"Failed to upload BOIR XML {xml_file_name} for process ID {process_id}. Status Code: {response.status_code}, Response: {response.text}")
                return 'upload_failed'
    
            response.raise_for_status()
            response_json = response.json()
    
            if 'submissionStatus' in response_json:
                logger.info(f"Upload response: {response_json}")
                return response_json['submissionStatus']
    
            logger.error(f"Upload response does not contain expected keys: {response_json}")
            return 'unknown_status'
        except Exception as e:
            logger.error(f"Exception occurred while uploading BOIR XML {xml_file_name} for process ID {process_id}: {str(e)}")
            return 'exception_occurred'
    
    
    def track_submission_status(token, process_id, base_url):
        """Track the status of the BOIR submission."""
        logger.info(f"Tracking submission status for process ID: {process_id}")
        headers = {
            'Authorization': f'Bearer {token}'
        }
    
        response = requests.get(f'{base_url}/submissionStatus/{process_id}', headers=headers)
        response.raise_for_status()
    
        return response.json().get('submissionStatus')
    
    
    def log_error(response_data):
        """Log errors based on response data."""
        errors = response_data.get('errors', [])
        for error in errors:
            error_code = error.get('ErrorCode')
            error_text = error.get('ErrorText')
            logger.error(f"Error Code: {error_code}, Error Text: {error_text}")
    
            if error_code in ["SBE01", "SBE02", "SBE03", "SBE04", "SBE05", "SBE06"]:
                logger.error("Recommended Action: " + error.get('ErrorText', '').split('Please')[1].strip())
    
    
    def create_boir_xml(csv_row):
        try:
            root = ET.Element("{www.fincen.gov/base}EFilingSubmissionXML", attrib={
                'xmlns:fc2': 'www.fincen.gov/base',
                'xmlns:xsi': 'http://www.w3.org/2001/XMLSchema-instance',
                'xsi:schemaLocation': 'www.fincen.gov/base BSA_XML_2.0.xsd',
                'SeqNum': '1'
            })
    
            # Submitter Information
            ET.SubElement(root, "{www.fincen.gov/base}SubmitterElectronicAddressText").text = 'EMAIL@EMAIL.com'
            ET.SubElement(root, "{www.fincen.gov/base}SubmitterEntityIndivdualLastName").text = 'LAST NAME'
            ET.SubElement(root, "{www.fincen.gov/base}SubmitterIndivdualFirstName").text = 'FIRST NAME'
    
            activity = ET.SubElement(root, "{www.fincen.gov/base}Activity", attrib={'SeqNum': '2'})
            ET.SubElement(activity, "{www.fincen.gov/base}ApprovalOfficialSignatureDateText").text = str(csv_row["2.date_prepared"])
            ET.SubElement(activity, "{www.fincen.gov/base}FilingDateText")
    
            activity_association = ET.SubElement(activity, "{www.fincen.gov/base}ActivityAssociation", attrib={'SeqNum': '3'})
            ET.SubElement(activity_association, "{www.fincen.gov/base}InitialReportIndicator").text = "Y" if str(csv_row["1.a"]).upper() == "Y" else ""
            ET.SubElement(activity_association, "{www.fincen.gov/base}CorrectsAmendsPriorReportIndicator").text = "Y" if str(csv_row["1.b"]).upper() == "Y" else ""
            ET.SubElement(activity_association, "{www.fincen.gov/base}UpdatesPriorReportIndicator").text = "Y" if str(csv_row["1.c"]).upper() == "Y" else ""
            ET.SubElement(activity_association, "{www.fincen.gov/base}NewlyExemptEntityIndicator").text = "Y" if str(csv_row["1.d"]).upper() == "Y" else ""
    
            # Reporting Company Information
            reporting_company = ET.SubElement(activity, "{www.fincen.gov/base}Party", attrib={'SeqNum': '4'})
            ET.SubElement(reporting_company, "{www.fincen.gov/base}ActivityPartyTypeCode").text = "62"
            ET.SubElement(reporting_company, "{www.fincen.gov/base}ExistingReportingCompanyIndicator").text = "Y" if str(csv_row["16"]).upper() == "Y" else ""
            ET.SubElement(reporting_company, "{www.fincen.gov/base}FormationCountryCodeText").text = str(csv_row["10.a"])
            ET.SubElement(reporting_company, "{www.fincen.gov/base}FormationStateCodeText").text = str(csv_row["10.b"])
            ET.SubElement(reporting_company, "{www.fincen.gov/base}RequestFinCENIDIndicator").text = "Y"
    
            party_name = ET.SubElement(reporting_company, "{www.fincen.gov/base}PartyName", attrib={'SeqNum': '5'})
            ET.SubElement(party_name, "{www.fincen.gov/base}PartyNameTypeCode").text = "L"
            ET.SubElement(party_name, "{www.fincen.gov/base}RawPartyFullName").text = str(csv_row["5"])
    
            address = ET.SubElement(reporting_company, "{www.fincen.gov/base}Address", attrib={'SeqNum': '6'})
            ET.SubElement(address, "{www.fincen.gov/base}RawCityText").text = str(csv_row["12"])
            ET.SubElement(address, "{www.fincen.gov/base}RawCountryCodeText").text = "US"
            ET.SubElement(address, "{www.fincen.gov/base}RawStateCodeText").text = str(csv_row["14"])
            ET.SubElement(address, "{www.fincen.gov/base}RawStreetAddress1Text").text = str(csv_row["11"])
            ET.SubElement(address, "{www.fincen.gov/base}RawZIPCode").text = str(csv_row["15"])
    
            party_identification = ET.SubElement(reporting_company, "{www.fincen.gov/base}PartyIdentification", attrib={'SeqNum': '7'})
            ET.SubElement(party_identification, "{www.fincen.gov/base}PartyIdentificationNumberText").text = str(csv_row["8"])
            ET.SubElement(party_identification, "{www.fincen.gov/base}PartyIdentificationTypeCode").text = str(csv_row["7"])
    
            # Company Applicant Information
            company_applicant = ET.SubElement(activity, "{www.fincen.gov/base}Party", attrib={'SeqNum': '8'})
            ET.SubElement(company_applicant, "{www.fincen.gov/base}ActivityPartyTypeCode").text = "63"
            ET.SubElement(company_applicant, "{www.fincen.gov/base}IndividualBirthDateText").text = str(csv_row["23"])
    
            party_name = ET.SubElement(company_applicant, "{www.fincen.gov/base}PartyName", attrib={'SeqNum': '9'})
            ET.SubElement(party_name, "{www.fincen.gov/base}PartyNameTypeCode").text = "L"
            ET.SubElement(party_name, "{www.fincen.gov/base}RawEntityIndividualLastName").text = str(csv_row["19"])
            ET.SubElement(party_name, "{www.fincen.gov/base}RawIndividualFirstName").text = str(csv_row["20"])
            ET.SubElement(party_name, "{www.fincen.gov/base}RawIndividualMiddleName").text = str(csv_row["21"])
            ET.SubElement(party_name, "{www.fincen.gov/base}RawIndividualNameSuffixText").text = str(csv_row["22"])
    
            address = ET.SubElement(company_applicant, "{www.fincen.gov/base}Address", attrib={'SeqNum': '10'})
            ET.SubElement(address, "{www.fincen.gov/base}RawCityText").text = str(csv_row["26"])
            ET.SubElement(address, "{www.fincen.gov/base}RawCountryCodeText").text = str(csv_row["27"])
            ET.SubElement(address, "{www.fincen.gov/base}RawStateCodeText").text = str(csv_row["28"])
            ET.SubElement(address, "{www.fincen.gov/base}RawStreetAddress1Text").text = str(csv_row["25"])
            ET.SubElement(address, "{www.fincen.gov/base}RawZIPCode").text = str(csv_row["29"])
            if str(csv_row["24"]) == "Business address":
                ET.SubElement(address, "{www.fincen.gov/base}BusinessAddressIndicator").text = "Y"
            else:
                ET.SubElement(address, "{www.fincen.gov/base}ResidentialAddressIndicator").text = "Y"
    
            party_identification = ET.SubElement(company_applicant, "{www.fincen.gov/base}PartyIdentification", attrib={'SeqNum': '11'})
            ET.SubElement(party_identification, "{www.fincen.gov/base}OtherIssuerCountryText").text = str(csv_row["32.a"])
            ET.SubElement(party_identification, "{www.fincen.gov/base}OtherIssuerStateText").text = str(csv_row["32.b"])
            ET.SubElement(party_identification, "{www.fincen.gov/base}PartyIdentificationNumberText").text = str(csv_row["31"])
            ET.SubElement(party_identification, "{www.fincen.gov/base}PartyIdentificationTypeCode").text = str(csv_row["30"])
            ET.SubElement(party_identification, "{www.fincen.gov/base}OriginalAttachmentFileName").text = str(csv_row["33"])
    
            # Beneficial Owner Information
            seq_num = 12
            for i in range(1, 3):
                if f'38.{i}' in csv_row:
                    beneficial_owner = ET.SubElement(activity, "{www.fincen.gov/base}Party", attrib={'SeqNum': str(seq_num)})
                    ET.SubElement(beneficial_owner, "{www.fincen.gov/base}ActivityPartyTypeCode").text = "64"
                    ET.SubElement(beneficial_owner, "{www.fincen.gov/base}IndividualBirthDateText").text = str(csv_row.get(f'42.{i}', ""))
                    if str(csv_row.get(f'37.{i}', "")).upper() == "Y":
                        ET.SubElement(beneficial_owner, "{www.fincen.gov/base}ExemptIndicator").text = "Y"
                        ET.SubElement(beneficial_owner, "{www.fincen.gov/base}PartyName", attrib={'SeqNum': str(seq_num + 1)})
                        ET.SubElement(beneficial_owner, "{www.fincen.gov/base}RawEntityIndividualLastName").text = str(csv_row.get(f'38.{i}', ""))
                    else:
                        party_name = ET.SubElement(beneficial_owner, "{www.fincen.gov/base}PartyName", attrib={'SeqNum': str(seq_num + 1)})
                        ET.SubElement(party_name, "{www.fincen.gov/base}PartyNameTypeCode").text = "L"
                        ET.SubElement(party_name, "{www.fincen.gov/base}RawEntityIndividualLastName").text = str(csv_row.get(f'38.{i}', ""))
                        ET.SubElement(party_name, "{www.fincen.gov/base}RawIndividualFirstName").text = str(csv_row.get(f'39.{i}', ""))
                        ET.SubElement(party_name, "{www.fincen.gov/base}RawIndividualMiddleName").text = str(csv_row.get(f'40.{i}', ""))
                        ET.SubElement(party_name, "{www.fincen.gov/base}RawIndividualNameSuffixText").text = str(csv_row.get(f'41.{i}', ""))
    
                    address = ET.SubElement(beneficial_owner, "{www.fincen.gov/base}Address", attrib={'SeqNum': str(seq_num + 2)})
                    ET.SubElement(address, "{www.fincen.gov/base}RawCityText").text = str(csv_row.get(f'44.{i}', ""))
                    ET.SubElement(address, "{www.fincen.gov/base}RawCountryCodeText").text = str(csv_row.get(f'45.{i}', ""))
                    ET.SubElement(address, "{www.fincen.gov/base}RawStateCodeText").text = str(csv_row.get(f'46.{i}', ""))
                    ET.SubElement(address, "{www.fincen.gov/base}RawStreetAddress1Text").text = str(csv_row.get(f'43.{i}', ""))
                    ET.SubElement(address, "{www.fincen.gov/base}RawZIPCode").text = str(csv_row.get(f'47.{i}', ""))
    
                    party_identification = ET.SubElement(beneficial_owner, "{www.fincen.gov/base}PartyIdentification", attrib={'SeqNum': str(seq_num + 3)})
                    ET.SubElement(party_identification, "{www.fincen.gov/base}OtherIssuerCountryText").text = str(csv_row.get(f'50.{i}.a', ""))
                    ET.SubElement(party_identification, "{www.fincen.gov/base}OtherIssuerStateText").text = str(csv_row.get(f'50.{i}.b', ""))
                    ET.SubElement(party_identification, "{www.fincen.gov/base}IssuerLocalTribalCodeText").text = str(csv_row.get(f'50.{i}.c', ""))
                    ET.SubElement(party_identification, "{www.fincen.gov/base}OtherIssuerLocalTribalText").text = str(csv_row.get(f'50.{i}.d', ""))
                    ET.SubElement(party_identification, "{www.fincen.gov/base}PartyIdentificationNumberText").text = str(csv_row.get(f'49.{i}', ""))
                    ET.SubElement(party_identification, "{www.fincen.gov/base}PartyIdentificationTypeCode").text = str(csv_row.get(f'48.{i}', ""))
                    ET.SubElement(party_identification, "{www.fincen.gov/base}OriginalAttachmentFileName").text = str(csv_row.get(f'51.{i}', ""))
    
                    seq_num += 4
    
            xml_str = prettify_xml(root)
            logger.debug(f"Generated XML: {xml_str}")
    
            if validate_xml(xml_str):
                return xml_str
            else:
                return None
        except Exception as e:
            logger.error(f"Error creating BOIR XML: {str(e)}")
            return None
    def prettify_xml(elem):
        """Return a pretty-printed XML string for the Element."""
        rough_string = ET.tostring(elem, 'utf-8')
        reparsed = minidom.parseString(rough_string)
        return reparsed.toprettyxml(indent="  ")
    
    def validate_xml(xml_str):
        """Validate the structure of the XML string using the provided XSD schema."""
        try:
            schema_bsa = xmlschema.XMLSchema("BSA_XML_2.0.xsd")
            schema_boir = xmlschema.XMLSchema("BOIRSchema.xsd.xml")
    
            schema_bsa.validate(xml_str)
            schema_boir.validate(xml_str)
    
            logger.info("XML structure is valid.")
            return True
        except xmlschema.XMLSchemaException as e:
            logger.error(f"XML schema validation error: {e}")
            return False
        except Exception as e:
            logger.error(f"XML parsing error: {e}")
            return False
    
    
    def check_submission_status(token, process_id, base_url, retry_count=3, retry_delay=5):
        """Check the status of a BOIR submission."""
        headers = {
            'Authorization': f'Bearer {token}'
        }
    
        for attempt in range(retry_count):
            try:
                response = requests.get(f'{base_url}/submissionStatus/{process_id}', headers=headers)
    
                if response.status_code == 200:
                    submission_status = response.json().get('submissionStatus')
                    logger.info(f"Submission status for process ID {process_id}: {submission_status}")
    
                    if submission_status == 'submission_validation_failed':
                        error_details = response.json().get('validationErrors', ['Unknown error'])[0]
                        logger.debug(f'Failed with {response.json()}')
                        log_error(response.json())
                        return submission_status, error_details
    
                    return submission_status, None
                else:
                    logger.error(f"Failed to retrieve submission status for process ID {process_id}. Status code: {response.status_code}")
                    logger.error(f"Response content: {response.text}")
                    if attempt < retry_count - 1:
                        logger.info(f"Retrying in {retry_delay} seconds...")
                        time.sleep(retry_delay)
            except requests.exceptions.RequestException as e:
                logger.error(f"Error occurred while checking submission status for process ID {process_id}: {str(e)}")
                if attempt < retry_count - 1:
                    logger.info(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)

        # All retries exhausted without a definitive status.
        logger.info("Please check the company status CSV file for details on failed submissions.")
        return None, "Failed to retrieve submission status"
    
    
    def monitor_and_update():
        """Continuously monitor and update the registration status."""
        while True:
            verify_submissions()
            time.sleep(3600)  # Wait for 1 hour before checking again
    
    
    if __name__ == "__main__":
        csv_path = './data/boir_data.csv'
        environment = 'sandbox'  # or 'production'
    
        register_organizations(csv_path, environment)
        verify_submissions()
        generate_summary_report()
        monitor_and_update()
    

    Supporting CSV

    This code requires a ./data/boir_data.csv file containing the following columns, which correspond to the sections of the BOIR form.

    1.a 1.b 1.c 1.d 1.e 1.f 1.g 1.h 2.date_prepared 3 4 5 6 7 8 9 10.a 10.b 10.c 10.d 10.e 10.f 10.g 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32.a 32.b 32.c 32.d 33 35 36 37 38.1 39.1 40.1 41.1 42.1 43.1 44.1 45.1 46.1 47.1 48.1 49.1 50.1.a 50.1.b 50.1.c 50.1.d 51.1 38.2 39.2 40.2 41.2 42.2 43.2 44.2 45.2 46.2 47.2 48.2 49.2 50.2.a 50.2.b 50.2.c 50.2.d 51.2
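
    For reference, the numbered columns repeat once per beneficial owner (the .1 and .2 suffixes above), which is why the generator looks each field up as f'38.{i}'. Here is a minimal sketch of reading those repeating columns, assuming the same ./data/boir_data.csv path the script uses:

    import csv

    # Minimal illustration: map the repeating beneficial-owner columns
    # (38.1, 39.1, ... and 38.2, 39.2, ...) back to owners 1 and 2.
    with open('./data/boir_data.csv', newline='') as f:
        rows = list(csv.DictReader(f))

    for csv_row in rows:
        for i in (1, 2):
            last_name = csv_row.get(f'38.{i}', "")
            first_name = csv_row.get(f'39.{i}', "")
            if last_name or first_name:
                print(f"Beneficial owner {i}: {first_name} {last_name}".strip())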


  • A Deep Dive into Intraday Trading Costs with Python, Alpaca’s API, and SIP Data

    In the realm of intraday trading, managing and minimizing trading costs is not just good practice; it’s a necessity. A strategy that seems profitable on paper can quickly become a losing proposition when real-world costs, particularly the spread between the bid and ask prices, are factored in. Today, I’d like to share a comprehensive analysis tool I developed to tackle exactly this problem.

    Output

    This is the HTML output plot generated by the program. Below you’ll see the data followed by some important metrics. Note that the plot title includes the spread standard deviation in both dollars and percent, and the panels below show the actual values along with their distributions and boxplots.

    The Importance of Spread Analysis

    Bid Price

    The “bid” is the highest price a buyer is willing to pay for a stock. It essentially represents the demand side of the market for a particular stock. When you’re selling a stock, the bid price is the most you can hope to get at that moment. It’s a real-time reflection of what buyers believe the stock is worth, based on their analysis, market conditions, and other factors. The bid price is constantly changing as buyers adjust their willingness to pay in response to market dynamics.

    Ask Price

    Conversely, the “ask” price is the lowest price at which a seller is willing to sell their stock. It represents the supply side of the equation. When you’re looking to buy a stock, the ask price is the lowest you can expect to pay at that moment. Like the bid price, the ask is always in flux, influenced by sellers’ perceptions of the stock’s value, market trends, and various economic indicators.

    The Bid-Ask Spread

    The difference between the bid and ask price is known as the “spread.” The spread can be a critical indicator of a stock’s liquidity and market volatility. A narrow spread typically indicates a highly liquid market with a high volume of transactions and minimal difference between what buyers are willing to pay and what sellers are asking. Conversely, a wider spread suggests lower liquidity, potentially making it more challenging to execute large trades without affecting the market price.

    Now that we’ve explored the bid-ask spread, let’s establish why spread analysis is crucial. The spread directly impacts your trading costs. For high-frequency traders, even small variances in this spread can significantly affect overall profitability. My tool is designed to subscribe to Alpaca’s API, fetching real-time quotes and prices alongside their volume. This setup allows us to compute the spread both as a dollar value and as a percentage of the asset’s value, offering a clear view of the trading costs involved.
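
    As a quick illustration (made-up numbers, not output from the tool), the per-quote computation is simply:

    # Illustrative quote; the prices here are made up.
    bid_price, ask_price = 100.00, 100.05

    spread = ask_price - bid_price                   # $0.05
    spread_percentage = spread / bid_price * 100     # 0.05% of the asset's value

    print(f"Spread: ${spread:.2f} ({spread_percentage:.3f}%)")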

    The Tool’s Anatomy

    The tool comprises two Python files: alpaca_plots.py and alpaca_functions.py. The former is primarily responsible for the data visualization aspect, while the latter deals with data fetching, processing, and statistics calculation.

    Key Functions and Their Roles

    • Data Subscription and Handling: At the core, my tool subscribes to quote and trade updates via Alpaca’s API, focusing on a list of specified symbols. This is crucial for accessing real-time data, essential for accurate spread analysis.
    • Spread Calculation: Once data is fetched, the tool calculates the spread in both dollar value and percentage. This is done by subtracting the bid price from the ask price for each quote, providing an immediate measure of the trading cost for that specific asset.
    • Statistical Analysis: Beyond mere calculation, the tool also analyzes the distribution of spread values, including their standard deviation (see the short sketch after this list). This statistical approach allows traders to understand not just the average costs, but also the variability and risk associated with the spread.
    • Data Visualization: A key feature is its ability to generate insightful visualizations, including boxplots. These plots offer a visual representation of the spread distribution, highlighting the median, quartiles, and any outliers. This visual context is invaluable for traders looking to assess the cost implications of their strategies quickly.
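
    To make that statistical step concrete before the full listings below, the core of it is computing spread columns on the accumulated quotes and summarizing them. A minimal sketch (the column names mirror the code further down; the DataFrame here is a toy example):

    import pandas as pd

    # Toy quotes DataFrame with the same columns the tool accumulates.
    quotes = pd.DataFrame({
        'bid_price': [99.98, 100.00, 100.01],
        'ask_price': [100.02, 100.05, 100.03],
    })

    quotes['spread'] = quotes['ask_price'] - quotes['bid_price']
    quotes['spread_percentage'] = quotes['spread'] / quotes['bid_price'] * 100

    # Standard deviation of the spread in dollars and in percent,
    # the same figures reported in the plot titles.
    print(quotes['spread'].std(), quotes['spread_percentage'].std())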

    Practical Application and Insights

    By analyzing the spread in both absolute and relative terms, traders can make informed decisions about which assets to trade and when. For example, a high spread as a percentage of the asset’s value might deter trading in certain assets during specific times, guiding traders towards more cost-effective opportunities.
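
    To put rough numbers on that, here is a back-of-the-envelope illustration with made-up figures, not output from the tool:

    # Made-up figures: a $0.04 spread on a $20 stock.
    bid_price, ask_price = 20.00, 20.04
    spread_percentage = (ask_price - bid_price) / bid_price * 100   # 0.2%

    # Buying at the ask and later selling at the bid gives up the full
    # spread, so each round trip costs roughly 0.2% before commissions.
    expected_edge = 0.5   # hypothetical per-trade edge, in percent
    print(f"Edge after spread: {expected_edge - spread_percentage:.2f}% per round trip")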

    In Summary

    This tool is more than just a technical exercise; it’s a practical solution to a problem many traders face daily. By offering a detailed analysis of Alpaca spreads, it empowers traders to make data-driven decisions, ultimately enhancing the profitability of their trading strategies. Whether you’re a seasoned trader or just starting, understanding and applying such tools can significantly impact your trading success.

    Code

    alpaca_functions.py

    import alpaca_trade_api as tradeapi
    import pandas as pd
    import os
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    from alpaca_config import api_key, api_secret, base_url
    import logging
    import asyncio
    from pathlib import Path
    from datetime import datetime
    import subprocess
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    # Global DataFrame for accumulating quote data
    quotes_data = pd.DataFrame(columns=['symbol', 'bid_price', 'ask_price', 'bid_size', 'ask_size'])
    trades_data = pd.DataFrame(columns=['symbol', 'trade_price', 'trade_size'])
    
    def kill_other_instances(exclude_pid):
        try:
            # Get the list of processes matching the script name
            result = subprocess.run(['pgrep', '-f', 'alpaca_functions.py'], stdout=subprocess.PIPE)
            if result.stdout:
                pids = result.stdout.decode('utf-8').strip().split('\n')
                for pid in pids:
                    if pid != exclude_pid:
                        try:
                            # Terminate the process
                            subprocess.run(['kill', pid])
                            logging.warning(f"Terminated process with PID: {pid}")
                        except subprocess.CalledProcessError as e:
                            logging.error(f"Could not terminate process with PID: {pid}. Error: {e}")
            else:
                logging.info("No other instances found.")
        except subprocess.CalledProcessError as e:
            logging.info(f"Error finding processes: {e}")
    async def get_market_hours(api, date_str):
        # Convert date_str to date object
        specific_date = datetime.strptime(date_str, '%Y-%m-%d').date()
        # Format the date as a string in 'YYYY-MM-DD' format
        date_str = specific_date.strftime('%Y-%m-%d')
    
        # Fetch the market calendar for the specific date
        calendar = api.get_calendar(start=date_str, end=date_str)
    
        logging.debug(f'{calendar}')
    
        if calendar:
            market_open = calendar[0].open.strftime('%H:%M')
            market_close = calendar[0].close.strftime('%H:%M')
            logging.info(f"Market hours for {date_str}: {market_open} - {market_close}")
            return market_open, market_close
        else:
            logging.warning(f"No market hours found for {date_str}.")
            return None, None
    
    async def consolidate_parquet_files(quotes_directory, trades_directory):
        async def process_directory(directory):
            for day_dir in Path(directory).iterdir():
                if day_dir.is_dir():
                    symbol_dfs = {}
    
                    parquet_files = list(day_dir.glob("*.parquet"))
                    if not parquet_files:
                        logging.info(f"No Parquet files found in {day_dir}.")
                        continue
    
                    for file in parquet_files:
                        if '_' in file.stem:
                            symbol = file.stem.split('_')[0]
                            df = pd.read_parquet(file)
    
                            if symbol in symbol_dfs:
                                symbol_dfs[symbol] = pd.concat([symbol_dfs[symbol], df])
                            else:
                                symbol_dfs[symbol] = df
    
                    for symbol, df in symbol_dfs.items():
                        consolidated_filename = f"{symbol}.parquet"
                        consolidated_file_path = day_dir / consolidated_filename
    
                        if consolidated_file_path.is_file():
                            consolidated_df = pd.read_parquet(consolidated_file_path)
                            consolidated_df = pd.concat([consolidated_df, df])
                            consolidated_df = consolidated_df[~consolidated_df.index.duplicated(keep='last')]
                            consolidated_df = consolidated_df.sort_index()  # Modified to eliminate the warning
                            consolidated_df.to_parquet(consolidated_file_path, index=True)
                            logging.debug(f"Updated consolidated file: {consolidated_filename}")
                        else:
                            df = df[~df.index.duplicated(keep='last')]
                            df = df.sort_index()  # Modified to eliminate the warning
                            df.to_parquet(consolidated_file_path, index=True)
                            logging.info(f"Consolidated {consolidated_filename}")
    
                    for file in parquet_files:
                        if '_' in file.stem:
                            try:
                                os.remove(file)
                                logging.debug(f"Deleted {file}")
                            except OSError as e:
                                logging.error(f"Error deleting {file}: {e}")
                else:
                    logging.info(f"Date directory {day_dir} not found or is not a directory.")
    
        await asyncio.gather(
            process_directory(quotes_directory),
            process_directory(trades_directory)
        )
    # Function to check symbol properties
    async def check_symbol_properties(api, symbols):
        not_active, not_tradeable, not_shortable = [], [], []
        for symbol in symbols:
            asset = api.get_asset(symbol)  # Removed 'await' as get_asset is not an async function
            if asset.status != 'active':
                not_active.append(symbol)
            if not asset.tradable:
                not_tradeable.append(symbol)
            if not asset.shortable:
                not_shortable.append(symbol)
        return not_active, not_tradeable, not_shortable
    
    def process_quote(quote):
        logging.debug(quote)
    
        timestamp = pd.to_datetime(quote.timestamp, unit='ns').tz_convert('America/New_York')
    
        quote_df = pd.DataFrame({
            'symbol': [quote.symbol],
            'bid_price': [quote.bid_price],
            'ask_price': [quote.ask_price],
            'bid_size': [quote.bid_size],
            'ask_size': [quote.ask_size],
            'timestamp': [timestamp]
        }).set_index('timestamp')
    
        return quote_df
    
    def process_trade(trade):
        logging.debug(trade)
    
        timestamp = pd.to_datetime(trade.timestamp, unit='ns').tz_convert('America/New_York')
    
        trade_df = pd.DataFrame({
            'symbol': [trade.symbol],
            'trade_price': [trade.price],
            'trade_size': [trade.size],
            'timestamp': [timestamp]
        }).set_index('timestamp')
    
        return trade_df
    
    async def periodic_save(interval_seconds=3600, quotes_directory='/home/shared/algos/ml4t/data/alpaca_quotes/', trades_directory='/home/shared/algos/ml4t/data/alpaca_trades/'):
        global quotes_data, trades_data
        while True:
            try:
                logging.info('Running periodic save...')
                current_time = datetime.now()
                date_str = current_time.strftime('%Y-%m-%d')
                hour_str = current_time.strftime('%H-%M-%S')
    
                # Saving quotes data
                if not quotes_data.empty:
                    quotes_day_directory = os.path.join(quotes_directory, date_str)
                    os.makedirs(quotes_day_directory, exist_ok=True)
                    for symbol, group in quotes_data.groupby('symbol'):
                        filepath = os.path.join(quotes_day_directory, f"{symbol}_{date_str}_{hour_str}.parquet")
                        group.to_parquet(filepath, index=True)
                    logging.info(f"Saved all quotes for {date_str} {hour_str} to disk.")
                    quotes_data.drop(quotes_data.index, inplace=True)  # Clearing the DataFrame
                else:
                    logging.warning('quotes_data is empty')
                # Saving trades data
                if not trades_data.empty:
                    trades_day_directory = os.path.join(trades_directory, date_str)
                    os.makedirs(trades_day_directory, exist_ok=True)
                    for symbol, group in trades_data.groupby('symbol'):
                        filepath = os.path.join(trades_day_directory, f"{symbol}_{date_str}_{hour_str}.parquet")
                        group.to_parquet(filepath, index=True)
                    logging.info(f"Saved all trades for {date_str} {hour_str} to disk.")
                    trades_data.drop(trades_data.index, inplace=True)  # Clearing the DataFrame
                else:
                    logging.warning('trades_data is empty')
    
                await asyncio.sleep(interval_seconds)
    
            except Exception as e:
                    logging.error(f'Error in periodic_save: {e}')  # Properly logging the exception message
    
    
    
    async def run_alpaca_monitor(symbols, remove_not_shortable=False):
        # Initialize the Alpaca API
        api = tradeapi.REST(api_key, api_secret, base_url, api_version='v2')
    
        total_symbols = len(symbols)
    
        not_active, not_tradeable, not_shortable = await check_symbol_properties(api, symbols)

        # Calculate and log percentages
        percent_not_active = (len(not_active) / total_symbols) * 100
        percent_not_tradeable = (len(not_tradeable) / total_symbols) * 100
        percent_not_shortable = (len(not_shortable) / total_symbols) * 100

        logging.info(f"Percentage of symbols not active: {percent_not_active:.2f}%")
        logging.info(f"Percentage of symbols not tradeable: {percent_not_tradeable:.2f}%")
        logging.info(f"Percentage of symbols not shortable: {percent_not_shortable:.2f}%")

        # Remove symbols that are not active or tradeable (and, if requested, not shortable)
        if remove_not_shortable:
            symbols = [symbol for symbol in symbols if symbol not in not_active + not_tradeable + not_shortable]
        else:
            symbols = [symbol for symbol in symbols if symbol not in not_active + not_tradeable]

        logging.info(f'Monitoring the following symbols: {symbols}')
    
        stream = tradeapi.stream.Stream(api_key, api_secret, base_url, data_feed='sip')
    
        async def handle_quote(q):
            global quotes_data
            new_quote = process_quote(q)
            quotes_data = pd.concat([quotes_data, new_quote], ignore_index=False)
            logging.debug(f'quotes \n {quotes_data.tail()}')
    
        async def handle_trade(t):
            global trades_data
            new_trade = process_trade(t)
            trades_data = pd.concat([trades_data, new_trade], ignore_index=False)
            logging.debug(f'trades \n {trades_data.tail()}')
    
        async def consolidate_periodically(interval, quotes_directory, trades_directory):
            while True:
                try:
                    await consolidate_parquet_files(quotes_directory, trades_directory)
                except Exception as e:
                    logging.error(f"Error consolidating parquet files: {e}")
                    # Handle the error as needed, for example, break the loop, or continue
                await asyncio.sleep(interval)
    
        save_quotes_task = asyncio.create_task(periodic_save(180, '/home/shared/algos/ml4t/data/alpaca_quotes/', '/home/shared/algos/ml4t/data/alpaca_trades/'))
        consolidate_task = asyncio.create_task(consolidate_periodically(180, '/home/shared/algos/ml4t/data/alpaca_quotes', '/home/shared/algos/ml4t/data/alpaca_trades'))
    
    
        try:
            # Subscribe to the streams
            for symbol in symbols:
                stream.subscribe_quotes(handle_quote, symbol)
                stream.subscribe_trades(handle_trade, symbol)
    
            await stream._run_forever()
    
        except ValueError as e:
            if "auth failed" in str(e) or "connection limit exceeded" in str(e):
                # Log the specific error message without re-raising the exception to avoid showing traceback
                logging.error(f"WebSocket authentication error: {e}")
            else:
                # For other ValueErrors, log them and optionally re-raise if you want to show the traceback
                logging.error(f"Error with WebSocket connection: {e}")
    
    
    if __name__ == "__main__":
        current_pid = str(os.getpid())
        kill_other_instances(current_pid)
    
        csv_file = '/home/shared/algos/ml4t/data/selected_pairs_with_values.csv'
        df = pd.read_csv(csv_file)
        symbols = list(set(df['s1'].tolist() + df['s2'].tolist()))
        symbols = [symbol.replace('-', '.') for symbol in symbols]
    
        quotes_dir='/home/shared/algos/ml4t/data/alpaca_quotes'
        trades_dir='/home/shared/algos/ml4t/data/alpaca_trades'
        asyncio.run(run_alpaca_monitor(symbols))
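
    Both scripts import credentials from alpaca_config, which isn’t shown here. A minimal placeholder defining just the three names the scripts import (fill in your own keys) could look like this:

    # alpaca_config.py: placeholder only; substitute your own credentials.
    api_key = "YOUR_ALPACA_API_KEY_ID"
    api_secret = "YOUR_ALPACA_API_SECRET_KEY"
    base_url = "https://paper-api.alpaca.markets"  # or the live trading endpoint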
    
    
    

    alpaca_plots.py

    import asyncio
    import logging
    from datetime import datetime
    import pandas as pd
    from pathlib import Path
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    import alpaca_trade_api as tradeapi
    from alpaca_config import api_key, api_secret, base_url
    
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    def get_market_hours(api, date_str):
        # Convert date_str to date object
        specific_date = datetime.strptime(date_str, '%Y-%m-%d').date()
        # Format the date as a string in 'YYYY-MM-DD' format
        date_str = specific_date.strftime('%Y-%m-%d')
    
        # Fetch the market calendar for the specific date
        calendar = api.get_calendar(start=date_str, end=date_str)
    
        logging.debug(f'{calendar}')
    
        if calendar:
            market_open = calendar[0].open.strftime('%H:%M')
            market_close = calendar[0].close.strftime('%H:%M')
            logging.info(f"Market hours for {date_str}: {market_open} - {market_close}")
            return market_open, market_close
        else:
            logging.warning(f"No market hours found for {date_str}.")
            return None, None
    
    
    def load_and_plot_data(quotes_directory, trades_directory, symbols, api):
        logging.info(f'Running load_and_plot_data')
        today = datetime.now().strftime('%Y-%m-%d')
        #override today for testing
        # today = '2024-04-05'
    
        try:
            # Use today to get market hours
            market_open, market_close = get_market_hours(api, today)
            # Check if date directories exist
            quotes_date_dir = Path(quotes_directory) / today
            trades_date_dir = Path(trades_directory) / today
    
            if not quotes_date_dir.exists():
                logging.error(f"Quotes directory for date {today} not found: {quotes_date_dir}")
                return
            if not trades_date_dir.exists():
                logging.error(f"Trades directory for date {today} not found: {trades_date_dir}")
                return
    
            for symbol in symbols:
                # Construct file paths
                quotes_file_path = quotes_date_dir / f"{symbol}.parquet"
                trades_file_path = trades_date_dir / f"{symbol}.parquet"
    
                # Load the data
                if quotes_file_path.exists() and trades_file_path.exists():
                    symbol_quotes = pd.read_parquet(quotes_file_path)
                    symbol_trades = pd.read_parquet(trades_file_path)
    
                    logging.debug(f"Loaded {symbol_quotes.shape[0]} quotes and {symbol_trades.shape[0]} trades for {symbol} on {today}.")
                    # Filter symbol_quotes and symbol_trades to market hours
                    market_open_time = datetime.strptime(market_open, '%H:%M').time()
                    market_close_time = datetime.strptime(market_close, '%H:%M').time()
                    symbol_quotes = symbol_quotes.between_time(market_open_time, market_close_time)
                    symbol_trades = symbol_trades.between_time(market_open_time, market_close_time)
                    # Call plot_statistics with filtered data
                    plot_statistics(symbol_quotes, symbol_trades, symbol, market_open, market_close)
                else:
                    missing_files = []
                    if not quotes_file_path.exists():
                        missing_files.append(f"quotes file for {symbol} and path {quotes_file_path}")
                    if not trades_file_path.exists():
                        missing_files.append(f"trades file for {symbol} and path {trades_file_path}")
                    logging.warning(f"Missing {', and '.join(missing_files)} on {today}.")
            logging.info(f'Finished loading and plotting data')
        except Exception as e:
            logging.error(f"Error loading and plotting data for {today}: {e}")
            return
    
    def plot_statistics(symbol_quotes, symbol_trades, symbol, market_open, market_close):
        logging.info(f'Running plot_statistics for {symbol}')
    
        if not symbol_quotes.empty and not symbol_trades.empty:
            # Calculate 'spread' and 'spread_percentage' directly on symbol_quotes
            symbol_quotes['spread'] = symbol_quotes['ask_price'] - symbol_quotes['bid_price']
            symbol_quotes['spread_percentage'] = (symbol_quotes['spread'] / symbol_quotes['bid_price']) * 100
    
            # Calculate standard deviation of spread and spread_percentage
            spread_std = symbol_quotes['spread'].std()
            spread_percentage_std = symbol_quotes['spread_percentage'].std()
    
            # Make ask_size negative
            symbol_quotes['negative_ask_size'] = -symbol_quotes['ask_size']
    
            logging.info(f"Spread Standard Deviation for {symbol}: ${spread_std:.4f} ({spread_percentage_std:.4f}%)")
    
            # Prepare the figure with subplots
            fig = make_subplots(rows=7, cols=2,
                                subplot_titles=("Bid and Ask Prices with Trades", "Bid Size & Ask Size", "Trade Size",
                                                "Spread ($)", "Spread (%)",
                                                "Spread Distribution ($)", "Spread Distribution (%)",
                                                "Spread Boxplot ($)", "Spread Boxplot (%)"),
                                specs=[[{"colspan": 2}, None], [{"colspan": 2}, None], [{"colspan": 2}, None],
                                       [{}, {}], [{}, {}], [{"rowspan": 2}, {"rowspan": 2}], [{}, {}]],
                                shared_xaxes=True, vertical_spacing=0.05)
    
            # Bid and Ask Prices with Trades
            fig.add_trace(go.Scatter(x=symbol_quotes.index, y=symbol_quotes['bid_price'], mode='lines',
                                     name='Bid Price', line=dict(color='green')),
                          row=1, col=1)
            fig.add_trace(go.Scatter(x=symbol_quotes.index, y=symbol_quotes['ask_price'], mode='lines',
                                     name='Ask Price', line=dict(color='red')),
                          row=1, col=1)
            fig.add_trace(go.Scatter(x=symbol_trades.index, y=symbol_trades['trade_price'], mode='markers',
                                     name='Trade Price', marker=dict(color='black', size=4)),
                          row=1, col=1)
    
            # # Bid Size & Ask Size as line charts with colors
            # fig.add_trace(go.Scatter(x=symbol_quotes.index, y=symbol_quotes['bid_size'], mode='lines',
            #                          name='Bid Size', line=dict(color='red')),
            #               row=2, col=1)
            # fig.add_trace(go.Scatter(x=symbol_quotes.index, y=symbol_quotes['ask_size'], mode='lines',
            #                          name='Ask Size', line=dict(color='green')),
            #               row=2, col=1)
    
    
            # Bid Size & Ask Size as line charts with colors, making ask_size negative
            fig.add_trace(go.Scatter(x=symbol_quotes.index, y=symbol_quotes['bid_size'], mode='lines',
                                     name='Bid Size', line=dict(color='green')),
                          row=2, col=1)
            fig.add_trace(go.Scatter(x=symbol_quotes.index, y=symbol_quotes['negative_ask_size'], mode='lines',
                                     name='Ask Size', line=dict(color='red')),
                          row=2, col=1)
    
    
            # Trade Size as a line chart with color
            fig.add_trace(go.Scatter(x=symbol_trades.index, y=symbol_trades['trade_size'], mode='lines',
                                     name='Trade Size', line=dict(color='black')),
                          row=3, col=1)
    
            # Spread ($)
            fig.add_trace(go.Scatter(x=symbol_quotes.index, y=symbol_quotes['spread'], mode='lines', name='Spread ($)'), row=4, col=1)
            # Spread (%)
            fig.add_trace(go.Scatter(x=symbol_quotes.index, y=symbol_quotes['spread_percentage'], mode='lines', name='Spread (%)'), row=4, col=2)
    
            # Spread Distribution ($)
            fig.add_trace(go.Histogram(x=symbol_quotes['spread'], name='Spread Distribution ($)'), row=5, col=1)
            # Spread Distribution (%)
            fig.add_trace(go.Histogram(x=symbol_quotes['spread_percentage'], name='Spread Distribution (%)'), row=5, col=2)
    
            # Spread Boxplot ($)
            fig.add_trace(go.Box(y=symbol_quotes['spread'], name='Spread Boxplot ($)'), row=6, col=1)
            # Spread Boxplot (%)
            fig.add_trace(go.Box(y=symbol_quotes['spread_percentage'], name='Spread Boxplot (%)'), row=6, col=2)
    
    
            title = (
                f"Statistics for {symbol} from {market_open} to {market_close}<br>"
                f"Spread Std ($): {spread_std:.4f}, "
                f"Spread Std (%): {spread_percentage_std:.4f}%"
            )

            # Adjust layout if needed, e.g., to update margins, titles, or axis labels
            fig.update_layout(height=1400, title_text=f"Statistics for {symbol} on {market_open} to {market_close}")
            fig.update_layout(height=1400, title_text=title)

            # Directory check and save plot
            plots_directory = Path("./plots/alpaca_quotes/")
            plots_directory.mkdir(parents=True, exist_ok=True)
            plot_filename = plots_directory / f"{symbol}_quote_trade_statistics.html"
            fig.write_html(str(plot_filename))
            logging.info(f"Plot for {symbol} saved to {plot_filename}")
        else:
            logging.warning(f'Cannot plot data for {symbol} as dataframes are empty')


    def main():
        api = tradeapi.REST(api_key, api_secret, base_url, api_version='v2')

        csv_file = '/home/shared/algos/ml4t/data/selected_pairs_with_values.csv'
        df = pd.read_csv(csv_file)
        symbols = list(set(df['s1'].tolist() + df['s2'].tolist()))
        symbols = [symbol.replace('-', '.') for symbol in symbols]

        quotes_dir = '/home/shared/algos/ml4t/data/alpaca_quotes'
        trades_dir = '/home/shared/algos/ml4t/data/alpaca_trades'

        load_and_plot_data(quotes_dir, trades_dir, symbols, api)


    if __name__ == "__main__":
        main()