OK, so I bonds had a huge rate of return last year. Perhaps you’re like me and you created… a few hundred accounts. Well, now you have a major problem: you need to get your 1099 statement from each account. With the TreasuryDirect.gov OTP (one-time password) process, this can be very time-consuming. Below is my TreasuryDirect Python script, now modified to automatically download all of your 1099 documents.
As you can see below, this script generates PDFs of all of your 1099 forms.
This script requires an input file (accounts.csv) that contains a column labeled ‘Treasury Direct’. Every value under this column should be one of your TreasuryDirect account numbers. You can read more about the intricacies of this code in my previous post, automating multiple treasurydirect.gov accounts.
#main.py
from treasury_direct import process_account, close_chrome
import pandas as pd
def get_account_data(force_update_all=True):
    """Download the 1099 PDFs for every account listed in the accounts table.

    Reads two module-level globals that the ``__main__`` block sets before
    calling this function: ``df`` (the accounts table, which must contain a
    'Treasury Direct' column) and ``url`` (the login page).

    Args:
        force_update_all: When falsy, rows whose bond columns are all
            populated are skipped. When truthy, every non-empty account is
            processed.
    """
    # A row counts as "complete" once every one of these columns has a value.
    completed_columns = (
        'LLC Name',
        'Original Purchase Amount',
        'Current Value',
        'Issue Date',
        'Interest Rate',
    )

    for index, row in df.iterrows():
        account_number = row['Treasury Direct']

        # Skip empty or NaN account numbers before announcing any work.
        if pd.isna(account_number) or account_number == '':
            print('Skipping empty account number.')
            continue

        print(f'Working on account number {account_number}')

        # row.get() yields None for a column the CSV does not have yet, so a
        # fresh accounts file is treated as incomplete instead of raising
        # KeyError.
        if not force_update_all and all(
            not pd.isna(row.get(column)) for column in completed_columns
        ):
            print(f'Skipping account number {account_number} as it is already complete.')
            continue

        process_account(
            account_number,
            df,
            index,
            url,
            force_update_all=force_update_all,
            get_bonds=False,
            get_bank_info=False,
            redeem=False,
            save_1099=True,
        )
if __name__ == '__main__':
    # df and url are deliberately module-level: get_account_data() reads them
    # as globals rather than taking them as arguments.
    df = pd.read_csv('accounts.csv')
    url = "https://www.treasurydirect.gov/RS/UN-Display.do"
    # Terminate any Chrome processes that are already running before starting.
    close_chrome()
    get_account_data(force_update_all=True)
#treasury_direct.py
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.action_chains import ActionChains
import pyautogui
import time
import pandas as pd
import base64
import os
from gmail import get_otp, move_otp_emails_to_trash
import psutil
from config import password
def close_chrome():
    """Close any running Chrome instances.

    A process is terminated when its name is exactly ``chrome.exe`` or when
    its executable path contains ``chrome``. Processes that have already
    exited, are zombies, or cannot be accessed are skipped silently.
    """
    for proc in psutil.process_iter(["name", "exe"]):
        try:
            details = proc.info
            exe_path = details["exe"]
            looks_like_chrome = details["name"] == "chrome.exe" or (
                exe_path and "chrome" in exe_path
            )
            if looks_like_chrome:
                proc.terminate()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
def _build_driver():
    """Start a maximized Chrome session using a webdriver_manager-installed driver."""
    chrome_options = Options()
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--start-maximized")
    service = Service(ChromeDriverManager().install())
    return webdriver.Chrome(service=service, options=chrome_options)


