Republish Alpaca trades, quotes, and bars using Redis

Like many brokers, Alpaca does not let you open unlimited connections to its API. Today I’m going to set up Redis to redistribute Alpaca trades, quotes, and bars data, so that all of my algorithms can share Alpaca’s data over a single connection.

This program can also report how much data is missing for each minute, and save the data to dataframes and to your local disk for further analysis.

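  1. main.py: This is the entry point of the application. It fetches the list of assets, opens the Alpaca market-data websocket, subscribes to trades, quotes, and bars, and publishes every message it receives to the `alpaca-messages` Redis channel.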
  2. redis_subscriber.py: This is an example consumer. It connects to Redis, subscribes to the `alpaca-messages` channel that main.py publishes to, and prints each message it receives. Use it as a template for your own algorithms.
  3. subscriptions.py: This file contains functions for subscribing and unsubscribing to Alpaca trades, quotes, and bars. These functions ensure a smooth and error-free data streaming experience.
  4. message_processing.py: This file processes incoming data from the Alpaca API and formats it for further analysis. It handles any errors that may occur during the process and ensures that the data is consistent and accurate.
  5. dataframes.py: This file manages the dataframes used for storing and analyzing the received data. It organizes the data in an accessible format and performs any necessary data manipulation tasks.
  6. config.py: This file manages the configuration, including API keys, directory settings, and the Alpaca REST API instance. Centralizing these configurations ensures easy maintenance and organization.

Install Redis

sudo apt-get update

sudo apt-get install redis-server

sudo systemctl enable redis-server

sudo systemctl status redis

Once the service is reported as active, check that Redis is answering by running `redis-cli ping`. If you get a PONG reply, Redis is configured. Only run Redis in this setup on a secure internal network; further hardening is required if the network is reachable from outside.

main.py

import asyncio
import websockets
import json
import os
import traceback
from colorama import Fore, Style, init
import redis

from config import APCA_API_KEY_ID, APCA_API_SECRET_KEY, live_api
from subscriptions import subscribe_to_trades, subscribe_to_quotes, subscribe_to_bars,\
                           unsubscribe_trade_updates, unsubscribe_quote_updates, unsubscribe_bar_updates
from message_processing import process_message
import websockets.exceptions


import pandas as pd
from datetime import datetime

from dataframes import create_dataframes

# Symbol universe for the stream. Starts empty; get_assets() rebinds this
# module global to the filtered list of symbols.
symbols_to_trade = []

async def on_message(ws, message):
    """Process and republish every event contained in one websocket frame.

    The frame is expected to decode (JSON) to a list of event dicts. Each event
    is passed to ``process_message`` together with the module-level
    ``trades_df``/``quotes_df``/``bars_df``. It is then re-serialised as JSON
    and published on the ``alpaca-messages`` channel via the module-level
    ``redis_client``.

    Args:
        ws: The websocket the frame came from (unused; kept for the caller's
            signature).
        message: The raw frame text.

    Errors are printed with a traceback and never raised. A frame that is not
    valid JSON is skipped. A failure on one event no longer stops the remaining
    events in the same frame from being processed and published.
    """
    try:
        messages = json.loads(message)
    except Exception:
        print("Error in on_message:")
        traceback.print_exc()
        return

    for msg in messages:
        # Isolate each event so one bad event cannot drop the rest of the batch.
        try:
            process_message(msg, trades_df, quotes_df, bars_df)
            redis_client.publish('alpaca-messages', json.dumps(msg))
        except Exception:
            print("Error in on_message:")
            traceback.print_exc()

async def authenticate(ws):
    """Send Alpaca's auth frame over *ws* using the configured key pair.

    The frame is ``{"action": "auth", "key": ..., "secret": ...}``, built from
    ``APCA_API_KEY_ID`` / ``APCA_API_SECRET_KEY``. This only sends the frame;
    the server's reply is read later by the receive loop.
    """
    credentials = {"action": "auth"}
    credentials["key"] = APCA_API_KEY_ID
    credentials["secret"] = APCA_API_SECRET_KEY
    payload = json.dumps(credentials)
    await ws.send(payload)

async def create_ws_connection(symbols, source='sip'):
    """Run one Alpaca market-data websocket session until it closes.

    Connects to ``wss://stream.data.alpaca.markets/v2/<source>`` and sends the
    auth frame. It then subscribes to trades, quotes and bars for *symbols* and
    hands every received frame to ``on_message``.

    Args:
        symbols: Ticker symbols to subscribe to.
        source: Feed name appended to the stream URL (default ``'sip'``).

    Raises:
        websockets.exceptions.ConnectionClosed: when the session ends, whether
            through a clean close or an error. Reconnecting is left to the
            caller (``run_stream`` retries in a loop). The old version recursed
            into itself, growing the stack on every drop. It also spun forever
            on a cleanly closed socket.
    """
    base_url = f'wss://stream.data.alpaca.markets/v2/{source}'

    async with websockets.connect(base_url, ping_timeout=60) as ws:  # Set ping_timeout to 60 seconds
        await authenticate(ws)

        # Subscribe to the requested symbols (previously the module global
        # was used here and this parameter was ignored).
        print('Subscribing to trades')
        await subscribe_to_trades(ws, symbols)

        print('Subscribing to quotes')
        await subscribe_to_quotes(ws, symbols)

        print('Subscribing to bars')
        await subscribe_to_bars(ws, symbols)

        while True:
            try:
                message = await ws.recv()
                await on_message(ws, message)
            except websockets.exceptions.ConnectionClosed:
                # Covers both ConnectionClosedOK and ConnectionClosedError; a
                # closed socket will never yield another message, so hand
                # control back to the caller to reconnect.
                raise
            except Exception as e:
                print(f"Error: {e}")

def get_assets(active=True, tradable=False, shortable=False, exclude_curencies=True):
    """Fetch the asset list from ``live_api``, filter it, and store the symbols.

    Each enabled flag removes assets that fail its check, in this order:

    * ``active``: keep only assets whose ``status`` is ``'active'``;
    * ``tradable``: keep only assets whose ``tradable`` flag is set;
    * ``shortable``: keep only assets whose ``shortable`` flag is set;
    * ``exclude_curencies`` (sic): drop symbols that contain ``'/'``.

    The surviving symbols are de-duplicated, keeping first-seen order. They are
    assigned to the module-level ``symbols_to_trade`` and also returned.
    """
    global symbols_to_trade

    def _passes(asset):
        # Same short-circuit order as the individual checks listed above.
        return not (
            (active and asset.status != 'active')
            or (tradable and not asset.tradable)
            or (shortable and not asset.shortable)
            or (exclude_curencies and '/' in asset.symbol)
        )

    names_by_symbol = {
        asset.symbol: asset.name
        for asset in live_api.list_assets()
        if _passes(asset)
    }

    symbols_to_trade = list(names_by_symbol)
    print(f'Returning {len(symbols_to_trade)} assets')
    return symbols_to_trade

async def run_stream(symbols_to_trade, source='sip'):
    """Keep a market-data websocket session alive forever.

    Calls ``create_ws_connection`` in an endless loop. When it raises, the
    error is printed and the loop waits one second before reconnecting. A
    connection-closed error gets its own message; any other ``Exception``
    gets a generic one. If the session returns normally, the loop reconnects
    immediately without waiting.
    """
    retry_delay_seconds = 1

    while True:
        try:
            await create_ws_connection(symbols_to_trade, source=source)
        except websockets.exceptions.ConnectionClosedError as exc:
            print(f"Connection closed: {exc}, retrying in 1 seconds...")
        except Exception as exc:
            print(f"Error: {exc}, retrying in 1 seconds...")
        else:
            # Clean return: reconnect straight away, no back-off.
            continue

        await asyncio.sleep(retry_delay_seconds)



if __name__ == "__main__":
    # Resolve the symbol universe once. get_assets() also stores the same list
    # in the module-level symbols_to_trade.
    symbols = get_assets(active=True, tradable=False, shortable=False, exclude_curencies=True)

    # on_message reads these module globals, so they must exist before the
    # stream starts delivering frames.
    trades_df, quotes_df, bars_df = create_dataframes(symbols)
    redis_client = redis.Redis(host='localhost', port=6379, db=0)

    asyncio.run(run_stream(symbols))

redis_subscriber.py

import redis

def on_message(channel, message):
    """Print one Redis pub/sub payload together with the channel it arrived on."""
    report = f"Message received on channel '{channel}': {message}"
    print(report)

def main():
    """Subscribe to the ``alpaca-messages`` Redis channel and print what arrives.

    Connects to Redis on ``localhost:6379`` with ``decode_responses=True``, so
    payloads arrive as ``str``. Every published message is passed to
    ``on_message``.

    Uses ``pubsub.listen()``, which blocks until the next message arrives. The
    previous version polled ``get_message()`` in ``while True``. That call
    returns immediately when nothing is pending, so the loop pinned a CPU core
    at 100% while idle.
    """
    redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

    # Replace 'alpaca-messages' with the desired channel name
    channel_name = 'alpaca-messages'
    pubsub = redis_client.pubsub()
    pubsub.subscribe(channel_name)

    print(f"Subscribed to '{channel_name}' channel. Awaiting messages...")

    for message in pubsub.listen():
        # listen() also yields subscribe/unsubscribe confirmations; only
        # forward actual published payloads.
        if message and message['type'] == 'message':
            on_message(message['channel'], message['data'])


if __name__ == "__main__":
    main()

dataframes.py

import pandas as pd
import traceback
from datetime import datetime
from dateutil.parser import parse
import pytz
import os
from config import minute_data_dir
import asyncio


# Extra timezone-name mapping passed to dateutil's parse() by update_trades_df
# and update_quotes_df: a literal "ZZ" zone token is interpreted as UTC.
# NOTE(review): the timestamps seen here appear to end in "Z" (update_bars_df
# strips it explicitly), which dateutil already reads as UTC without this
# mapping. Confirm whether a "ZZ" token can actually occur.
tzinfos = {'ZZ': pytz.UTC}

def append_to_csv(file_path, data):
    """Append the string *data*, followed by a newline, to the file at *file_path*.

    The file is opened in text append mode, so it is created if it does not
    exist yet.
    """
    with open(file_path, mode='a') as handle:
        handle.writelines((data, '\n'))

def check_missing_values(interval=60, max_iterations=None):
    """Periodically print the per-row percentage of missing cells in each frame.

    After every *interval* seconds, prints a row-wise NaN percentage (rounded
    to 2 decimals) for the module-level ``trades_df``, ``quotes_df`` and
    ``bars_df``. These globals are only bound once ``create_dataframes`` has
    run; calling this earlier raises ``NameError``.

    This uses ``time.sleep`` and therefore blocks the calling thread. Run it in
    its own thread, not on an asyncio event loop.

    Args:
        interval: Seconds to wait before each report (default 60, as before).
        max_iterations: Stop after this many reports. ``None`` (the default)
            loops forever, which matches the original behaviour.
    """
    # The module never imported ``time`` at the top, so the original raised
    # NameError on its first iteration.
    import time

    completed = 0
    while max_iterations is None or completed < max_iterations:
        time.sleep(interval)  # Wait for one reporting interval
        frames = (("trades_df", trades_df), ("quotes_df", quotes_df), ("bars_df", bars_df))
        for df_name, df in frames:
            row_missing_percentage = (df.isna().sum(axis=1) / df.shape[1] * 100).round(2)
            print(f"{df_name} row missing values (%):\n{row_missing_percentage}\n")
        completed += 1

def create_dataframes(symbols):
    """Create empty trades, quotes and bars frames with ``(symbol, field)`` columns.

    Columns are the cartesian product of *symbols* with each frame's fields:

    * trades: ``price``, ``size``, ``timestamp``;
    * quotes: ``bid``, ``ask``;
    * bars: ``open``, ``high``, ``low``, ``close``, ``volume``.

    The three frames are also bound to the module-level ``trades_df``,
    ``quotes_df`` and ``bars_df``. They are returned in that order.
    """
    global trades_df, quotes_df, bars_df

    def _empty_frame(fields):
        # One column per (symbol, field) pair, no rows yet.
        return pd.DataFrame(columns=pd.MultiIndex.from_product([symbols, fields]))

    trades_df = _empty_frame(['price', 'size', 'timestamp'])
    quotes_df = _empty_frame(['bid', 'ask'])
    bars_df = _empty_frame(['open', 'high', 'low', 'close', 'volume'])
    return trades_df, quotes_df, bars_df


#
# async def save_data(df, data_directory, file_name, file_format='pickle'):
#     #CSV saving does not currently work as it slows down the process and the wss will die.
#     if not os.path.exists(data_directory):
#         os.makedirs(data_directory)
#
#     file_path = os.path.join(data_directory, f"{file_name}.{file_format}")
#
#     # Calculate missing data percentage for the row to be appended
#     row_missing_percentage = (df.isna().sum(axis=1) / df.shape[1] * 100).round(2)
#     print(f"{file_name} row missing values (%):\n{row_missing_percentage}\n")
#
#     if not os.path.exists(file_path):
#         if file_format == 'csv':
#             with open(file_path, 'w') as f:
#                 f.write(df.to_csv(header=True, index=True))
#         elif file_format == 'pickle':
#             df.to_pickle(file_path)
#     else:
#         if file_format == 'csv':
#             existing_df = pd.read_csv(file_path, header=[0, 1], index_col=0)
#             combined_df = pd.concat([existing_df, df])
#             # Ensure the correct column order in the combined DataFrame
#             combined_df = combined_df.reorder_levels([1, 0], axis=1).sort_index(axis=1)
#
#             with open(file_path, 'w') as f:
#                 f.write(combined_df.to_csv(header=True, index=True))
#         elif file_format == 'pickle':
#             existing_df = pd.read_pickle(file_path)
#             combined_df = pd.concat([existing_df, df])
#             combined_df.to_pickle(file_path)

def update_trades_df(symbol, price, size, timestamp, df, minute_data_dir):
    """Record one trade into the per-minute trades frame *df*, in place.

    The *timestamp* string is parsed with dateutil (using the module's
    ``tzinfos`` mapping) and floored to the minute. That minute is the row
    label; a NaN row is appended the first time a minute is seen. Within the
    row:

    * ``(symbol, 'price')`` is overwritten with this trade's price;
    * ``(symbol, 'size')`` accumulates the traded *size* (a NaN cell counts
      as 0). It adds *size*, not 1, despite what an older comment said.

    The ``(symbol, 'timestamp')`` column is not written here. *minute_data_dir*
    is only used by the disabled save hook below. Any exception is printed
    with a traceback and swallowed.
    """
    try:
        parsed = parse(timestamp, tzinfos=tzinfos)
        minute = parsed.replace(second=0, microsecond=0)

        if minute not in df.index:
            # if len(df.index) > 1:
            #     asyncio.create_task(save_data(df, minute_data_dir, 'trades', 'pickle'))
            df.loc[minute] = pd.Series(dtype='float64')

        df.at[minute, (symbol, 'price')] = price

        size_column = (symbol, 'size')
        previous_size = df.at[minute, size_column]
        running_size = 0 if pd.isna(previous_size) else previous_size
        df.at[minute, size_column] = running_size + size

    except Exception as e:
        print("Error:", e)
        traceback.print_exc()

def update_quotes_df(symbol, bid, ask, timestamp, df, minute_data_dir):
    """Record the latest bid/ask for *symbol* into the per-minute quotes frame *df*.

    The *timestamp* string is parsed with dateutil (using the module's
    ``tzinfos`` mapping) and floored to the minute, which is the row label. A
    NaN row is appended the first time a minute is seen. ``(symbol, 'bid')``
    and ``(symbol, 'ask')`` are then overwritten, so each minute keeps the
    last quote it received.

    *minute_data_dir* is only used by the disabled save hook below. Any
    exception is printed with a traceback and swallowed.
    """
    try:
        quote_minute = parse(timestamp, tzinfos=tzinfos).replace(second=0, microsecond=0)

        if quote_minute not in df.index:
            # if len(df.index) > 1:
            #     asyncio.create_task(save_data(df, minute_data_dir, "quotes", 'pickle'))
            df.loc[quote_minute] = pd.Series(dtype='float64')

        for field, value in (('bid', bid), ('ask', ask)):
            df.at[quote_minute, (symbol, field)] = value
    except Exception as e:
        print("Error:", e)
        traceback.print_exc()

def update_bars_df(symbol, open_price, high_price, low_price, close_price, volume, timestamp, df, minute_data_dir):
    """Write one OHLCV bar for *symbol* into the per-minute bars frame *df*.

    *timestamp* is parsed with ``datetime.fromisoformat`` after turning a
    trailing ``Z`` into ``+00:00``. It is then floored to the minute and used
    as the row label; a NaN row is appended the first time a minute is seen.
    The five ``(symbol, field)`` cells are overwritten in the order
    open, high, low, close, volume.

    *minute_data_dir* is only used by the disabled save hook below. Any
    exception (for example an unparseable timestamp) is printed with a
    traceback and swallowed.
    """
    try:
        iso_text = timestamp.replace('Z', '+00:00')
        bar_minute = datetime.fromisoformat(iso_text).replace(second=0, microsecond=0)

        if bar_minute not in df.index:
            # if len(df.index) > 1:
            #     asyncio.create_task(save_data(df, minute_data_dir, "bars", 'pickle'))
            df.loc[bar_minute] = pd.Series(dtype='float64')

        ohlcv = (
            ('open', open_price),
            ('high', high_price),
            ('low', low_price),
            ('close', close_price),
            ('volume', volume),
        )
        for field, value in ohlcv:
            df.at[bar_minute, (symbol, field)] = value
    except Exception as e:
        print("Error:", e)
        traceback.print_exc()

message_processing.py

from constants import exchange_codes, trade_conditions_cts, trade_conditions_utdf, cqs_quote_conditions, uqdf_quote_conditions
from dataframes import update_trades_df, update_quotes_df, update_bars_df
from colorama import Fore, Style
import traceback
from config import minute_data_dir


def process_message(msg, trades_df, quotes_df, bars_df):
    """Decode one Alpaca stream event and handle it according to its ``'T'`` type.

    Market-data events are unpacked into local variables:

    * trade ``'t'``
    * quote ``'q'``
    * bar ``'b'``/``'d'``/``'u'``
    * trade correction ``'c'``
    * trade cancel/error ``'x'``
    * LULD ``'l'``

    The dataframe updates and per-event debug prints for these events are
    present but commented out. Control events are printed: ``'error'`` in red,
    ``'success'`` in green, and ``'subscription'`` as a symbol count per
    channel. Any other type is reported as unknown.

    Args:
        msg: One decoded event dict from a websocket frame.
        trades_df, quotes_df, bars_df: Frames that the (currently disabled)
            ``update_*_df`` calls write into.

    Errors are printed with a traceback and swallowed, so one malformed event
    does not interrupt the caller's loop. This includes a missing ``'T'`` key
    and any missing field.
    """
    try:
        # Read the type inside the try: a malformed event is reported here
        # instead of raising into the caller and aborting the rest of its frame.
        msg_type = msg['T']

        if msg_type == 't':  # Trade
            symbol = msg['S']
            trade_id = msg['i']
            exchange_code = msg['x']
            exchange_desc = exchange_codes.get(exchange_code, "Unknown")
            trade_price = msg['p']
            trade_size = msg['s']
            trade_condition = msg['c']
            conditions_desc = [trade_conditions_cts.get(c, "Unknown") for c in trade_condition]
            timestamp = msg['t']
            tape = msg['z']

            # Choose the condition-code table by plan: codes mapped to "CTA" use
            # the CTS table and codes mapped to "UTP" use the UTDF table. Every
            # other exchange code falls back to CTS (the two "Unknown" branches
            # were identical and have been merged).
            if exchange_code in ("A", "N", "P"):
                plan = "CTA"
                condition_table = trade_conditions_cts
            elif exchange_code in ("B", "Q", "S", "T", "X"):
                plan = "UTP"
                condition_table = trade_conditions_utdf
            else:
                plan = "Unknown"
                condition_table = trade_conditions_cts
            trade_conditions_desc = [condition_table.get(cond, "Unknown") for cond in trade_condition]

            # update_trades_df(symbol, trade_price, trade_size, timestamp, df=trades_df, minute_data_dir=minute_data_dir)

            # print("Trade:")
            # print(f"Symbol: {symbol}")
            # print(f"Trade ID: {trade_id}")
            # print(f"Exchange Code: {exchange_code} ({exchange_desc})")
            # print(f"Trade Price: {trade_price}")
            # print(f"Trade Size: {trade_size}")
            # print(f"Trade Condition: {trade_condition} ({', '.join(conditions_desc)})")
            # print(f"Timestamp: {timestamp}")
            # print(f"Tape: {tape}")
            # print("-------")

        elif msg_type == 'q':  # Quote
            symbol = msg['S']
            ask_exchange_code = msg['ax']
            ask_exchange_desc = exchange_codes.get(ask_exchange_code, "Unknown")
            ask_price = msg['ap']
            ask_size = msg['as']
            bid_exchange_code = msg['bx']
            bid_exchange_desc = exchange_codes.get(bid_exchange_code, "Unknown")
            bid_price = msg['bp']
            bid_size = msg['bs']
            quote_condition = msg['c']
            timestamp = msg['t']
            tape = msg['z']

            # Look each condition code up in the CQS table first, then UQDF.
            decoded_conditions = []
            for condition_code in quote_condition:
                if condition_code in cqs_quote_conditions:
                    decoded_conditions.append(cqs_quote_conditions[condition_code])
                elif condition_code in uqdf_quote_conditions:
                    decoded_conditions.append(uqdf_quote_conditions[condition_code])
                else:
                    decoded_conditions.append(f"Unknown Condition: {condition_code}")

            # update_quotes_df(symbol, bid_price, ask_price, timestamp, df=quotes_df, minute_data_dir=minute_data_dir)

            # print("Quote:")
            # print(f"Symbol: {symbol}")
            # print(f"Ask Exchange Code: {ask_exchange_code} ({ask_exchange_desc})")
            # print(f"Ask Price: {ask_price}")
            # print(f"Ask Size: {ask_size}")
            # print(f"Bid Exchange Code: {bid_exchange_code} ({bid_exchange_desc})")
            # print(f"Bid Price: {bid_price}")
            # print(f"Bid Size: {bid_size}")
            # print(f"Quote Condition: {quote_condition}")
            # print(f"Quote Condition: {decoded_conditions}")
            # print(f"Timestamp: {timestamp}")
            # print(f"Tape: {tape}")
            # print("-------")

        elif msg_type in ('b', 'd', 'u'):  # Bar
            symbol = msg['S']
            open_price = msg['o']
            high_price = msg['h']
            low_price = msg['l']
            close_price = msg['c']
            volume = msg['v']
            timestamp = msg['t']

            # print("Bar:")
            # print(f"Symbol: {symbol}")
            # print(f"Open Price: {open_price}")
            # print(f"High Price: {high_price}")
            # print(f"Low Price: {low_price}")
            # print(f"Close Price: {close_price}")
            # print(f"Volume: {volume}")
            # print(f"Timestamp: {timestamp}")
            # print("-------")

            # update_bars_df(symbol, open_price, high_price, low_price, close_price, volume, timestamp, df=bars_df, minute_data_dir=minute_data_dir)

        elif msg_type == 'c':  # Trade Correction
            symbol = msg['S']
            exchange_code = msg['x']
            original_trade_id = msg['oi']
            original_trade_price = msg['op']
            original_trade_size = msg['os']
            original_trade_conditions = msg['oc']
            corrected_trade_id = msg['ci']
            corrected_trade_price = msg['cp']
            corrected_trade_size = msg['cs']
            corrected_trade_conditions = msg['cc']
            timestamp = msg['t']
            tape = msg['z']
            # Handle the trade correction message here

        elif msg_type == 'x':  # Trade Cancel/Error
            symbol = msg['S']
            trade_id = msg['i']
            exchange_code = msg['x']
            trade_price = msg['p']
            trade_size = msg['s']
            action = msg['a']
            timestamp = msg['t']
            tape = msg['z']
            # Handle the trade cancel/error message here

        elif msg_type == 'l':  # LULD
            symbol = msg['S']
            limit_up_price = msg['u']
            limit_down_price = msg['d']
            indicator = msg['i']
            timestamp = msg['t']
            tape = msg['z']
            # Handle the LULD message here

        elif msg_type == 'error':  # Error
            code = msg['code']
            error_msg = msg['msg']
            print(Fore.RED + f"Error Code: {code}")
            print(f"Error Message: {error_msg}")
            print(Style.RESET_ALL + "-------")

        elif msg_type == 'success':
            success_msg = msg['msg']
            print(Fore.GREEN + f"Success Message: {success_msg}")
            print(Style.RESET_ALL + "-------")

        elif msg_type == "subscription":
            print("Subscription:")
            # Every key except 'T' is a channel name mapped to its symbol list;
            # empty channels are skipped.
            for item, symbols in msg.items():
                if item != 'T' and symbols:
                    num_symbols = len(symbols)
                    print(f"{item}: {num_symbols} symbols")
            print("-------")

        else:
            print(Fore.RED + f"Unknown message type: {msg_type}" + Style.RESET_ALL)

    except Exception as e:
        print(Fore.RED + "Error:", e, Style.RESET_ALL)
        traceback.print_exc()

subscriptions.py

import json
import traceback
from colorama import Fore, Style, init

# colorama setup: with autoreset=True the terminal colour is reset after each
# print, so a coloured line does not bleed into the next one.
init(autoreset=True)

async def _send_stream_request(ws, action, channel, symbols, caller):
    """Send one ``{"action": action, channel: symbols}`` frame over *ws*.

    Best-effort by design: any failure is printed as ``Error in <caller>:``
    followed by a traceback, and is not re-raised. Previously every handler
    called an undefined ``print_error``, so the handler itself raised
    ``NameError`` instead of reporting the error.
    """
    try:
        await ws.send(json.dumps({"action": action, channel: symbols}))
    except Exception:
        print(f"Error in {caller}:")
        traceback.print_exc()


async def subscribe_to_trades(ws, symbols):
    """Subscribe *ws* to trade updates for *symbols*; send errors are reported, not raised."""
    await _send_stream_request(ws, "subscribe", "trades", symbols, "subscribe_to_trades")


async def subscribe_to_quotes(ws, symbols):
    """Subscribe *ws* to quote updates for *symbols*; send errors are reported, not raised."""
    await _send_stream_request(ws, "subscribe", "quotes", symbols, "subscribe_to_quotes")


async def subscribe_to_bars(ws, symbols):
    """Subscribe *ws* to bar updates for *symbols*; send errors are reported, not raised."""
    await _send_stream_request(ws, "subscribe", "bars", symbols, "subscribe_to_bars")


async def unsubscribe_trade_updates(ws, symbols):
    """Unsubscribe *ws* from trade updates for *symbols*; send errors are reported, not raised."""
    await _send_stream_request(ws, "unsubscribe", "trades", symbols, "unsubscribe_trade_updates")


async def unsubscribe_quote_updates(ws, symbols):
    """Unsubscribe *ws* from quote updates for *symbols*; send errors are reported, not raised."""
    await _send_stream_request(ws, "unsubscribe", "quotes", symbols, "unsubscribe_quote_updates")


async def unsubscribe_bar_updates(ws, symbols):
    """Unsubscribe *ws* from bar updates for *symbols*; send errors are reported, not raised."""
    await _send_stream_request(ws, "unsubscribe", "bars", symbols, "unsubscribe_bar_updates")

config.py

import os
import sys
import alpaca_trade_api as tradeapi

# Data directories resolve against the parent of the *current working
# directory* (not of this file), so launch the scripts from the project folder.
# NOTE(review): switch to os.path.dirname(os.path.abspath(__file__)) if the
# location should not depend on the cwd.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
data_dir = os.path.join(parent_dir, 'data/')
sys.path.append(data_dir)  # presumably so a user-provided keys.py in data/ is importable
minute_data_dir = os.path.join(data_dir, 'minute_data/')

import keys

# Credentials. The previous "= #define your own here" placeholders were syntax
# errors, so no module importing config could load. They now default to
# environment variables; replace them with your own values (e.g. from keys) as
# needed.
# Secondary key set (not referenced by the streaming code shown here).
BASE_URL = os.environ.get('APCA_PAPER_BASE_URL')
API_KEY = os.environ.get('APCA_PAPER_API_KEY_ID')
SECRET_KEY = os.environ.get('APCA_PAPER_API_SECRET_KEY')

# Live account used for the REST client below.
LIVE_BASE_URL = os.environ.get('APCA_API_BASE_URL')
LIVE_API_KEY = os.environ.get('APCA_API_KEY_ID')
LIVE_SECRET_KEY = os.environ.get('APCA_API_SECRET_KEY')

# Key pair sent in the market-data websocket auth frame (main.authenticate).
APCA_API_KEY_ID = LIVE_API_KEY
APCA_API_SECRET_KEY = LIVE_SECRET_KEY


live_api = tradeapi.REST(LIVE_API_KEY, LIVE_SECRET_KEY, LIVE_BASE_URL, api_version='v2')

Leave a Reply