Automating 1099 statements from TreasuryDirect.gov

Ok, so Ibonds had a huge rate of return last year. Perhaps you’re like me and you created.. a few hundred accounts. Well, now you have a major problem. You need to get your 1099 statement from each account. With the TreasuryDirect.gov OTP (one-time password) process this can be very time-consuming. Below is my treasury direct Python script. It is now modified to automatically download all of your 1099 documents.

You can see below this script will generate PDFs of all of your 1099 files.

This script requires an input file that contains a column labeled ‘Treasury Direct’ Every value under this column should be your Treasury Direct account numbers. You can read more about all the intricacies of this code from my previous post, automating multiple treasurydirect.gov accounts.

#main.py

from treasury_direct import process_account, close_chrome
import pandas as pd


def get_account_data(force_update_all=True):
    for index, row in df.iterrows():
        account_number = row['Treasury Direct']
        print(f'Working on account number {account_number}')

        # Skip empty or NaN account numbers
        if pd.isna(account_number) or account_number == '':
            print('Skipping empty account number.')
            continue

        # Check if the row is already complete
        if force_update_all == False:
            if not pd.isna(row['LLC Name']) and not pd.isna(row['Original Purchase Amount']) \
                    and not pd.isna(row['Current Value']) and not pd.isna(row['Issue Date']) \
                    and not pd.isna(row['Interest Rate']):
                print(f'Skipping account number {row["Treasury Direct"]} as it is already complete.')
                continue
            success = process_account(account_number, df, index, url, force_update_all=False, get_bonds=False, get_bank_info=False, redeem=False, save_1099=True)
        elif force_update_all == True:
            success = process_account(account_number, df, index, url, force_update_all=False, get_bonds=False, get_bank_info=False, redeem=False, save_1099=True)

if __name__ == '__main__':

    df = pd.read_csv('accounts.csv')
    url = "https://www.treasurydirect.gov/RS/UN-Display.do"

    close_chrome()
    get_account_data(force_update_all=True)





#treasury_direct.py

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.action_chains import ActionChains

import pyautogui
import time
import pandas as pd
import base64
import os

from gmail import get_otp, move_otp_emails_to_trash
import psutil
from config import password

def close_chrome():
    for process in psutil.process_iter(["name", "exe"]):
        try:
            if process.info["name"] == "chrome.exe" or (process.info["exe"] and "chrome" in process.info["exe"]):
                process.terminate()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass

# Close any running Chrome instances
def process_account(account_number, df, index, url, force_update_all=False, get_bonds=True, get_bank_info=False, redeem=False, save_1099=False):
    chrome_options = Options()
    # chrome_options.add_argument("user-data-dir=/home/jeremy/.config/google-chrome")
    # chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--start-maximized")

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    driver.get(url)

    wait = WebDriverWait(driver, 10)

    username_input = driver.find_element(By.NAME, "username")
    username_input.send_keys(account_number)


    submit_button = driver.find_element(By.NAME, "submit")
    submit_button.click()



    # Get the page source to check which text is present
    page_source = driver.page_source

    if 'OTP' in page_source:
        otp_skipped = False
    else:
        otp_skipped = True

    if not otp_skipped:
        start_time = time.time()
        otp_received = False

        while not otp_received:
            elapsed_time = time.time() - start_time

            # Exit the loop and move to the next account if the timer exceeds 10 minutes
            if elapsed_time > 5 * 60:
                print("Timed out waiting for OTP after 10 minutes. Moving to the next account.")
                driver.close()
                move_otp_emails_to_trash()
                return  # Continue with the next iteration of the loop (if inside a loop)

            otp = get_otp()

            if otp is not None:
                otp_received = True
            else:
                # Request a new OTP if the timer exceeds 5 minutes
                if elapsed_time > 3 * 60:
                    try:
                        resend_link = driver.find_element(By.XPATH, '//a[contains(@href, "/RS/OTP-New.do")]')
                        resend_link.click()
                    except:
                        print('Unable to click resend OTP link')
                        move_otp_emails_to_trash()
                        return

                # Sleep for 10 seconds before trying again
                time.sleep(10)

        otp_input = driver.find_element(By.NAME, "otp")

        otp_input.send_keys(otp)

        #old checkbox no longer exists
        # # Check the checkbox
        # try:
        #     checkbox = driver.find_element(By.NAME, "registerM2M")
        #     checkbox.click()
        # except:
        #     print('Unable to click the checkbox')

        otp_submit_button = driver.find_element(By.XPATH,
                                                '//input[@class="action" and @type="submit" and @name="enter.x"]')
        otp_submit_button.click()

    # Pause the script for 5 seconds
    time.sleep(3)

    try:
        password_field = driver.find_element(By.NAME, "password")
        password_field.send_keys(password)
    except:
        print('Unable to enter password')
        return

    try:
        submit_button = driver.find_element(By.NAME, "enter.x")
        submit_button.click()
    except:
        return

    # Get the page source to check which text is present
    page_source = driver.page_source

    if 'Contact Info Verification' in page_source:
        civ_skipped = False
    else:
        civ_skipped = True

    if not civ_skipped:
        verify_button = driver.find_element(By.XPATH, '//input[@type="submit" and @value="Verify"]')
        verify_button.click()

    time.sleep(3)

    if get_bonds:
        try:
            link = wait.until(EC.presence_of_element_located((By.LINK_TEXT, "SAVINGS BONDS")))
            link.click()
        except (NoSuchElementException, TimeoutException):
            print("Unable to locate the 'SAVINGS BONDS' link within the specified timeout. This account probably has no savings bonds.")


        time.sleep(3)

        # Find all radio buttons
        radio_buttons = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, '//input[@type="radio"]'))
        )

        print(f"Found {len(radio_buttons)} radio buttons")  # Debugging statement

        # Find the first radio button that is not disabled
        radio_button_to_select = None
        for radio_button in radio_buttons:
            is_disabled = radio_button.get_attribute("disabled")
            print(f"Radio button disabled attribute: {is_disabled}")  # Debugging statement
            if not is_disabled:
                radio_button_to_select = radio_button
                break

        if radio_button_to_select:
            print("Found an enabled radio button.")
            try:
                radio_button_to_select.click()
                print("Click successful.")
            except Exception as e:
                print(f"Error clicking the radio button: {e}")
        else:
            print("No enabled radio button found.")

        # Click the submit button
        submit_button = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '//input[@type="submit" and @value="Submit"]'))
        )
        try:
            submit_button.click()
        except Exception as e:
            print(f"Error clicking the submit button: {e}")
        try:
            # Locate the elements containing the desired information
            llc_and_account_number = driver.find_element(By.XPATH, '//div[@id="accountnumber"]').text

            original_purchase_amount = driver.find_element(By.XPATH,
                                                           '//p[contains(text(), "Series I current holdings total amount")]/span').text
            current_value = driver.find_element(By.XPATH,
                                                '//p[contains(text(), "Series I current holdings current value")]/span').text
            issue_date = driver.find_element(By.XPATH, '(//tr[contains(@class, "altrow")]/td)[3]').text
            interest_rate = driver.find_element(By.XPATH, '//td[contains(text(), "%")]').text

            try:
                status = driver.find_element(By.XPATH, '//td[contains(text(), "Pending Redemption")]').text
                if status:
                    print(f"Status: {status}")
                    df.loc[index, 'Status'] = status
                else:
                    print("Status is blank. Moving to the next field.")
                    df.loc[index, 'Status'] = "N/A"  # Or whatever value you wish to use for blank fields
            except NoSuchElementException:
                print("Element not found. Moving to the next field.")
                df.loc[index, 'Status'] = "N/A"  # Or whatever value you wish to use for missing fields
            # Separate the LLC name and account number
            llc_name, account_number = llc_and_account_number.split(':', 1)
            llc_name = llc_name.strip().replace("LLC Name: ", "")
            account_number = account_number.strip()

            # Print the extracted information
            print(f"LLC Name: {llc_name}")
            print(f"Account Number: {account_number}")
            print(f"Original Purchase Amount: {original_purchase_amount}")
            print(f"Current Value: {current_value}")
            print(f"Issue Date: {issue_date}")
            print(f"Interest Rate: {interest_rate}")

            # Save the extracted information as new columns for the current row
            df.loc[index, 'LLC Name'] = llc_name
            df.loc[index, 'Original Purchase Amount'] = original_purchase_amount
            df.loc[index, 'Current Value'] = current_value
            df.loc[index, 'Issue Date'] = issue_date
            df.loc[index, 'Interest Rate'] = interest_rate
        except NoSuchElementException:
            print(f"Failed to extract ibond information for account {account_number}. Moving to the next account.")


    if redeem:
        print('Redeem is turned on')
        radio_buttons = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, '//input[@type="radio"]'))
        )

        # Find the first radio button that is not disabled
        radio_button_to_select = None
        for radio_button in radio_buttons:
            if not radio_button.get_attribute("disabled"):
                radio_button_to_select = radio_button
                break

        # Select the radio button
        if radio_button_to_select:
            radio_button_to_select.click()
        else:
            print("No enabled radio button found")


        # Click the submit button
        select_button = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '//input[@type="submit" and @value="Select"]'))
        )
        select_button.click()
        time.sleep(1)

        redeem_button = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '//input[@type="submit" and @value="Redeem"]'))
        )
        redeem_button.click()
        time.sleep(1)


        review_button = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '//input[@type="submit" and @value="Review"]'))
        )
        review_button.click()
        time.sleep(1)



        submit_button = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '//input[@type="submit" and @value="Submit"]'))
        )
        submit_button.click()
        time.sleep(1)

    if get_bank_info:
        # Get bank account information
        # Click the "ManageDirect" link
        wait = WebDriverWait(driver, 10)
        manage_direct_link = wait.until(
            EC.presence_of_element_located((By.XPATH, '//a[contains(@href, "md.DisplayManageDirect")]')))

        manage_direct_link.click()

        time.sleep(3)

        # # Click the "Update my account information" link
        # update_account_info_link = driver.find_element_by_xpath('//a[contains(@href, "ai.DisplayEditAccountInfo")]')
        # update_account_info_link.click()

        # Click the "Update my Bank Information" link
        update_bank_info_link = driver.find_element(By.XPATH, '//a[contains(@href, "bank.DisplayBankInfo")]')
        update_bank_info_link.click()

        # # Check if the words "Security Question" exist on the page
        # security_question_elements = driver.find_elements_by_xpath('//h1/strong[contains(text(), "Security Question")]')
        #
        # if len(security_question_elements) > 0:
        #     # Extract the question
        #     question = driver.find_element_by_xpath('//h3').text
        #
        #     # If the question contains the word "pet's", fill the input element with 'mona'
        #     if "pet's" in question.lower():
        #         answer_input = driver.find_element_by_xpath('//input[@type="password" and @name="securityQuestionAnswer"]')
        #         answer_input.send_keys(pets_name)

        time.sleep(3)

        try:
            bank_name = driver.find_element(By.XPATH, '//tr[@class="altrow1"][1]/td[3]/strong').text
            routing_number = driver.find_element(By.XPATH, '//tr[@class="altrow1"][2]/td[3]/strong').text
            account_number = driver.find_element(By.XPATH, '//tr[@class="altrow1"][3]/td[3]/strong').text
            names_on_account = driver.find_element(By.XPATH, '//tr[@class="altrow1"][4]/td[3]/strong').text
            account_type = driver.find_element(By.XPATH, '//tr[@class="altrow1"][5]/td[3]/strong').text
            return_code = driver.find_element(By.XPATH, '//tr[@class="altrow1"][6]/td[3]/strong').text

            # Print the extracted information
            print("Bank Name:", bank_name)
            print("Routing Number:", routing_number)
            print("Account Number:", account_number)
            print("Name(s) on Account:", names_on_account)
            print("Account Type:", account_type)
            print("Return Code:", return_code)

            # Save the extracted information as new columns for the current row
            df.loc[index, 'Bank Name'] = bank_name
            df.loc[index, 'Routing Number'] = routing_number
            df.loc[index, 'Account Number'] = account_number
            df.loc[index, 'Name(s) on Account'] = names_on_account
            df.loc[index, 'Account Type'] = account_type
            df.loc[index, 'Return Code'] = return_code

            # print('Press enter to continue')
            # input()
        except NoSuchElementException:
            print(f"Failed to extract information for account {account_number}. Moving to the next account.")




    df.to_csv('accounts.csv', index=False)
    #
    # #code to redeem
    # # Click on Current Holdings
    # try:
    #     body_element = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, 'currentholdings')))
    #     body_element.click()
    # except TimeoutException:
    #     print("Couldn't find Current Holdings element.")
    #
    # # Click on the radio button for Series I Savings Bond
    # try:
    #     series_i_radio = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, '//input[@type="radio" and @name="seriesCode" and @value="9122270681520925360"]')))
    #     series_i_radio.click()
    # except TimeoutException:
    #     print("Couldn't find Series I Savings Bond radio button.")
    #
    # # Click on the Submit button
    # try:
    #     submit_button = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, '//input[@class="action" and @type="submit" and @name="572180930158889311"]')))
    #     submit_button.click()
    # except TimeoutException:
    #     print("Couldn't find first Submit button.")
    #
    # # Click on the next radio button
    # try:
    #     next_radio_button = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, '//input[@type="radio" and @name="security_parms" and @value="7956006363914591110"]')))
    #     next_radio_button.click()
    # except TimeoutException:
    #     print("Couldn't find next radio button.")
    #
    # # Click on the Select button
    # try:
    #     select_button = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, '//input[@class="action" and @type="submit" and @name="8745557767672189629"]')))
    #     select_button.click()
    # except TimeoutException:
    #     print("Couldn't find Select button.")
    #
    # # Click on the Redeem radio button
    # try:
    #     redeem_button = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, '//input[@class="action" and @type="submit" and @name="2263125525558940209"]')))
    #     redeem_button.click()
    # except TimeoutException:
    #     print("Couldn't find Redeem button.")
    #
    # # Click on the Review radio button
    # try:
    #     review_button = WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, '//input[@class="action" and @type="submit" and @name="5163266466587291065"]')))
    #     review_button.click()
    # except TimeoutException:
    #     print("Couldn't find Review button.")

    if save_1099:
        directory = './1099'

        # Loop through a range of years, e.g., 2020 to 2024
        if not os.path.exists(directory):
            os.makedirs(directory)

        for year in range(2023, 2025):
            try:
                manage_direct_link = wait.until(
                    EC.presence_of_element_located((By.XPATH, '//a[contains(@href, "md.DisplayManageDirect")]')))
                manage_direct_link.click()
                time.sleep(2)

                # year_link_text = f"Year {year}"
                # year_link = WebDriverWait(driver, 5).until(EC.element_to_be_clickable((By.LINK_TEXT, year_link_text)))
                # year_link.click()
                # time.sleep(2)

                year_link = WebDriverWait(driver, 5).until(
                    EC.element_to_be_clickable((By.XPATH, f"//a[contains(text(), '{year}')]")))
                year_link.click()
                time.sleep(2)

                #you can click on the actual 1099 link but it won't exist in some instances(that tax year has to be in reportable period or later)
                # # Find and Click on the "View your 1099 for tax year" Link
                # view_1099_link_text = f"View your 1099 for tax year {year}"
                # view_1099_link = WebDriverWait(driver, 10).until(
                #     EC.presence_of_element_located((By.LINK_TEXT, view_1099_link_text)))
                # view_1099_link.click()
                # time.sleep(3)

                # Print the Page to PDF
                pdf_filename = f'{account_number}_{year}_1099.pdf'
                pdf_path = os.path.join(directory, pdf_filename)  # Path to save PDF in the '1099' subdirectory
                pdf_options = {
                    'printBackground': True,
                    'pageRanges': '1',
                    'paperWidth': 8.27,  # A4 paper size
                    'paperHeight': 11.69,  # A4 paper size
                    'path': pdf_path  # Save PDF with formatted filename in the '1099' subdirectory
                }

                result = driver.execute_cdp_cmd("Page.printToPDF", pdf_options)
                with open(pdf_path, "wb") as file:
                    file.write(base64.b64decode(result['data']))

            except (NoSuchElementException, TimeoutException):
                print(f"Unable to locate or process a link for the year {year}.")
                continue
    driver.close()
    move_otp_emails_to_trash()
    #Slow the program down as I believe the server is rate limiting.
    # time.sleep(60)