def _wait_for_otp(driver, timeout=5 * 60, resend_after=3 * 60, poll_interval=10):
    """Poll the mailbox for a one-time passcode.

    Returns the passcode, or None when ``timeout`` seconds pass without one or
    the resend link cannot be clicked.
    """
    start_time = time.time()
    resend_requested = False
    while True:
        elapsed_time = time.time() - start_time
        if elapsed_time > timeout:
            print(f"Timed out waiting for OTP after {timeout // 60} minutes. Moving to the next account.")
            return None

        otp = get_otp()
        if otp is not None:
            return otp

        # Ask for a fresh OTP once; clicking the link on every poll would keep
        # issuing new passcodes while we are still waiting for one.
        if elapsed_time > resend_after and not resend_requested:
            try:
                driver.find_element(By.XPATH, '//a[contains(@href, "/RS/OTP-New.do")]').click()
                resend_requested = True
            except Exception as exc:
                print(f'Unable to click resend OTP link: {exc}')
                return None

        time.sleep(poll_interval)


def _log_in(driver, account_number):
    """Submit the account number, OTP (when asked) and password. Return True on success."""
    driver.find_element(By.NAME, "username").send_keys(account_number)
    driver.find_element(By.NAME, "submit").click()

    # The OTP page is only shown for some logins; detect it from the page text.
    if 'OTP' in driver.page_source:
        otp = _wait_for_otp(driver)
        if otp is None:
            return False
        driver.find_element(By.NAME, "otp").send_keys(otp)
        driver.find_element(
            By.XPATH, '//input[@class="action" and @type="submit" and @name="enter.x"]'
        ).click()

    # Give the password page time to load.
    time.sleep(3)

    try:
        driver.find_element(By.NAME, "password").send_keys(password)
    except Exception as exc:
        print(f'Unable to enter password: {exc}')
        return False
    try:
        driver.find_element(By.NAME, "enter.x").click()
    except Exception as exc:
        print(f'Unable to submit password: {exc}')
        return False

    # Some logins are interrupted by a "Contact Info Verification" page.
    if 'Contact Info Verification' in driver.page_source:
        driver.find_element(By.XPATH, '//input[@type="submit" and @value="Verify"]').click()
    time.sleep(3)
    return True


def _click_first_enabled_radio(driver):
    """Click the first radio button without a ``disabled`` attribute. Return True if one was clicked."""
    radio_buttons = WebDriverWait(driver, 10).until(
        EC.presence_of_all_elements_located((By.XPATH, '//input[@type="radio"]'))
    )
    print(f"Found {len(radio_buttons)} radio buttons")
    for radio_button in radio_buttons:
        if not radio_button.get_attribute("disabled"):
            radio_button.click()
            return True
    print("No enabled radio button found.")
    return False


def _click_submit_input(driver, value):
    """Wait for the ``<input type="submit">`` with the given ``value`` and click it."""
    button = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, f'//input[@type="submit" and @value="{value}"]'))
    )
    button.click()


