Like many brokers, Alpaca doesn’t let you make unlimited connections to its API. Today I’m going to set up Redis to redistribute Alpaca trade, quote, and bar data, so that all of my algorithms can access Alpaca’s data through a single connection.
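This program can also show you how many symbols are missing data each minute, and save the data to dataframes and to your local disk for further analysis. Note that in the code below these features are currently switched off: the dataframe-update and disk-saving calls are commented out, and check_missing_values() is defined but never called.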
main.py: This is the entry point of the application. It builds the list of symbols, opens the Alpaca websocket, subscribes to trades, quotes, and bars, and publishes every incoming message to Redis.
redis_subscriber.py: This file connects to Redis, subscribes to the 'alpaca-messages' channel that main.py publishes to, and prints each message it receives. It shows how your algorithms can consume the redistributed data.
subscriptions.py: This file contains the functions that send subscribe and unsubscribe requests for Alpaca trades, quotes, and bars over the websocket.
message_processing.py: This file decodes each incoming message from the Alpaca stream according to its type (trades, quotes, bars, trade corrections, cancels, LULD, and status messages) and catches and reports any errors raised while processing it.
dataframes.py: This file creates the dataframes used to store the received data (one column per symbol and field) and contains the functions that bucket incoming trades, quotes, and bars into one-minute rows.
config.py: This file manages the configuration, including API keys, directory settings, and the Alpaca REST API instance. Centralizing these settings makes them easy to maintain.
Install Redis
sudo apt-get update
sudo apt-get install redis-server
sudo systemctl enable redis-server
sudo systemctl status redis
Then run `redis-cli ping`. If you get a PONG reply, Redis is configured. Only run Redis with this setup on a secure internal network; further hardening is required if the server is reachable from untrusted networks.
main.py
import asyncio
import websockets
import json
import os
import traceback
from colorama import Fore, Style, init
import redis
from config import APCA_API_KEY_ID, APCA_API_SECRET_KEY, live_api
from subscriptions import subscribe_to_trades, subscribe_to_quotes, subscribe_to_bars,\
unsubscribe_trade_updates, unsubscribe_quote_updates, unsubscribe_bar_updates
from message_processing import process_message
import websockets.exceptions
import pandas as pd
from datetime import datetime
from dataframes import create_dataframes
# Module-level list of symbols to stream. get_assets() rebinds it through
# `global`, and the __main__ block passes it to run_stream().
symbols_to_trade = []
async def on_message(ws, message):
    """Decode one websocket frame, process each message and republish it to Redis.

    Alpaca frames carry a JSON array of messages; a single JSON object is
    treated as a one-element batch. Every message is passed to
    ``process_message`` with the module-level dataframes and then published
    on the ``'alpaca-messages'`` Redis channel.

    Failures are isolated per message, so one bad message no longer stops the
    rest of the batch from being published. Errors are printed with their
    traceback and never raised.

    Args:
        ws: The websocket the frame came from (unused; kept for the call signature).
        message: Raw frame payload as received from ``ws.recv()``.
    """
    try:
        messages = json.loads(message)
    except (TypeError, ValueError):
        print("Error in on_message:")
        traceback.print_exc()
        return
    if isinstance(messages, dict):
        messages = [messages]
    for msg in messages:
        try:
            process_message(msg, trades_df, quotes_df, bars_df)
            redis_client.publish('alpaca-messages', json.dumps(msg))
        except Exception:
            print("Error in on_message:")
            traceback.print_exc()
async def authenticate(ws):
    """Send Alpaca's ``auth`` action over ``ws`` using the configured key pair.

    Only sends the request; the server's reply arrives later through the
    normal receive loop.
    """
    payload = json.dumps({
        "action": "auth",
        "key": APCA_API_KEY_ID,
        "secret": APCA_API_SECRET_KEY,
    })
    await ws.send(payload)
async def create_ws_connection(symbols, source='sip'):
    """Open one Alpaca market-data websocket, subscribe, and pump messages.

    Authenticates, subscribes ``symbols`` to trades, quotes and bars, then
    forwards every received frame to ``on_message`` until the connection closes.

    Args:
        symbols: Symbols to subscribe to for trades, quotes and bars.
        source: Feed name appended to the stream URL (e.g. ``'sip'``).

    Raises:
        websockets.exceptions.ConnectionClosed: whenever the connection closes,
            cleanly or not. Reconnecting is the caller's job (``run_stream``
            loops on this function with a back-off). The previous version
            recursed into itself here and busy-looped on a clean close.
    """
    base_url = f'wss://stream.data.alpaca.markets/v2/{source}'
    async with websockets.connect(base_url, ping_timeout=60) as ws:  # Set ping_timeout to 60 seconds
        await authenticate(ws)
        print('Subscribing to trades')
        await subscribe_to_trades(ws, symbols)
        print('Subscribing to quotes')
        await subscribe_to_quotes(ws, symbols)
        print('Subscribing to bars')
        await subscribe_to_bars(ws, symbols)
        while True:
            try:
                message = await ws.recv()
                await on_message(ws, message)
            except websockets.exceptions.ConnectionClosed as e:
                # Covers both ConnectionClosedOK and ConnectionClosedError: once
                # closed, recv() would raise forever, so leave and let the
                # caller reconnect.
                print(f"Connection closed: {e}, reconnecting...")
                raise
            except Exception as e:
                print(f"Error: {e}")
def get_assets(active=True, tradable=False, shortable=False, exclude_curencies=True):
    """Return the symbols of the assets listed by ``live_api`` that pass the filters.

    Args:
        active: Keep only assets whose ``status`` is ``'active'``.
        tradable: Keep only assets whose ``tradable`` flag is set.
        shortable: Keep only assets whose ``shortable`` flag is set.
        exclude_curencies: Drop symbols containing ``'/'`` (presumably
            currency/crypto pairs).

    Returns:
        The de-duplicated symbols in first-seen order. The same list is also
        bound to the module global ``symbols_to_trade``.
    """
    global symbols_to_trade

    def passes_filters(asset):
        # An enabled filter rejects the asset; disabled filters are skipped.
        return not (
            (active and asset.status != 'active')
            or (tradable and not asset.tradable)
            or (shortable and not asset.shortable)
            or (exclude_curencies and '/' in asset.symbol)
        )

    name_by_symbol = {
        asset.symbol: asset.name
        for asset in live_api.list_assets()
        if passes_filters(asset)
    }
    symbols_to_trade = [*name_by_symbol]
    print(f'Returning {len(symbols_to_trade)} assets')
    return symbols_to_trade
async def run_stream(symbols_to_trade, source='sip'):
    """Keep the market-data stream running forever, reconnecting as needed.

    A normal return from ``create_ws_connection`` reconnects immediately. Any
    exception is printed and followed by a one-second pause before retrying.
    """
    while True:
        try:
            await create_ws_connection(symbols_to_trade, source=source)
        except websockets.exceptions.ConnectionClosedError as e:
            reason = f"Connection closed: {e}"
        except Exception as e:
            reason = f"Error: {e}"
        else:
            # Clean return: reconnect straight away, no back-off.
            continue
        print(f"{reason}, retrying in 1 seconds...")
        await asyncio.sleep(1)
if __name__ == "__main__":
    # These assignments intentionally create module globals: on_message()
    # reads trades_df / quotes_df / bars_df and redis_client when called.
    symbols = get_assets(
        active=True,
        tradable=False,
        shortable=False,
        exclude_curencies=True,
    )
    trades_df, quotes_df, bars_df = create_dataframes(symbols)
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    # get_assets() returns the same list object it stores in symbols_to_trade.
    asyncio.run(run_stream(symbols))
redis_subscriber.py
import redis
def on_message(channel, message):
    """Print one received pub/sub payload together with its channel name."""
    line = "Message received on channel '{}': {}".format(channel, message)
    print(line)
def main():
    """Subscribe to the Alpaca Redis channel and print every message published to it.

    Blocks forever. ``pubsub.listen()`` waits for the next message instead of
    polling ``get_message()`` in a tight loop, so the process no longer spins
    a CPU core while idle. The pubsub connection is closed on exit, including
    on Ctrl+C.
    """
    redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
    # Replace 'alpaca-messages' with the desired channel name
    channel_name = 'alpaca-messages'
    pubsub = redis_client.pubsub()
    pubsub.subscribe(channel_name)
    print(f"Subscribed to '{channel_name}' channel. Awaiting messages...")
    try:
        for message in pubsub.listen():
            # listen() also yields the 'subscribe' confirmation; forward only data.
            if message['type'] == 'message':
                on_message(message['channel'], message['data'])
    finally:
        pubsub.close()
# Start the subscriber only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
dataframes.py
import pandas as pd
import traceback
from datetime import datetime
from dateutil.parser import parse
import pytz
import os
from config import minute_data_dir
import asyncio
# Extra timezone-name mapping passed to dateutil's parse() by update_trades_df
# and update_quotes_df.
# NOTE(review): incoming timestamps presumably end in 'Z', which dateutil
# already treats as UTC, so the 'ZZ' key may never match. Confirm before relying on it.
tzinfos = {'ZZ': pytz.UTC}
def append_to_csv(file_path, data):
    """Append the string ``data`` to ``file_path`` as one newline-terminated line.

    The file is created if it does not exist.
    """
    with open(file_path, mode='a') as handle:
        handle.writelines((data, '\n'))
def check_missing_values(interval=60):
    """Periodically print, per row, the percentage of missing values in each frame.

    Loops forever. Every ``interval`` seconds it reports on the module-level
    ``trades_df``, ``quotes_df`` and ``bars_df`` that ``create_dataframes()``
    binds through ``global``. It uses blocking ``time.sleep``, so do not call
    it on the asyncio event loop.

    Args:
        interval: Seconds to wait between reports (default 60, as before).
    """
    # `time` is not imported at module level in this file; without this local
    # import the first sleep raised NameError.
    import time

    while True:
        time.sleep(interval)
        for df_name, df in (("trades_df", trades_df), ("quotes_df", quotes_df), ("bars_df", bars_df)):
            row_missing_percentage = (df.isna().sum(axis=1) / df.shape[1] * 100).round(2)
            print(f"{df_name} row missing values (%):\n{row_missing_percentage}\n")
def create_dataframes(symbols):
    """Build empty trades, quotes and bars DataFrames keyed by (symbol, field) columns.

    Each frame has ``MultiIndex`` columns formed from the product of
    ``symbols`` and that frame's field names, and no rows. The three frames
    are also bound to this module's globals ``trades_df``, ``quotes_df`` and
    ``bars_df`` so that module-level helpers can read them.

    Returns:
        ``(trades_df, quotes_df, bars_df)``
    """
    global trades_df, quotes_df, bars_df
    field_groups = (
        ['price', 'size', 'timestamp'],
        ['bid', 'ask'],
        ['open', 'high', 'low', 'close', 'volume'],
    )
    trades_df, quotes_df, bars_df = (
        pd.DataFrame(columns=pd.MultiIndex.from_product([symbols, fields]))
        for fields in field_groups
    )
    return trades_df, quotes_df, bars_df
#
# async def save_data(df, data_directory, file_name, file_format='pickle'):
# #CSV saving does not currently work as it slows down the process and the wss will die.
# if not os.path.exists(data_directory):
# os.makedirs(data_directory)
#
# file_path = os.path.join(data_directory, f"{file_name}.{file_format}")
#
# # Calculate missing data percentage for the row to be appended
# row_missing_percentage = (df.isna().sum(axis=1) / df.shape[1] * 100).round(2)
# print(f"{file_name} row missing values (%):\n{row_missing_percentage}\n")
#
# if not os.path.exists(file_path):
# if file_format == 'csv':
# with open(file_path, 'w') as f:
# f.write(df.to_csv(header=True, index=True))
# elif file_format == 'pickle':
# df.to_pickle(file_path)
# else:
# if file_format == 'csv':
# existing_df = pd.read_csv(file_path, header=[0, 1], index_col=0)
# combined_df = pd.concat([existing_df, df])
# # Ensure the correct column order in the combined DataFrame
# combined_df = combined_df.reorder_levels([1, 0], axis=1).sort_index(axis=1)
#
# with open(file_path, 'w') as f:
# f.write(combined_df.to_csv(header=True, index=True))
# elif file_format == 'pickle':
# existing_df = pd.read_pickle(file_path)
# combined_df = pd.concat([existing_df, df])
# combined_df.to_pickle(file_path)
def update_trades_df(symbol, price, size, timestamp, df, minute_data_dir):
    """Record one trade in the one-minute row of ``df`` that it falls in.

    ``timestamp`` is parsed with dateutil and floored to the minute. A NaN row
    is added the first time a minute is seen. The symbol's ``price`` cell is
    overwritten with the latest trade price, and its ``size`` cell keeps a
    running total of trade sizes within that minute. ``minute_data_dir`` is
    only referenced by the disabled save step below. Errors are printed, not
    raised.
    """
    try:
        minute = parse(timestamp, tzinfos=tzinfos).replace(second=0, microsecond=0)
        if minute not in df.index:
            # if len(df.index) > 1:
            #     asyncio.create_task(save_data(df, minute_data_dir, 'trades', 'pickle'))
            df.loc[minute] = pd.Series(dtype='float64')
        df.at[minute, (symbol, 'price')] = price
        # Accumulate the trade size (not a count) into this minute's total.
        size_so_far = df.at[minute, (symbol, 'size')]
        if pd.isna(size_so_far):
            size_so_far = 0
        df.at[minute, (symbol, 'size')] = size_so_far + size
    except Exception as e:
        print("Error:", e)
        traceback.print_exc()
def update_quotes_df(symbol, bid, ask, timestamp, df, minute_data_dir):
    """Store the latest bid/ask for ``symbol`` in its one-minute row of ``df``.

    ``timestamp`` is parsed with dateutil and floored to the minute, and a NaN
    row is added for a minute not seen yet. Later quotes in the same minute
    overwrite earlier ones. ``minute_data_dir`` is only referenced by the
    disabled save step below. Errors are printed, not raised.
    """
    try:
        minute = parse(timestamp, tzinfos=tzinfos).replace(second=0, microsecond=0)
        if minute not in df.index:
            # if len(df.index) > 1:
            #     asyncio.create_task(save_data(df, minute_data_dir, "quotes", 'pickle'))
            df.loc[minute] = pd.Series(dtype='float64')
        for field, value in (('bid', bid), ('ask', ask)):
            df.at[minute, (symbol, field)] = value
    except Exception as e:
        print("Error:", e)
        traceback.print_exc()
def update_bars_df(symbol, open_price, high_price, low_price, close_price, volume, timestamp, df, minute_data_dir):
    """Store one OHLCV bar for ``symbol`` in its one-minute row of ``df``.

    ``timestamp`` is an ISO-8601 string; a trailing ``'Z'`` is rewritten to
    ``'+00:00'`` so ``datetime.fromisoformat`` accepts it, and the result is
    floored to the minute. A NaN row is added for a minute not seen yet.
    ``minute_data_dir`` is only referenced by the disabled save step below.
    Errors are printed, not raised.
    """
    try:
        iso_text = timestamp.replace('Z', '+00:00')
        minute = datetime.fromisoformat(iso_text).replace(second=0, microsecond=0)
        if minute not in df.index:
            # if len(df.index) > 1:
            #     asyncio.create_task(save_data(df, minute_data_dir, "bars", 'pickle'))
            df.loc[minute] = pd.Series(dtype='float64')
        bar = {
            'open': open_price,
            'high': high_price,
            'low': low_price,
            'close': close_price,
            'volume': volume,
        }
        for field, value in bar.items():
            df.at[minute, (symbol, field)] = value
    except Exception as e:
        print("Error:", e)
        traceback.print_exc()
message_processing.py
from constants import exchange_codes, trade_conditions_cts, trade_conditions_utdf, cqs_quote_conditions, uqdf_quote_conditions
from dataframes import update_trades_df, update_quotes_df, update_bars_df
from colorama import Fore, Style
import traceback
from config import minute_data_dir
def process_message(msg, trades_df, quotes_df, bars_df):
    """Decode one message from the Alpaca market-data websocket.

    Dispatches on the message's ``'T'`` type code:

    * ``'t'`` trade, ``'q'`` quote, ``'b'``/``'d'``/``'u'`` bar, ``'c'`` trade
      correction, ``'x'`` trade cancel/error and ``'l'`` LULD. Their fields are
      unpacked into locals; the ``update_*_df`` calls are commented out, so
      apart from failing on a missing key these branches have no effect yet.
    * ``'error'``, ``'success'`` and ``'subscription'`` control messages are printed.
    * Any other type is reported as unknown.

    Args:
        msg: One decoded JSON object from the stream.
        trades_df, quotes_df, bars_df: Frames for the ``update_*_df`` helpers
            once those calls are re-enabled.

    Nothing is raised. Any exception, including a missing ``'T'`` key, is
    printed in red with its traceback, so one malformed message cannot abort
    the caller's loop.
    """
    # Trades from these exchanges use the UTDF condition table; all other
    # exchanges fall back to the CTS table.
    utp_exchanges = {"B", "Q", "S", "T", "X"}
    try:
        # Read the type inside the try so a malformed message is reported, not raised.
        msg_type = msg['T']
        if msg_type == 't':  # Trade
            symbol = msg['S']
            trade_id = msg['i']
            exchange_code = msg['x']
            exchange_desc = exchange_codes.get(exchange_code, "Unknown")
            trade_price = msg['p']
            trade_size = msg['s']
            trade_condition = msg['c']
            timestamp = msg['t']
            tape = msg['z']
            condition_table = (trade_conditions_utdf if exchange_code in utp_exchanges
                               else trade_conditions_cts)
            trade_conditions_desc = [condition_table.get(cond, "Unknown") for cond in trade_condition]
            # update_trades_df(symbol, trade_price, trade_size, timestamp, df=trades_df, minute_data_dir=minute_data_dir)
        elif msg_type == 'q':  # Quote
            symbol = msg['S']
            ask_exchange_code = msg['ax']
            ask_exchange_desc = exchange_codes.get(ask_exchange_code, "Unknown")
            ask_price = msg['ap']
            ask_size = msg['as']
            bid_exchange_code = msg['bx']
            bid_exchange_desc = exchange_codes.get(bid_exchange_code, "Unknown")
            bid_price = msg['bp']
            bid_size = msg['bs']
            quote_condition = msg['c']
            timestamp = msg['t']
            tape = msg['z']
            # Look each condition up in the CQS table first, then UQDF.
            decoded_conditions = [
                cqs_quote_conditions[code] if code in cqs_quote_conditions
                else uqdf_quote_conditions[code] if code in uqdf_quote_conditions
                else f"Unknown Condition: {code}"
                for code in quote_condition
            ]
            # update_quotes_df(symbol, bid_price, ask_price, timestamp, df=quotes_df, minute_data_dir=minute_data_dir)
        elif msg_type in ('b', 'd', 'u'):  # Bar
            symbol = msg['S']
            open_price = msg['o']
            high_price = msg['h']
            low_price = msg['l']
            close_price = msg['c']
            volume = msg['v']
            timestamp = msg['t']
            # update_bars_df(symbol, open_price, high_price, low_price, close_price, volume, timestamp, df=bars_df, minute_data_dir=minute_data_dir)
        elif msg_type == 'c':  # Trade Correction
            symbol = msg['S']
            exchange_code = msg['x']
            original_trade_id = msg['oi']
            original_trade_price = msg['op']
            original_trade_size = msg['os']
            original_trade_conditions = msg['oc']
            corrected_trade_id = msg['ci']
            corrected_trade_price = msg['cp']
            corrected_trade_size = msg['cs']
            corrected_trade_conditions = msg['cc']
            timestamp = msg['t']
            tape = msg['z']
            # Handle the trade correction message here
        elif msg_type == 'x':  # Trade Cancel/Error
            symbol = msg['S']
            trade_id = msg['i']
            exchange_code = msg['x']
            trade_price = msg['p']
            trade_size = msg['s']
            action = msg['a']
            timestamp = msg['t']
            tape = msg['z']
            # Handle the trade cancel/error message here
        elif msg_type == 'l':  # LULD
            symbol = msg['S']
            limit_up_price = msg['u']
            limit_down_price = msg['d']
            indicator = msg['i']
            timestamp = msg['t']
            tape = msg['z']
            # Handle the LULD message here
        elif msg_type == 'error':
            print(Fore.RED + f"Error Code: {msg['code']}")
            print(f"Error Message: {msg['msg']}")
            print(Style.RESET_ALL + "-------")
        elif msg_type == 'success':
            print(Fore.GREEN + f"Success Message: {msg['msg']}")
            print(Style.RESET_ALL + "-------")
        elif msg_type == "subscription":
            print("Subscription:")
            for channel, subscribed in msg.items():
                if channel != 'T' and subscribed:
                    print(f"{channel}: {len(subscribed)} symbols")
            print("-------")
        else:
            print(Fore.RED + f"Unknown message type: {msg_type}" + Style.RESET_ALL)
    except Exception as e:
        print(Fore.RED + "Error:", e, Style.RESET_ALL)
        traceback.print_exc()
subscriptions.py
import json
import traceback
from colorama import Fore, Style, init
# colorama: automatically reset the terminal colour after each print, so a
# Fore.* colour used in an error message does not bleed into later output.
init(autoreset=True)
async def subscribe_to_trades(ws, symbols):
    """Ask the Alpaca stream on ``ws`` to start sending trades for ``symbols``.

    A send failure is printed in red with its traceback instead of being
    raised. The old handler called the undefined ``print_error`` and so raised
    NameError from inside the except clause.
    """
    try:
        sub_data = {
            "action": "subscribe",
            "trades": symbols
        }
        await ws.send(json.dumps(sub_data))
    except Exception:
        print(Fore.RED + "Error in subscribe_to_trades:")
        traceback.print_exc()
async def subscribe_to_quotes(ws, symbols):
    """Ask the Alpaca stream on ``ws`` to start sending quotes for ``symbols``.

    A send failure is printed in red with its traceback instead of being
    raised. The old handler called the undefined ``print_error`` and so raised
    NameError from inside the except clause.
    """
    try:
        sub_data = {
            "action": "subscribe",
            "quotes": symbols
        }
        await ws.send(json.dumps(sub_data))
    except Exception:
        print(Fore.RED + "Error in subscribe_to_quotes:")
        traceback.print_exc()
async def subscribe_to_bars(ws, symbols):
    """Ask the Alpaca stream on ``ws`` to start sending bars for ``symbols``.

    A send failure is printed in red with its traceback instead of being
    raised. The old handler called the undefined ``print_error`` and so raised
    NameError from inside the except clause.
    """
    try:
        sub_data = {
            "action": "subscribe",
            "bars": symbols
        }
        await ws.send(json.dumps(sub_data))
    except Exception:
        print(Fore.RED + "Error in subscribe_to_bars:")
        traceback.print_exc()
async def unsubscribe_trade_updates(ws, symbols):
    """Ask the Alpaca stream on ``ws`` to stop sending trades for ``symbols``.

    A send failure is printed in red with its traceback instead of being
    raised. The old handler called the undefined ``print_error`` and so raised
    NameError from inside the except clause.
    """
    try:
        sub_data = {
            "action": "unsubscribe",
            "trades": symbols
        }
        await ws.send(json.dumps(sub_data))
    except Exception:
        print(Fore.RED + "Error in unsubscribe_trade_updates:")
        traceback.print_exc()
async def unsubscribe_quote_updates(ws, symbols):
    """Ask the Alpaca stream on ``ws`` to stop sending quotes for ``symbols``.

    A send failure is printed in red with its traceback instead of being
    raised. The old handler called the undefined ``print_error`` and so raised
    NameError from inside the except clause.
    """
    try:
        sub_data = {
            "action": "unsubscribe",
            "quotes": symbols
        }
        await ws.send(json.dumps(sub_data))
    except Exception:
        print(Fore.RED + "Error in unsubscribe_quote_updates:")
        traceback.print_exc()
async def unsubscribe_bar_updates(ws, symbols):
    """Ask the Alpaca stream on ``ws`` to stop sending bars for ``symbols``.

    A send failure is printed in red with its traceback instead of being
    raised. The old handler called the undefined ``print_error`` and so raised
    NameError from inside the except clause.
    """
    try:
        sub_data = {
            "action": "unsubscribe",
            "bars": symbols
        }
        await ws.send(json.dumps(sub_data))
    except Exception:
        print(Fore.RED + "Error in unsubscribe_bar_updates:")
        traceback.print_exc()
config.py
import os
import sys
import alpaca_trade_api as tradeapi
# Data directories live one level above the current working directory.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
data_dir = os.path.join(parent_dir, 'data/')
# Make modules stored in the data directory importable (presumably where the
# `keys` module imported below lives).
sys.path.append(data_dir)
minute_data_dir = os.path.join(data_dir, 'minute_data/')
import keys  # noqa: E402 -- must come after the sys.path change above

# Credentials. A bare "NAME = #define your own here" is a SyntaxError, so the
# values are read from environment variables instead. Replace these lookups
# with your own values (for example from `keys`) if you prefer.
BASE_URL = os.environ.get('APCA_PAPER_BASE_URL', 'https://paper-api.alpaca.markets')
API_KEY = os.environ.get('APCA_PAPER_API_KEY_ID')
SECRET_KEY = os.environ.get('APCA_PAPER_API_SECRET_KEY')
LIVE_BASE_URL = os.environ.get('APCA_API_BASE_URL', 'https://api.alpaca.markets')
LIVE_API_KEY = os.environ.get('APCA_LIVE_API_KEY_ID')
LIVE_SECRET_KEY = os.environ.get('APCA_LIVE_API_SECRET_KEY')
# Key pair that main.py sends in the websocket "auth" action.
APCA_API_KEY_ID = os.environ.get('APCA_API_KEY_ID', LIVE_API_KEY)
APCA_API_SECRET_KEY = os.environ.get('APCA_API_SECRET_KEY', LIVE_SECRET_KEY)

live_api = tradeapi.REST(LIVE_API_KEY, LIVE_SECRET_KEY, LIVE_BASE_URL, api_version='v2')