A Python-Based Alpaca Template to Process Real-Time Market Data

The world of finance and trading has seen a massive surge in the use of technology and data. With the increasing availability of APIs and data streams, it has become essential for traders and developers to harness the power of real-time information to make better-informed decisions. Alpaca, a popular commission-free trading API, is one such service that enables developers to access real-time market data and automate their trading strategies.

However, getting started with Alpaca’s API and handling WebSocket streams for quotes, trades, and bars can be a challenging task, especially for those who are new to the domain. To help you overcome this hurdle, I have put together a Python template that simplifies the process and serves as a starting point for your custom trading projects. Here is a sample of the outputs.

[Sample output screenshots: Quotes, Trades, Bars]

In this blog post, I will walk you through the key components of this template, and explain how you can use it to access real-time data from Alpaca’s API and modify it according to your preferences.

  1. Organizing the Codebase

The first step towards building a clean and maintainable trading application is to organize your codebase into modular components. In our template, we have separated the code into different modules:

  • config.py: Contains the API key and secret key for Alpaca authentication.
  • constants.py: Stores dictionaries for exchange codes, trade conditions, and quote conditions.
  • subscriptions.py: Defines functions for subscribing and unsubscribing from trades, quotes, and bars.
  • message_processing.py: Contains a function for processing incoming messages from the WebSocket stream.

By organizing the code in this manner, we can easily manage and extend our trading application as needed.
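
If you would rather not keep credentials in a source file, one option is to read them from environment variables. The sketch below is an assumption, not part of the template: load_credentials is a hypothetical helper, and it simply reuses the names APCA_API_KEY_ID and APCA_API_SECRET_KEY from config.py as variable names.

```python
import os


def load_credentials(env=None):
    """Return (key_id, secret_key) read from environment-style variables.

    Hypothetical helper for illustration; not part of the template.
    """
    env = os.environ if env is None else env
    names = ("APCA_API_KEY_ID", "APCA_API_SECRET_KEY")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env["APCA_API_KEY_ID"], env["APCA_API_SECRET_KEY"]


if __name__ == "__main__":
    # Demo with an in-memory mapping instead of the real environment
    demo_env = {"APCA_API_KEY_ID": "demo-key", "APCA_API_SECRET_KEY": "demo-secret"}
    print(load_credentials(demo_env))
```

Passing a plain dictionary, as in the demo, makes the helper easy to try without touching your real environment.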

  2. Establishing a WebSocket Connection

Our template uses the websocket-client library (imported as websocket) to establish a connection with Alpaca’s WebSocket stream at wss://stream.data.alpaca.markets/v2/ followed by the data source. This is done in the create_ws_connection function, which takes the list of symbols and the data source (either ‘sip’ or ‘iex’) as input, and returns a WebSocketApp object. This object is then used to authenticate, subscribe, and handle incoming messages.
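
As a small illustration of the URL handling, the sketch below builds the stream URL and rejects any source other than the two mentioned above. build_stream_url and VALID_SOURCES are hypothetical names introduced here; the template itself formats the URL inline without validation.

```python
VALID_SOURCES = ("sip", "iex")  # the two feeds named in this post


def build_stream_url(source="sip"):
    """Return the stream URL for a feed, rejecting anything other than 'sip' or 'iex'."""
    if source not in VALID_SOURCES:
        raise ValueError(f"source must be one of {VALID_SOURCES}, got {source!r}")
    return f"wss://stream.data.alpaca.markets/v2/{source}"


if __name__ == "__main__":
    print(build_stream_url("iex"))
```

Failing early on a mistyped source is a design choice: it is usually easier to debug than a connection attempt against a URL that does not exist.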

  3. Authenticating with Alpaca’s API

Before you can access real-time market data from Alpaca, you need to authenticate your WebSocket connection. The authenticate function sends an authentication message to the WebSocket, which includes your API key and secret key. Once authenticated, you can subscribe to various data streams.
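
To see what the authentication message looks like without opening a real connection, you can send it to a stand-in object. In this sketch, FakeSocket is a hypothetical test double that only records what would be sent, and the key values are placeholders; the message shape mirrors the template's authenticate function.

```python
import json


class FakeSocket:
    """Stand-in for a WebSocket object that records what would be sent."""

    def __init__(self):
        self.sent = []

    def send(self, payload):
        self.sent.append(payload)


def authenticate(ws, key_id, secret_key):
    """Send the auth message in the same shape as the template's authenticate()."""
    ws.send(json.dumps({"action": "auth", "key": key_id, "secret": secret_key}))


ws = FakeSocket()
authenticate(ws, "demo-key", "demo-secret")
print(ws.sent[0])
# prints {"action": "auth", "key": "demo-key", "secret": "demo-secret"}
```

Unlike the template's version, this variant takes the credentials as arguments, which is only a convenience for the demonstration.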

  4. Subscribing to Real-time Data

Our template provides functions for subscribing to trades, quotes, and bars: subscribe_to_trades, subscribe_to_quotes, and subscribe_to_bars. These functions send subscription messages to the WebSocket, specifying the symbols you want to receive updates for.

You can also unsubscribe from any data stream using the corresponding unsubscribe_* functions, such as unsubscribe_trade_updates.
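
The six subscribe and unsubscribe functions differ only in the action and the channel name, so they could also be expressed as one message builder. The sketch below is one possible refactoring, not the template's code: build_subscription is a hypothetical helper, and it assumes that several channels may be combined in a single message.

```python
import json


def build_subscription(action, trades=(), quotes=(), bars=()):
    """Build one subscribe/unsubscribe message covering several channels."""
    if action not in ("subscribe", "unsubscribe"):
        raise ValueError(f"unsupported action: {action!r}")
    message = {"action": action}
    for channel, symbols in (("trades", trades), ("quotes", quotes), ("bars", bars)):
        if symbols:  # leave out channels with no symbols
            message[channel] = list(symbols)
    return json.dumps(message)


if __name__ == "__main__":
    print(build_subscription("subscribe", trades=["AAPL", "MSFT"], quotes=["AAPL"]))
```

The returned string can be passed to ws.send in the same way the template's functions do.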

  5. Processing Incoming Messages

The on_message function handles incoming messages from the WebSocket stream. It parses each payload, which arrives as a JSON array of messages, and passes every message in it to the process_message function, which is defined in the message_processing.py module. This function takes care of processing trade, quote, and bar messages, as well as handling any errors or subscription updates.
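
The routing step can also be written as a lookup table keyed on the message type, instead of a chain of if/elif branches. The sketch below illustrates that idea under a few assumptions: the handler names and the dispatch function are hypothetical, and the sample payload is made up for the demonstration rather than taken from the real stream. The field names ('T', 'S', 'p', 's', 'bp', 'ap', 'c') are the ones used in message_processing.py.

```python
import json


def handle_trade(msg):
    return f"trade {msg['S']} {msg['s']} @ {msg['p']}"


def handle_quote(msg):
    return f"quote {msg['S']} bid {msg['bp']} / ask {msg['ap']}"


def handle_bar(msg):
    return f"bar {msg['S']} close {msg['c']}"


# Map each message type to its handler; 'b', 'd' and 'u' are all treated as bars
HANDLERS = {
    "t": handle_trade,
    "q": handle_quote,
    "b": handle_bar,
    "d": handle_bar,
    "u": handle_bar,
}


def dispatch(raw):
    """Parse one payload (a JSON array) and route each message by its 'T' field."""
    results = []
    for msg in json.loads(raw):
        handler = HANDLERS.get(msg.get("T"))
        if handler is None:
            results.append(f"unhandled type: {msg.get('T')}")
        else:
            results.append(handler(msg))
    return results


# Made-up sample payload for illustration; the values are not real market data
sample = json.dumps([
    {"T": "t", "S": "AAPL", "p": 150.0, "s": 100},
    {"T": "success", "msg": "connected"},
])
print(dispatch(sample))
```

Adding support for a new message type then means adding one handler and one dictionary entry, without touching the routing loop.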

This Python template serves as a solid foundation for building custom trading applications using Alpaca’s API. It simplifies the process of connecting to the WebSocket, authenticating, subscribing to data streams, and processing incoming messages. By using this template as a starting point, you can focus on implementing your trading strategies and refining your application according to your needs.

Feel free to modify and extend the code as required, and embark on your journey of harnessing the power of real-time market data to create sophisticated trading algorithms and applications.

In the future, you may consider incorporating features such as order management, portfolio tracking, and risk management to further enhance your trading application. Additionally, you can explore integrating other data sources and APIs to complement Alpaca’s data and provide a more comprehensive view of the market.

By leveraging this Alpaca Subscribe Template, you can quickly dive into the world of algorithmic trading and capitalize on the opportunities provided by real-time market data. So go ahead, experiment with the code, and unleash the potential of data-driven trading strategies for better decision-making and profitability. Happy coding!

main.py

import websocket
import json
import threading
import time
import os
from colorama import Fore, Style, init

from config import APCA_API_KEY_ID, APCA_API_SECRET_KEY
from constants import exchange_codes, trade_conditions_cts, cqs_quote_conditions, uqdf_quote_conditions
from subscriptions import subscribe_to_trades, subscribe_to_quotes, subscribe_to_bars,\
                           unsubscribe_trade_updates, unsubscribe_quote_updates, unsubscribe_bar_updates
from message_processing import process_message




# Function to handle incoming messages
def on_message(ws, message):
    # print("Message received: " + message)
    messages = json.loads(message)
    for msg in messages:
        process_message(msg)

# Function to authenticate
def authenticate(ws):
    auth_data = {
        "action": "auth",
        "key": APCA_API_KEY_ID,
        "secret": APCA_API_SECRET_KEY
    }
    ws.send(json.dumps(auth_data))

# Function to create the WebSocket connection for the chosen data source

def create_ws_connection(symbols, source='sip'):
    base_url = f'wss://stream.data.alpaca.markets/v2/{source}'

    ws = websocket.WebSocketApp(
        base_url,
        on_message=on_message
    )

    # Authenticate as soon as the connection opens; subscriptions are sent later from the main block
    ws.on_open = lambda ws: authenticate(ws)

    return ws

if __name__ == "__main__":
    # List of symbols to subscribe
    symbols = ["AAPL", "MSFT", "GOOG", "FB", "AMZN", "TSLA", "NFLX", "NVDA", "AMD", "TWTR", "SNAP"]

    # Source: 'sip' or 'iex'
    source = 'sip'

    # Create a connection to the WebSocket
    ws = create_ws_connection(symbols, source=source)

    # Start the WebSocket connection in a new thread
    ws_thread = threading.Thread(target=ws.run_forever)
    ws_thread.start()

    # Give the connection a few seconds to open and authenticate before subscribing
    time.sleep(5)

    # Subscribe to trades
    print('Subscribing to trades')
    subscribe_to_trades(ws, symbols)
    # unsubscribe_trade_updates(ws, symbols)

    # Subscribe to quotes
    print('Subscribing to quotes')
    subscribe_to_quotes(ws, symbols)
    # unsubscribe_quote_updates(ws, symbols)

    # Subscribe to bars
    # Uncomment the next two lines to subscribe to bars as well
    # print('Subscribing to bars')
    # subscribe_to_bars(ws, symbols)
    # time.sleep(5)
    # unsubscribe_bar_updates(ws, symbols)

message_processing.py

from colorama import Fore, Style  # needed for the colored error and success output below

from constants import exchange_codes, trade_conditions_cts, trade_conditions_utdf, cqs_quote_conditions, uqdf_quote_conditions


def process_message(msg):
    msg_type = msg['T']

    if msg_type == 't':  # Trade
        print("Message type:", msg_type)

        symbol = msg['S']
        trade_id = msg['i']
        exchange_code = msg['x']
        exchange_desc = exchange_codes.get(exchange_code, "Unknown")
        trade_price = msg['p']
        trade_size = msg['s']
        trade_condition = msg['c']
        conditions_desc = [trade_conditions_cts.get(c, "Unknown") for c in trade_condition]
        timestamp = msg['t']
        tape = msg['z']
        # Choose the condition table by plan, so the printed descriptions match it
        if exchange_code in ["A", "N", "P"]:
            plan = "CTA"
            conditions_desc = [trade_conditions_cts.get(cond, "Unknown") for cond in trade_condition]
        elif exchange_code in ["B", "Q", "S", "T", "X"]:
            plan = "UTP"
            conditions_desc = [trade_conditions_utdf.get(cond, "Unknown") for cond in trade_condition]
        else:
            # Any other exchange code: fall back to the CTS table
            plan = "Unknown"
            conditions_desc = [trade_conditions_cts.get(cond, "Unknown") for cond in trade_condition]


        print("Trade:")
        print(f"Symbol: {symbol}")
        print(f"Trade ID: {trade_id}")
        print(f"Exchange Code: {exchange_code} ({exchange_desc})")
        print(f"Trade Price: {trade_price}")
        print(f"Trade Size: {trade_size}")
        print(f"Trade Condition: {trade_condition} ({', '.join(conditions_desc)})")
        print(f"Timestamp: {timestamp}")
        print(f"Tape: {tape}")
        print("-------")

    elif msg_type == 'q':  # Quote
        symbol = msg['S']
        ask_exchange_code = msg['ax']
        ask_exchange_desc = exchange_codes.get(ask_exchange_code, "Unknown")
        ask_price = msg['ap']
        ask_size = msg['as']
        bid_exchange_code = msg['bx']
        bid_exchange_desc = exchange_codes.get(bid_exchange_code, "Unknown")
        bid_price = msg['bp']
        bid_size = msg['bs']
        quote_condition = msg['c']
        timestamp = msg['t']
        tape = msg['z']

        conditions = msg.get("c", [])
        decoded_conditions = []
        for condition_code in conditions:
            if condition_code in cqs_quote_conditions:
                decoded_conditions.append(cqs_quote_conditions[condition_code])
            elif condition_code in uqdf_quote_conditions:
                decoded_conditions.append(uqdf_quote_conditions[condition_code])
            else:
                decoded_conditions.append(f"Unknown Condition: {condition_code}")



        print("Quote:")
        print(f"Symbol: {symbol}")
        print(f"Ask Exchange Code: {ask_exchange_code} ({ask_exchange_desc})")
        print(f"Ask Price: {ask_price}")
        print(f"Ask Size: {ask_size}")
        print(f"Bid Exchange Code: {bid_exchange_code} ({bid_exchange_desc})")
        print(f"Bid Price: {bid_price}")
        print(f"Bid Size: {bid_size}")
        print(f"Quote Condition: {quote_condition}")
        print(f"Decoded Quote Condition: {decoded_conditions}")
        print(f"Timestamp: {timestamp}")
        print(f"Tape: {tape}")
        print("-------")

    elif msg_type in ['b', 'd', 'u']:  # Bar
        symbol = msg['S']
        open_price = msg['o']
        high_price = msg['h']
        low_price = msg['l']
        close_price = msg['c']
        volume = msg['v']
        timestamp = msg['t']

        print("Bar:")
        print(f"Symbol: {symbol}")
        print(f"Open Price: {open_price}")
        print(f"High Price: {high_price}")
        print(f"Low Price: {low_price}")
        print(f"Close Price: {close_price}")
        print(f"Volume: {volume}")
        print(f"Timestamp: {timestamp}")
        print("-------")


    elif msg_type == 'error':  # Error
        code = msg['code']
        error_msg = msg['msg']
        print(Fore.RED + f"Error Code: {code}")
        print(f"Error Message: {error_msg}")
        print(Style.RESET_ALL + "-------")

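    elif msg_type == 'success':  # Connection or authentication confirmation
        # Success messages may not carry a 'code' field, so read both fields defensively
        code = msg.get('code')
        success_msg = msg.get('msg')
        print(Fore.GREEN + f"Success Code: {code}")
        print(f"Success Message: {success_msg}")
        print(Style.RESET_ALL + "-------")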


    elif msg_type == "subscription":
        print("Subscription:")
        for item, symbols in msg.items():
            if item != 'T' and symbols:
                print(f"{item}: {', '.join(symbols)}")
        print("-------")

    else:
        print(f"Unknown message type: {msg_type}")

subscriptions.py

import json

def subscribe_to_trades(ws, symbols):
    sub_data = {
        "action": "subscribe",
        "trades": symbols
    }
    ws.send(json.dumps(sub_data))

# Function to subscribe to quotes
def subscribe_to_quotes(ws, symbols):
    sub_data = {
        "action": "subscribe",
        "quotes": symbols
    }
    ws.send(json.dumps(sub_data))

# Function to subscribe to bars
def subscribe_to_bars(ws, symbols):
    sub_data = {
        "action": "subscribe",
        "bars": symbols
    }
    ws.send(json.dumps(sub_data))


def unsubscribe_trade_updates(ws, symbols):
    sub_data = {
        "action": "unsubscribe",
        "trades": symbols
    }
    ws.send(json.dumps(sub_data))


def unsubscribe_quote_updates(ws, symbols):
    sub_data = {
        "action": "unsubscribe",
        "quotes": symbols
    }
    ws.send(json.dumps(sub_data))


def unsubscribe_bar_updates(ws, symbols):
    sub_data = {
        "action": "unsubscribe",
        "bars": symbols
    }
    ws.send(json.dumps(sub_data))

constants.py

exchange_codes = {
    "A": "NYSE American (AMEX)",
    "B": "NASDAQ OMX BX",
    "C": "National Stock Exchange",
    "D": "FINRA ADF",
    "E": "Market Independent",
    "H": "MIAX",
    "I": "International Securities Exchange",
    "J": "Cboe EDGA",
    "K": "Cboe EDGX",
    "L": "Long Term Stock Exchange",
    "M": "Chicago Stock Exchange",
    "N": "New York Stock Exchange",
    "P": "NYSE Arca",
    "Q": "NASDAQ OMX",
    "S": "NASDAQ Small Cap",
    "T": "NASDAQ Int",
    "U": "Members Exchange",
    "V": "IEX",
    "W": "CBOE",
    "X": "NASDAQ OMX PSX",
    "Y": "Cboe BYX",
    "Z": "Cboe BZX",
}

trade_conditions_cts = {
    "@": "Regular Sale",
    " ": "Regular Sale",
    "B": "Average Price Trade",
    "C": "Cash Trade (Same Day Clearing)",
    "E": "Automatic Execution",
    "F": "Inter-market Sweep Order",
    "H": "Price Variation Trade",
    "I": "Odd Lot Trade",
    "K": "Rule 127 (NYSE only) or Rule 155 (NYSE MKT only)",
    "L": "Sold Last (Late Reporting)",
    "M": "Market Center Official Close",
    "N": "Next Day Trade (Next Day Clearing)",
    "O": "Market Center Opening Trade",
    "P": "Prior Reference Price",
    "Q": "Market Center Official Open",
    "R": "Seller",
    "T": "Extended Hours Trade",
    "U": "Extended Hours Sold (Out Of Sequence)",
    "V": "Contingent Trade",
    "X": "Cross Trade",
    "Z": "Sold (Out Of Sequence)",
    "4": "Derivatively Priced",
    "5": "Market Center Reopening Trade",
    "6": "Market Center Closing Trade",
    "7": "Qualified Contingent Trade",
    "8": "Reserved",
    "9": "Corrected Consolidated Close Price as per Listing Market",
}

trade_conditions_utdf = {
    "@": "Regular Sale",
    "A": "Acquisition",
    "B": "Bunched Trade",
    "C": "Cash Sale",
    "D": "Distribution",
    "E": "Placeholder",
    "F": "Intermarket Sweep",
    "G": "Bunched Sold Trade",
    "H": "Price Variation Trade",
    "I": "Odd Lot Trade",
    "K": "Rule 155 Trade (AMEX)",
    "L": "Sold Last",
    "M": "Market Center Official Close",
    "N": "Next Day",
    "O": "Opening Prints",
    "P": "Prior Reference Price",
    "Q": "Market Center Official Open",
    "R": "Seller",
    "S": "Split Trade",
    "T": "Form T",
    "U": "Extended trading hours (Sold Out of Sequence)",
    "V": "Contingent Trade",
    "W": "Average Price Trade",
    "X": "Cross Trade",
    "Y": "Yellow Flag Regular Trade",
    "Z": "Sold (out of sequence)",
    "1": "Stopped Stock (Regular Trade)",
    "4": "Derivatively priced",
    "5": "Re-Opening Prints",
    "6": "Closing Prints",
    "7": "Qualified Contingent Trade (QCT)",
    "8": "Placeholder For 611 Exempt",
    "9": "Corrected Consolidated Close (per listing market)",
}

quote_conditions_cqs = {
    "A": "Slow Quote Offer Side",
    "B": "Slow Quote Bid Side",
    "E": "Slow Quote LRP Bid Side",
    "F": "Slow Quote LRP Offer Side",
    "H": "Slow Quote Bid And Offer Side",
    "O": "Opening Quote",
    "R": "Regular Market Maker Open",
    "W": "Slow Quote Set Slow List",
    "C": "Closing Quote",
    "L": "Market Maker Quotes Closed",
    "U": "Slow Quote LRP Bid And Offer",
    "N": "Non Firm Quote",
    "4": "On Demand Intra Day Auction",
}

# Quote condition dictionaries for CQS and UQDF plans
cqs_quote_conditions = {
    "A": "Slow Quote Offer Side",
    "B": "Slow Quote Bid Side",
    "E": "Slow Quote LRP Bid Side",
    "F": "Slow Quote LRP Offer Side",
    "H": "Slow Quote Bid And Offer Side",
    "O": "Opening Quote",
    "R": "Regular Market Maker Open",
    "W": "Slow Quote Set Slow List",
    "C": "Closing Quote",
    "L": "Market Maker Quotes Closed",
    "U": "Slow Quote LRP Bid And Offer",
    "N": "Non Firm Quote",
    "4": "On Demand Intra Day Auction",
}

uqdf_quote_conditions = {
    "A": "Manual Ask Automated Bid",
    "B": "Manual Bid Automated Ask",
    "F": "Fast Trading",
    "H": "Manual Bid And Ask",
    "I": "Order Imbalance",
    "L": "Closed Quote",
    "N": "Non Firm Quote",
    "O": "Opening Quote Automated",
    "R": "Regular Two Sided Open",
    "U": "Manual Bid And Ask Non Firm",
    "Y": "No Offer No Bid One Sided Open",
    "X": "Order Influx",
    "Z": "No Open No Resume",
    "4": "On Demand Intra Day Auction",
}

config.py

import os
import sys

parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
data_dir = os.path.join(parent_dir, 'data/')
sys.path.append(data_dir)

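# Optional: load credentials from a local keys module kept outside the project
# import keys
# LIVE_BASE_URL, APCA_API_KEY_ID, APCA_API_SECRET_KEY = keys.get_live_keys()

# Replace the placeholder strings below with your own values
LIVE_BASE_URL, APCA_API_KEY_ID, APCA_API_SECRET_KEY = "YOUR_URL", "YOUR_API_KEY", "YOUR_API_SECRET"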