def _scrape_bond_info(driver, wait, df, index, account_number):
    """Open the savings-bond holdings page and copy its summary fields into ``df``."""
    try:
        wait.until(EC.presence_of_element_located((By.LINK_TEXT, "SAVINGS BONDS"))).click()
    except (NoSuchElementException, TimeoutException):
        print("Unable to locate the 'SAVINGS BONDS' link within the specified timeout. This account probably has no savings bonds.")
    time.sleep(3)

    try:
        _click_first_enabled_radio(driver)
        _click_submit_input(driver, "Submit")
    except TimeoutException:
        # Without the holdings form there is nothing to scrape for this account.
        print(f"Failed to extract ibond information for account {account_number}. Moving to the next account.")
        return
    except Exception as e:
        print(f"Error selecting the holdings entry: {e}")

    try:
        llc_and_account_number = driver.find_element(By.XPATH, '//div[@id="accountnumber"]').text
        original_purchase_amount = driver.find_element(
            By.XPATH, '//p[contains(text(), "Series I current holdings total amount")]/span').text
        current_value = driver.find_element(
            By.XPATH, '//p[contains(text(), "Series I current holdings current value")]/span').text
        issue_date = driver.find_element(By.XPATH, '(//tr[contains(@class, "altrow")]/td)[3]').text
        interest_rate = driver.find_element(By.XPATH, '//td[contains(text(), "%")]').text

        try:
            status = driver.find_element(By.XPATH, '//td[contains(text(), "Pending Redemption")]').text
        except NoSuchElementException:
            status = ''
        if status:
            print(f"Status: {status}")
        df.loc[index, 'Status'] = status if status else "N/A"

        # Kept in a separate local so the caller's account number, which is
        # used for the 1099 file names, is never overwritten.
        llc_name, parsed_account_number = llc_and_account_number.split(':', 1)
        llc_name = llc_name.strip().replace("LLC Name: ", "")
        parsed_account_number = parsed_account_number.strip()

        print(f"LLC Name: {llc_name}")
        print(f"Account Number: {parsed_account_number}")
        print(f"Original Purchase Amount: {original_purchase_amount}")
        print(f"Current Value: {current_value}")
        print(f"Issue Date: {issue_date}")
        print(f"Interest Rate: {interest_rate}")

        df.loc[index, 'LLC Name'] = llc_name
        df.loc[index, 'Original Purchase Amount'] = original_purchase_amount
        df.loc[index, 'Current Value'] = current_value
        df.loc[index, 'Issue Date'] = issue_date
        df.loc[index, 'Interest Rate'] = interest_rate
    except (NoSuchElementException, ValueError):
        print(f"Failed to extract ibond information for account {account_number}. Moving to the next account.")


def _redeem_first_holding(driver, account_number):
    """Select the first enabled holding and click through Select, Redeem, Review, Submit."""
    print('Redeem is turned on')
    # NOTE(review): presumably expects the holdings page reached by the
    # get_bonds step to be displayed already - confirm before using alone.
    try:
        if not _click_first_enabled_radio(driver):
            # Nothing was selected, so do not click through a redemption.
            return
        for label in ("Select", "Redeem", "Review", "Submit"):
            _click_submit_input(driver, label)
            time.sleep(1)
    except (NoSuchElementException, TimeoutException) as exc:
        print(f"Redemption did not complete for account {account_number}: {exc}")


def _scrape_bank_info(driver, df, index, account_number):
    """Open ManageDirect > bank information and copy the six bank fields into ``df``."""
    columns = (
        'Bank Name',
        'Routing Number',
        'Account Number',
        'Name(s) on Account',
        'Account Type',
        'Return Code',
    )
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '//a[contains(@href, "md.DisplayManageDirect")]'))
        ).click()
        time.sleep(3)
        driver.find_element(By.XPATH, '//a[contains(@href, "bank.DisplayBankInfo")]').click()
        time.sleep(3)
        # The n-th "altrow1" row carries the n-th field, in the column order above.
        values = [
            driver.find_element(By.XPATH, f'//tr[@class="altrow1"][{position}]/td[3]/strong').text
            for position in range(1, len(columns) + 1)
        ]
    except (NoSuchElementException, TimeoutException):
        print(f"Failed to extract information for account {account_number}. Moving to the next account.")
        return

    for column, value in zip(columns, values):
        print(f"{column}:", value)
        df.loc[index, column] = value


def _save_1099_pdfs(driver, wait, account_number, tax_years, directory='./1099'):
    """Print the ManageDirect page for each tax year to ``<directory>/<account>_<year>_1099.pdf``."""
    os.makedirs(directory, exist_ok=True)
    for year in tax_years:
        try:
            wait.until(
                EC.presence_of_element_located((By.XPATH, '//a[contains(@href, "md.DisplayManageDirect")]'))
            ).click()
            time.sleep(2)
            # The year page is printed rather than the "View your 1099" link,
            # because that link does not exist for every tax year.
            WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable((By.XPATH, f"//a[contains(text(), '{year}')]"))
            ).click()
            time.sleep(2)

            pdf_path = os.path.join(directory, f'{account_number}_{year}_1099.pdf')
            # Page.printToPDF returns the PDF as base64 data; it has no
            # 'path' parameter, so the file is written here instead.
            pdf_options = {
                'printBackground': True,
                'pageRanges': '1',
                'paperWidth': 8.27,  # A4 paper size
                'paperHeight': 11.69,  # A4 paper size
            }
            result = driver.execute_cdp_cmd("Page.printToPDF", pdf_options)
            with open(pdf_path, "wb") as file:
                file.write(base64.b64decode(result['data']))
        except (NoSuchElementException, TimeoutException):
            print(f"Unable to locate or process a link for the year {year}.")