#gmail.py

import os
import pickle
import base64
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.auth.transport.requests import Request

# If modifying these SCOPES, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']


def get_credentials():
    creds = None
    # The file token.pickle stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first time.
    if os.path.exists('token.pickle'):
        print("Loading credentials from pickle file.")
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)
    else:
        print("No pickle file found.")

    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            print("Credentials expired. Refreshing...")
            try:
                creds.refresh(Request())
                print("Credentials refreshed successfully.")
            except Exception as e:
                print(f"Could not refresh the token: {e}")
                # Remove the existing token.pickle file to avoid reusing it
                if os.path.exists('token.pickle'):
                    os.remove('token.pickle')
                print("Removed expired token.pickle file.")
                # Trigger the OAuth2 flow
                flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
        else:
            print("No valid credentials. Running authorization flow.")
            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)

        # Save the credentials for the next run
        with open('token.pickle', 'wb') as token:
            print("Saving credentials to pickle file.")
            pickle.dump(creds, token)
    else:
        print("Credentials are valid.")
    return creds

def get_otp():
    try:
        creds = get_credentials()
        service = build('gmail', 'v1', credentials=creds)
        results = service.users().messages().list(userId='me',
                                                  q='from:Treasury.Direct@fiscal.treasury.gov subject:"One Time Passcode" is:unread newer_than:1m').execute()
        messages = results.get('messages', [])
        if not messages:
            print('No messages found.')
            return None
        else:
            # Get the first unread email
            message = messages[0]
            msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()
            msg_str = base64.urlsafe_b64decode(msg['payload']['body']['data']).decode()
            otp = msg_str.splitlines()[6].split()[0]
            if otp:
                one_time_passcode = otp
                print(f"{one_time_passcode}")
                return one_time_passcode
            else:
                print("No One Time Passcode found in the email.")
                return None
    except HttpError as error:
        print(f'An error occurred: {error}')
        return None

def move_otp_emails_to_trash():
    try:
        creds = get_credentials()
        service = build('gmail', 'v1', credentials=creds)
        results = service.users().messages().list(userId='me',
                                                  q='from:Treasury.Direct@fiscal.treasury.gov subject:"One Time Passcode"').execute()
        messages = results.get('messages', [])
        if not messages:
            print('No messages found.')
        else:
            for message in messages:
                service.users().messages().trash(userId='me', id=message['id']).execute()
                print(f"Moved message with ID {message['id']} to trash.")
    except HttpError as error:
        print(f'An error occurred: {error}')

move_otp_emails_to_trash()
#
#
# if __name__ == '__main__':
#     get_emails()

#config.py

password = 'password'

Leave a Reply