def process_account(account_number, df, index, url, force_update_all=False, get_bonds=True, get_bank_info=False, redeem=False, save_1099=False, tax_years=(2023, 2024)):
    """Log in to one TreasuryDirect account and run the requested steps.

    The browser session is always shut down and the OTP emails are always
    moved to the trash when this function exits, including on early returns
    and on exceptions.

    Args:
        account_number: Account number typed into the login form; also used
            in the 1099 PDF file names.
        df: Accounts table; scraped values are written to row ``index``.
        index: Row label in ``df`` for this account.
        url: Login page URL.
        force_update_all: Accepted for caller compatibility; not consulted here.
        get_bonds: Scrape the savings-bond summary into ``df``.
        get_bank_info: Scrape the linked bank details into ``df``.
        redeem: Click through the redemption flow for the first enabled holding.
        save_1099: Save one PDF per entry in ``tax_years`` under ``./1099``.
        tax_years: Years whose ManageDirect page is printed when ``save_1099``
            is set.

    Returns:
        True when login succeeded and the requested steps were attempted;
        False when login could not be completed.
    """
    driver = _build_driver()
    try:
        driver.get(url)
        wait = WebDriverWait(driver, 10)

        if not _log_in(driver, account_number):
            return False

        if get_bonds:
            _scrape_bond_info(driver, wait, df, index, account_number)
        if redeem:
            _redeem_first_holding(driver, account_number)
        if get_bank_info:
            _scrape_bank_info(driver, df, index, account_number)

        # Persist after every account so an interrupted run keeps its progress.
        df.to_csv('accounts.csv', index=False)

        if save_1099:
            _save_1099_pdfs(driver, wait, account_number, tax_years)
        return True
    finally:
        # quit() ends the whole session (browser and chromedriver), unlike
        # close(), which only closes the current window.
        driver.quit()
        move_otp_emails_to_trash()
#gmail.py
import os
import pickle
import base64
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google.auth.transport.requests import Request
# If modifying these SCOPES, delete the file token.pickle.
# The same scope list is passed to the OAuth flow in get_credentials(); the
# cached token was issued for it, hence the need to delete token.pickle.
SCOPES = ['https://www.googleapis.com/auth/gmail.modify']
def _run_authorization_flow():
    """Run the interactive OAuth flow from credentials.json and return the new credentials."""
    flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
    return flow.run_local_server(port=0)


def get_credentials():
    """Return Gmail API credentials, refreshing or re-authorizing as needed.

    ``token.pickle`` in the working directory caches the access and refresh
    tokens; it is created the first time the authorization flow completes.
    Valid cached credentials are returned as-is. Expired credentials with a
    refresh token are refreshed; if the refresh fails, the cached file is
    deleted and the authorization flow runs again. Any newly obtained or
    refreshed credentials are written back to ``token.pickle``.

    Returns:
        The credentials object to pass to ``googleapiclient.discovery.build``.
    """
    creds = None
    if os.path.exists('token.pickle'):
        print("Loading credentials from pickle file.")
        # NOTE: pickle must only be used on a token file this script wrote.
        try:
            with open('token.pickle', 'rb') as token:
                creds = pickle.load(token)
        except (pickle.UnpicklingError, EOFError, AttributeError, ImportError) as e:
            # A truncated or corrupt cache falls back to a fresh authorization
            # instead of aborting the whole run.
            print(f"Could not read token.pickle: {e}")
            creds = None
    else:
        print("No pickle file found.")

    if creds and creds.valid:
        print("Credentials are valid.")
        return creds

    # If there are no (valid) credentials available, let the user log in.
    if creds and creds.expired and creds.refresh_token:
        print("Credentials expired. Refreshing...")
        try:
            creds.refresh(Request())
            print("Credentials refreshed successfully.")
        except Exception as e:
            print(f"Could not refresh the token: {e}")
            # Remove the existing token.pickle file to avoid reusing it.
            if os.path.exists('token.pickle'):
                os.remove('token.pickle')
                print("Removed expired token.pickle file.")
            creds = _run_authorization_flow()
    else:
        print("No valid credentials. Running authorization flow.")
        creds = _run_authorization_flow()

    # Save the credentials for the next run.
    with open('token.pickle', 'wb') as token:
        print("Saving credentials to pickle file.")
        pickle.dump(creds, token)
    return creds
def _extract_otp(body_text):
    """Return the first whitespace-separated token on the 7th line of ``body_text``, or None."""
    lines = body_text.splitlines()
    if len(lines) <= 6:
        return None
    tokens = lines[6].split()
    return tokens[0] if tokens else None


def get_otp():
    """Return the passcode from the first unread TreasuryDirect OTP email.

    Returns:
        The passcode string, or None when no matching email is found, the
        email body cannot be parsed, or the Gmail API reports an error.
    """
    try:
        creds = get_credentials()
        service = build('gmail', 'v1', credentials=creds)
        # NOTE(review): in Gmail search, the 'm' unit of newer_than means
        # months, not minutes - confirm this is the intended window.
        results = service.users().messages().list(
            userId='me',
            q='from:Treasury.Direct@fiscal.treasury.gov subject:"One Time Passcode" is:unread newer_than:1m',
        ).execute()
        messages = results.get('messages', [])
        if not messages:
            print('No messages found.')
            return None

        # Use the first message in the listing.
        message = messages[0]
        msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()

        # A message with no inline body data is treated as "no passcode"
        # rather than raising KeyError.
        body_data = msg.get('payload', {}).get('body', {}).get('data')
        if not body_data:
            print("No One Time Passcode found in the email.")
            return None

        msg_str = base64.urlsafe_b64decode(body_data).decode()
        one_time_passcode = _extract_otp(msg_str)
        if one_time_passcode is None:
            print("No One Time Passcode found in the email.")
            return None

        print(f"{one_time_passcode}")
        return one_time_passcode
    except HttpError as error:
        print(f'An error occurred: {error}')
        return None
def move_otp_emails_to_trash():
    """Move every TreasuryDirect "One Time Passcode" email in the listing to the trash.

    Prints one line per trashed message, or a notice when none match. Gmail
    API errors are reported on stdout and otherwise ignored.
    """
    try:
        gmail = build('gmail', 'v1', credentials=get_credentials())
        listing = gmail.users().messages().list(
            userId='me',
            q='from:Treasury.Direct@fiscal.treasury.gov subject:"One Time Passcode"',
        ).execute()
        found = listing.get('messages', [])

        if not found:
            print('No messages found.')
            return

        for entry in found:
            gmail.users().messages().trash(userId='me', id=entry['id']).execute()
            print(f"Moved message with ID {entry['id']} to trash.")
    except HttpError as error:
        print(f'An error occurred: {error}')
# NOTE(review): as written at module level, this call runs whenever gmail.py
# is imported (treasury_direct.py imports from it), not only when the file is
# executed directly - confirm that is intended.
move_otp_emails_to_trash()
#
#
# if __name__ == '__main__':
# get_emails()
#config.py
# Placeholder value: treasury_direct.py imports this name and types it into
# the login form's password field. Replace it with the real account password
# and keep this file out of version control.
password = 'password'