Polymarket, a prediction market platform, runs on Polygon, an Ethereum scaling network, which makes it possible to analyze user transactions directly from the blockchain. Given a user's wallet address, we can examine their trades in detail, track profit/loss, and monitor position changes over time. In this post, I'll show how you can leverage Polygon blockchain data to analyze trades on Polymarket using wallet addresses and some helpful Python code.
Below are examples of the kinds of charts the script generates and their significance.
Shares by Market
This chart shows the number of shares the user holds across different markets. It is another way to visualize their exposure to various outcomes, expressed in share counts rather than total purchase value.
- Purpose: Useful for understanding the user's position size in each market; see the sketch after this list for the underlying aggregation.
- Insight: Larger bars suggest higher exposure to specific markets. The average price paid per share is also annotated, providing further context.
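To make that concrete, here is a minimal sketch of the aggregation, assuming a toy `trades` DataFrame; the column names (`market_slug`, `transaction_type`, `shares`, `total_purchase_value`) mirror the ones the full script derives from on-chain data:

```python
import pandas as pd

# Illustrative data only; the real script builds this from blockchain transfers
trades = pd.DataFrame({
    'market_slug': ['us-election', 'us-election', 'fed-rate-cut'],
    'transaction_type': ['buy', 'sell', 'buy'],
    'shares': [100.0, 40.0, 250.0],
    'total_purchase_value': [55.0, 22.0, 100.0],  # in USDC
})

# Buys add to the position, sells subtract from it
trades['shares_adjusted'] = trades.apply(
    lambda row: row['shares'] if row['transaction_type'] == 'buy' else -row['shares'], axis=1)

by_market = trades.groupby('market_slug').agg(
    shares=('shares_adjusted', 'sum'),
    invested=('total_purchase_value', 'sum'),
)
by_market['avg_price_per_share'] = by_market['invested'] / by_market['shares']
print(by_market)
```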
Total Purchase Value by Market
In this bar chart, the user’s total purchase value is broken down by market. The height of each bar indicates how much the user has invested in each specific market.
- Purpose: It allows for a clear visualization of where the user is concentrating their funds, showing which markets hold the largest portion of their portfolio.
- Insight: The accompanying labels show the average price paid per share in each market, helping you see whether the user is buying low or high within a market.
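Here is a minimal sketch of how such a labeled bar chart can be built with Plotly Express, assuming per-market totals have already been aggregated (the `by_market` frame is illustrative):

```python
import pandas as pd
import plotly.express as px

# Illustrative pre-aggregated totals; the real script computes these from merged trades
by_market = pd.DataFrame({
    'market_slug': ['us-election', 'fed-rate-cut'],
    'total_purchase_value': [1200.0, 350.0],
    'avg_price_per_share': [0.55, 0.40],
})

fig = px.bar(
    by_market,
    x='market_slug',
    y='total_purchase_value',
    # Annotate each bar with the average price paid per share
    text=by_market['avg_price_per_share'].apply(lambda p: f"Avg Price: ${p:.2f}"),
    labels={'total_purchase_value': 'Total Purchase Value ($)', 'market_slug': 'Market'},
    title='Total Purchase Value by Market',
)
fig.write_html('purchase_value_by_market.html')
```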
Total Purchase Value Timeline
This scatter plot shows the timeline of the user’s trades by plotting the total purchase value of trades over time.
- Purpose: This chart reveals when the user made their largest investments, showing the fluctuations in purchase value across trades.
- Insight: Each dot represents a trade, with its position on the Y-axis showing the value and on the X-axis showing the timestamp of the transaction. You can use this chart to understand when the user made big moves in the market.
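A minimal sketch of the timeline chart, assuming one row per trade with a timestamp and a signed purchase value (buys positive, sells negative); the data is illustrative:

```python
import pandas as pd
import plotly.express as px

# Illustrative trades; the real script derives signed values from transaction_type
timeline = pd.DataFrame({
    'timestamp': pd.to_datetime(['2024-05-01', '2024-06-12', '2024-07-03']),
    'total_purchase_value': [500.0, -200.0, 1500.0],
    'market_outcome': ['us-election (Yes)', 'us-election (Yes)', 'fed-rate-cut (No)'],
})

fig = px.scatter(
    timeline,
    x='timestamp',
    y='total_purchase_value',
    color='market_outcome',  # one color per market/outcome pair
    labels={'timestamp': 'Transaction Time', 'total_purchase_value': 'Total Purchase Value ($)'},
    title='Total Purchase Value Timeline',
)
fig.write_html('purchase_value_timeline.html')
```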
Holdings by Market and Outcome (Treemap)
The treemap provides a more detailed look at the user's holdings, breaking down their positions by both market and outcome. Each rectangle represents the shares held in a particular market-outcome pair, with its size proportional to the number of shares held.
- Purpose: Ideal for visually assessing how much the user has allocated to each market and outcome combination.
- Insight: It highlights not just which markets the user has invested in but also how they’ve distributed their bets across different outcomes within those markets.
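A minimal treemap sketch, assuming net share counts have already been aggregated by market and outcome (the `holdings` frame is illustrative):

```python
import pandas as pd
import plotly.express as px

# Illustrative holdings; the real script aggregates its 'shares_adjusted' column
holdings = pd.DataFrame({
    'market_slug': ['us-election', 'us-election', 'fed-rate-cut'],
    'outcome': ['Yes', 'No', 'Yes'],
    'shares': [800.0, 150.0, 400.0],
})

# path=['market_slug', 'outcome'] nests outcome rectangles inside each market
fig = px.treemap(holdings, path=['market_slug', 'outcome'], values='shares',
                 title='Holdings by Market and Outcome')
fig.write_html('holdings_treemap.html')
```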
Holdings Distribution by Market (Pie Chart)
This pie chart visualizes the user’s current holdings, showing the percentage distribution of shares across different markets.
- Purpose: Offers a high-level overview of the user’s portfolio diversification across different markets.
- Insight: Larger slices indicate heavier investments in specific markets, allowing you to quickly see where the user has concentrated their bets.
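To keep the pie readable, the script collapses markets under 2% of total shares into a single "Others" slice. Here is a minimal sketch of that technique, with illustrative data:

```python
import pandas as pd
import plotly.express as px

# Illustrative holdings; two markets fall below the 2% threshold
holdings = pd.DataFrame({
    'market_slug': ['us-election', 'fed-rate-cut', 'obscure-market-1', 'obscure-market-2'],
    'shares': [5000.0, 1200.0, 40.0, 25.0],
})

threshold = 0.02  # slices under 2% of the total get grouped
cutoff = holdings['shares'].sum() * threshold
large = holdings[holdings['shares'] > cutoff]
small = holdings[holdings['shares'] <= cutoff]
if not small.empty:
    # Merge all small slices into one 'Others' row
    large = pd.concat(
        [large, pd.DataFrame([{'market_slug': 'Others', 'shares': small['shares'].sum()}])],
        ignore_index=True)

fig = px.pie(large, names='market_slug', values='shares',
             title='Holdings Distribution by Market')
fig.write_html('holdings_pie.html')
```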
Cumulative Shares Over Time
This line chart tracks the cumulative number of shares held by the user in various markets over time. It helps visualize when the user bought or sold shares and how their position has evolved in each market.
- Purpose: This chart is essential for understanding the user’s trading strategy over time, revealing periods of heavy buying or selling.
- Insight: Each line represents a different market and outcome combination. Peaks in the lines indicate increased positions, while dips show reductions or sales.
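A minimal sketch of the cumulative-position logic, assuming signed per-trade share changes (buys positive, sells negative) and illustrative data:

```python
import pandas as pd
import plotly.express as px

# Illustrative trades; the real script builds 'position_change' from buys and sells
trades = pd.DataFrame({
    'timestamp': pd.to_datetime(['2024-05-01', '2024-05-20', '2024-06-10', '2024-06-15']),
    'token_label': ['us-election - Yes'] * 3 + ['fed-rate-cut - No'],
    'position_change': [100.0, 250.0, -150.0, 400.0],
})

# Sort chronologically, then running-sum the changes within each market/outcome
trades = trades.sort_values('timestamp')
trades['cumulative_position'] = trades.groupby('token_label')['position_change'].cumsum()

fig = px.line(trades, x='timestamp', y='cumulative_position', color='token_label',
              title='Cumulative Shares Over Time')
fig.write_html('cumulative_shares.html')
```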
The Code
The code behind these charts fetches data from the Polygon blockchain through the Polygonscan API and processes the transactions associated with a given wallet address. It retrieves ERC-1155 and ERC-20 token transaction data, enriches it with market information, and generates visual insights based on trading activity. You can use this code to analyze any Polymarket user's trades simply by knowing their wallet address.
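At its core, fetching the raw transfer history is one Polygonscan endpoint call per transfer type. Here is a minimal sketch (the example wallet comes from the script below, which also adds pagination and retries):

```python
import requests

wallet = '0x76527252D7FEd00dC4D08d794aFa1cCC36069C2a'  # example wallet from the script
api_key = 'YOUR_POLYGONSCAN_API_KEY'  # placeholder; load yours from keys.env

url = (
    "https://api.polygonscan.com/api"
    "?module=account&action=token1155tx"  # ERC-1155 transfers (outcome tokens)
    f"&address={wallet}"
    "&startblock=0&endblock=99999999&sort=asc"
    f"&apikey={api_key}"
)
data = requests.get(url, timeout=10).json()
if data['status'] == '1':
    print(f"Fetched {len(data['result'])} ERC-1155 transfers")
```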
Here’s a breakdown of the main functions:
- `fetch_user_transactions(wallet_address, api_key)`: Fetches all ERC-1155 and ERC-20 transactions for a given wallet from Polygon.
- `add_financial_columns()`: Processes transactions to calculate key financial metrics like profit/loss, total purchase value, and shares held.
- `plot_profit_loss_by_trade()`: Generates a bar plot showing profit or loss for each trade.
- `plot_shares_over_time()`: Creates a line plot showing cumulative shares over time.
- `create_and_save_pie_chart()`: Generates a pie chart that breaks down holdings by market.
- `create_and_save_treemap()`: Produces a treemap for holdings based on market and outcome.
- `plot_total_purchase_value()`: Generates a scatter plot showing total purchase value over time.
This tool offers deep insights into any user’s trading behavior and performance on Polymarket.
```python
import argparse
import csv
import json
import logging
import os
import re
import subprocess
import time

import numpy as np
import pandas as pd
import plotly.express as px
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv("keys.env")

# EXCHANGES
CTF_EXCHANGE = '0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'
NEG_RISK_CTF_EXCHANGE = '0x4d97dcd97ec945f40cf65f87097ace5ea0476045'

# SPENDERS FOR EXCHANGES
NEG_RISK_CTF_EXCHANGE_SPENDER = '0xC5d563A36AE78145C45a50134d48A1215220f80a'
NEG_RISK_ADAPTER = '0xd91E80cF2E7be2e162c6513ceD06f1dD0dA35296'
CTF_EXCHANGE_SPENDER = '0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E'

CACHE_EXPIRATION_TIME = 60 * 30  # Cache expiration time in seconds (30 minutes)
PRICE_CACHE_FILE = './data/live_price_cache.json'  # Live prices are cached in this file


def load_price_cache():
    """Load the live price cache from a JSON file."""
    if os.path.exists(PRICE_CACHE_FILE):
        try:
            with open(PRICE_CACHE_FILE, 'r') as file:
                return json.load(file)
        except json.JSONDecodeError as e:
            logger.error(f"Error loading price cache: {e}")
            return {}
    return {}


def save_price_cache(cache):
    """Save the live price cache to a JSON file."""
    with open(PRICE_CACHE_FILE, 'w') as file:
        json.dump(cache, file)


def is_cache_valid(cache_entry, expiration_time=CACHE_EXPIRATION_TIME):
    """
    Check if the cache entry is still valid based on the current time and expiration time.
    """
    if not cache_entry:
        return False
    cached_time = cache_entry.get('timestamp', 0)
    return (time.time() - cached_time) < expiration_time


def call_get_live_price(token_id, expiration_time=CACHE_EXPIRATION_TIME):
    """
    Get live price from cache or update it if expired.
    """
    logger.info(f'Getting live price for token {token_id}')

    # Load existing cache
    price_cache = load_price_cache()
    cache_key = f"{token_id}"

    # Check if cache is valid
    if cache_key in price_cache and is_cache_valid(price_cache[cache_key], expiration_time):
        logger.info(f'Returning cached price for {cache_key}')
        return price_cache[cache_key]['price']

    # If cache is expired or doesn't exist, fetch live price
    try:
        result = subprocess.run(
            ['python3', 'get_live_price.py', token_id],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True
        )

        # Parse the live price from the subprocess output
        output_lines = result.stdout.strip().split("\n")
        live_price_line = next((line for line in output_lines if "Live price for token" in line), None)
        if live_price_line:
            live_price = float(live_price_line.strip().split(":")[-1].strip())
        else:
            logger.error("Live price not found in subprocess output.")
            return None

        logger.debug(f"Subprocess get_live_price output: {result.stdout}")

        # Update cache with the new price and timestamp
        price_cache[cache_key] = {'price': live_price, 'timestamp': time.time()}
        save_price_cache(price_cache)

        return live_price
    except subprocess.CalledProcessError as e:
        logger.error(f"Subprocess get_live_price error: {e.stderr}")
        return None
    except Exception as e:
        logger.error(f"Error fetching live price: {str(e)}")
        return None


def update_live_price_and_pl(merged_df, contract_token_id, market_slug=None, outcome=None):
    """
    Calculate the live price and profit/loss (pl) for each trade in the DataFrame.
    """
    # Ensure tokenID in merged_df is string
    merged_df['tokenID'] = merged_df['tokenID'].astype(str)
    contract_token_id = str(contract_token_id)

    # Check for NaN or empty token IDs
    if not contract_token_id or contract_token_id == 'nan':
        logger.warning("Encountered NaN or empty contract_token_id. Skipping.")
        return merged_df

    # Add live_price and pl columns if they don't exist
    if 'live_price' not in merged_df.columns:
        merged_df['live_price'] = np.nan
    if 'pl' not in merged_df.columns:
        merged_df['pl'] = np.nan

    # Filter rows with the same contract_token_id and outcome
    merged_df['outcome'] = merged_df['outcome'].astype(str)
    matching_rows = merged_df[
        (merged_df['tokenID'] == contract_token_id) &
        (merged_df['outcome'].str.lower() == outcome.lower())
    ]

    if not matching_rows.empty:
        logger.info(f'Fetching live price for token {contract_token_id}')
        live_price = call_get_live_price(contract_token_id)
        logger.info(f'Live price for token {contract_token_id}: {live_price}')

        if live_price is not None:
            try:
                # Calculate profit/loss based on the live price
                price_paid_per_token = matching_rows['price_paid_per_token']
                total_purchase_value = matching_rows['total_purchase_value']
                pl = ((live_price - price_paid_per_token) / price_paid_per_token) * total_purchase_value

                # Update the DataFrame with live price and pl
                merged_df.loc[matching_rows.index, 'live_price'] = live_price
                merged_df.loc[matching_rows.index, 'pl'] = pl
            except Exception as e:
                logger.error(f"Error calculating live price and profit/loss: {e}")
        else:
            logger.warning(f"Live price not found for tokenID {contract_token_id}")
            merged_df.loc[matching_rows.index, 'pl'] = np.nan

    return merged_df


def find_token_id(market_slug, outcome, market_lookup):
    """Find the token_id based on market_slug and outcome."""
    for market in market_lookup.values():
        if market['market_slug'] == market_slug:
            for token in market['tokens']:
                if token['outcome'].lower() == outcome.lower():
                    return token['token_id']
    return None


def fetch_data(url):
    """Fetch data from a given URL and return the JSON response."""
    try:
        response = requests.get(url, timeout=10)  # You can specify a timeout
        response.raise_for_status()  # Raise an error for bad responses (4xx, 5xx)
        return response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data from URL: {url}. Exception: {e}")
        return None


def fetch_all_pages(api_key, token_ids, market_slug_outcome_map, csv_output_dir='./data/polymarket_trades/'):
    page = 1
    offset = 100
    retry_attempts = 0
    all_data = []  # Store all data here

    while True:
        url = f"https://api.polygonscan.com/api?module=account&action=token1155tx&contractaddress={NEG_RISK_CTF_EXCHANGE}&page={page}&offset={offset}&startblock=0&endblock=99999999&sort=desc&apikey={api_key}"
        logger.info(f"Fetching transaction data for tokens {token_ids}, page: {page}")

        data = fetch_data(url)

        if data and data['status'] == '1':
            df = pd.DataFrame(data['result'])
            if df.empty:
                logger.info("No more transactions found, ending pagination.")
                break  # Exit if there are no more transactions
            all_data.append(df)
            page += 1  # Go to the next page
        else:
            logger.error(f"API response error or no data found for page {page}")
            if retry_attempts < 5:
                retry_attempts += 1
                time.sleep(retry_attempts)
            else:
                break

    if all_data:
        final_df = pd.concat(all_data, ignore_index=True)  # Combine all pages
        logger.info(f"Fetched {len(final_df)} transactions across all pages.")
        return final_df
    return None


def validate_market_lookup(token_ids, market_lookup):
    valid_token_ids = []
    invalid_token_ids = []

    for token_id in token_ids:
        market_slug, outcome = find_market_info(token_id, market_lookup)
        if market_slug and outcome:
            valid_token_ids.append(token_id)
        else:
            invalid_token_ids.append(token_id)

    logger.info(f"Valid token IDs: {valid_token_ids}")
    if invalid_token_ids:
        logger.warning(f"Invalid or missing market info for token IDs: {invalid_token_ids}")

    return valid_token_ids


def sanitize_filename(filename):
    """
    Sanitize the filename by removing or replacing invalid characters.
    """
    # Replace invalid characters with an underscore
    return re.sub(r'[\\/*?:"<>|]', '_', filename)


def sanitize_directory(directory):
    """
    Sanitize the directory name by removing or replacing invalid characters.
    """
    # Replace invalid characters with an underscore
    return re.sub(r'[\\/*?:"<>|]', '_', directory)


def extract_wallet_ids(leaderboard_url):
    """Scrape the Polymarket leaderboard to extract wallet IDs."""
    logging.info(f"Fetching leaderboard page: {leaderboard_url}")

    response = requests.get(leaderboard_url)
    if response.status_code != 200:
        logging.error(f"Failed to load page {leaderboard_url}, status code: {response.status_code}")
        return []

    logging.debug(f"Page loaded successfully, status code: {response.status_code}")

    soup = BeautifulSoup(response.content, 'html.parser')
    logging.debug("Page content parsed with BeautifulSoup")

    wallet_ids = []

    # Debug: Check if <a> tags are being found correctly
    a_tags = soup.find_all('a', href=True)
    logging.debug(f"Found {len(a_tags)} <a> tags in the page.")

    for a_tag in a_tags:
        href = a_tag['href']
        logging.debug(f"Processing href: {href}")
        if href.startswith('/profile/'):
            wallet_id = href.split('/')[-1]
            wallet_ids.append(wallet_id)
            logging.info(f"Extracted wallet ID: {wallet_id}")
        else:
            logging.debug(f"Skipped href: {href}")

    return wallet_ids


def load_market_lookup(json_path):
    """Load market lookup data from a JSON file."""
    with open(json_path, 'r') as json_file:
        return json.load(json_file)


def find_market_info(token_id, market_lookup):
    """Find market_slug and outcome based on tokenID."""
    token_id = str(token_id)  # Ensure token_id is a string
    if not token_id or token_id == 'nan':
        logger.warning("Token ID is NaN or empty. Skipping lookup.")
        return None, None

    logger.debug(f"Looking up market info for tokenID: {token_id}")
    for market in market_lookup.values():
        for token in market['tokens']:
            if str(token['token_id']) == token_id:
                logger.debug(
                    f"Found market info for tokenID {token_id}: market_slug = {market['market_slug']}, "
                    f"outcome = {token['outcome']}")
                return market['market_slug'], token['outcome']
    logger.warning(f"No market info found for tokenID: {token_id}")
    return None, None


def save_to_csv(filename, data, headers, output_dir):
    """Save data to a CSV file in the specified output directory."""
    filepath = os.path.join(output_dir, filename)
    with open(filepath, 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=headers)
        writer.writeheader()
        for entry in data:
            writer.writerow(entry)
    logger.info(f"Saved data to {filepath}")


def add_timestamps(erc1155_df, erc20_df):
    """
    Rename timestamp columns and convert them from UNIX to datetime.
    """
    # Rename the timestamp columns to avoid conflicts during merge
    erc1155_df.rename(columns={'timeStamp': 'timeStamp_erc1155'}, inplace=True)
    erc20_df.rename(columns={'timeStamp': 'timeStamp_erc20'}, inplace=True)

    # Convert UNIX timestamps to datetime format
    erc1155_df['timeStamp_erc1155'] = pd.to_numeric(erc1155_df['timeStamp_erc1155'], errors='coerce')
    erc20_df['timeStamp_erc20'] = pd.to_numeric(erc20_df['timeStamp_erc20'], errors='coerce')

    erc1155_df['timeStamp_erc1155'] = pd.to_datetime(erc1155_df['timeStamp_erc1155'], unit='s', errors='coerce')
    erc20_df['timeStamp_erc20'] = pd.to_datetime(erc20_df['timeStamp_erc20'], unit='s', errors='coerce')

    return erc1155_df, erc20_df


def enrich_erc1155_data(erc1155_df, market_lookup):
    """
    Enrich the ERC-1155 DataFrame with market_slug and outcome based on market lookup.
    """

    def get_market_info(token_id):
        if pd.isna(token_id) or str(token_id) == 'nan':
            return 'Unknown', 'Unknown'
        for market in market_lookup.values():
            for token in market['tokens']:
                if str(token['token_id']) == str(token_id):
                    return market['market_slug'], token['outcome']
        return 'Unknown', 'Unknown'

    erc1155_df['market_slug'], erc1155_df['outcome'] = zip(
        *erc1155_df['tokenID'].apply(lambda x: get_market_info(x))
    )

    return erc1155_df


def get_transaction_details_by_hash(transaction_hash, api_key, output_dir='./data/polymarket_trades/'):
    """
    Fetch the transaction details by hash from Polygonscan, parse the logs,
    and save the flattened data as a CSV.

    Args:
    - transaction_hash (str): The hash of the transaction.
    - api_key (str): The Polygonscan API key.
    - output_dir (str): The directory to save the CSV file.

    Returns:
    - None: Saves the transaction details to a CSV.
    """
    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Construct the API URL for fetching transaction receipt details by hash
    url = f"https://api.polygonscan.com/api?module=proxy&action=eth_getTransactionReceipt&txhash={transaction_hash}&apikey={api_key}"

    logger.info(f"Fetching transaction details for hash: {transaction_hash}")
    logger.debug(f"Request URL: {url}")

    try:
        # Fetch transaction details
        response = requests.get(url)
        logger.debug(f"Polygonscan API response status: {response.status_code}")

        if response.status_code != 200:
            logger.error(f"Non-200 status code received: {response.status_code}")
            return None

        # Parse the JSON response
        data = response.json()
        logger.debug(f"Response JSON: {data}")

        # Check if the status is successful
        if data.get('result') is None:
            logger.error(f"Error in API response: {data.get('message', 'Unknown error')}")
            return None

        # Extract the logs
        logs = data['result']['logs']
        logs_df = pd.json_normalize(logs)

        # Save the logs to a CSV file for easier review
        csv_filename = os.path.join(output_dir, f"transaction_logs_{transaction_hash}.csv")
        logs_df.to_csv(csv_filename, index=False)
        logger.info(f"Parsed logs saved to {csv_filename}")

        return logs_df

    except Exception as e:
        logger.error(f"Exception occurred while fetching transaction details for hash {transaction_hash}: {e}")
        return None


def add_financial_columns(erc1155_df, erc20_df, wallet_id, market_lookup):
    """
    Merge the ERC-1155 and ERC-20 dataframes, calculate financial columns,
    including whether a trade was won or lost, and fetch the latest price
    for each contract and tokenID.
    """
    # Merge the two dataframes on the 'hash' column
    merged_df = pd.merge(erc1155_df, erc20_df, how='outer', on='hash', suffixes=('_erc1155', '_erc20'))

    # Convert wallet ID and columns to lowercase for case-insensitive comparison
    wallet_id = wallet_id.lower()
    merged_df['to_erc1155'] = merged_df['to_erc1155'].astype(str).str.lower()
    merged_df['from_erc1155'] = merged_df['from_erc1155'].astype(str).str.lower()

    # Remove rows where 'tokenID' is NaN or 'nan'
    merged_df['tokenID'] = merged_df['tokenID'].astype(str)
    merged_df = merged_df[~merged_df['tokenID'].isnull() & (merged_df['tokenID'] != 'nan')]

    # Set transaction type based on wallet address
    merged_df['transaction_type'] = 'other'
    merged_df.loc[merged_df['to_erc1155'] == wallet_id, 'transaction_type'] = 'buy'
    merged_df.loc[merged_df['from_erc1155'] == wallet_id, 'transaction_type'] = 'sell'

    # Calculate the purchase price per token and total dollar value
    if 'value' in merged_df.columns and 'tokenValue' in merged_df.columns:
        merged_df['price_paid_per_token'] = merged_df['value'].astype(float) / merged_df['tokenValue'].astype(float)
        merged_df['total_purchase_value'] = merged_df['value'].astype(float) / 10**6  # USDC has 6 decimal places
        merged_df['shares'] = merged_df['total_purchase_value'] / merged_df['price_paid_per_token']
    else:
        logger.error("The necessary columns for calculating purchase price are missing.")
        return merged_df

    # Create the 'lost' and 'won' columns
    merged_df['lost'] = (
        (merged_df['to_erc1155'] == '0x0000000000000000000000000000000000000000') &
        (merged_df['transaction_type'] == 'sell') &
        (merged_df['price_paid_per_token'].isna() | (merged_df['price_paid_per_token'] == 0))
    ).astype(int)

    merged_df['won'] = (
        (merged_df['transaction_type'] == 'sell') &
        (merged_df['price_paid_per_token'] == 1)
    ).astype(int)

    merged_df.loc[merged_df['lost'] == 1, 'shares'] = 0
    merged_df.loc[merged_df['lost'] == 1, 'total_purchase_value'] = 0

    # Fetch live prices and calculate profit/loss (pl)
    merged_df['tokenID'] = merged_df['tokenID'].astype(str)
    merged_df = update_latest_prices(merged_df, market_lookup)

    return merged_df


def plot_profit_loss_by_trade(df, user_info):
    """
    Create a bar plot to visualize aggregated Profit/Loss (PL) by trade,
    with values rounded to two decimal places and formatted as currency.

    Args:
        df (DataFrame): DataFrame containing trade data, including 'market_slug', 'outcome', and 'pl'.
        user_info (dict): Dictionary containing user information, such as username, wallet address,
            and other relevant details.
    """
    if 'pl' not in df.columns or df['pl'].isnull().all():
        logger.warning("No PL data available for plotting. Skipping plot.")
        return

    username = user_info.get("username", "Unknown User")
    wallet_id = user_info.get("wallet_address", "N/A")
    positions_value = user_info.get("positions_value", "N/A")
    profit_loss = user_info.get("profit_loss", "N/A")
    volume_traded = user_info.get("volume_traded", "N/A")
    markets_traded = user_info.get("markets_traded", "N/A")

    # Combine market_slug and outcome to create a trade identifier
    df['trade'] = df['market_slug'] + ' (' + df['outcome'] + ')'

    # Aggregate the Profit/Loss (pl) for each unique trade
    aggregated_df = df.groupby('trade', as_index=False).agg({'pl': 'sum'})

    # Round PL values to two decimal places
    aggregated_df['pl'] = aggregated_df['pl'].round(2)

    # Format the PL values with a dollar sign for display
    aggregated_df['pl_display'] = aggregated_df['pl'].apply(lambda x: f"${x:,.2f}")

    # Define a color mapping based on Profit/Loss sign
    aggregated_df['color'] = aggregated_df['pl'].apply(lambda x: 'green' if x >= 0 else 'red')

    # Create the plot without using the color axis
    fig = px.bar(
        aggregated_df,
        x='trade',
        y='pl',
        title='',
        labels={'pl': 'Profit/Loss ($)', 'trade': 'Trade (Market Slug / Outcome)'},
        text='pl_display',
        color='color',  # Use the color column
        color_discrete_map={'green': 'green', 'red': 'red'},
    )

    # Remove the legend if you don't want it
    fig.update_layout(showlegend=False)

    # Rotate x-axis labels for better readability and set the main title
    fig.update_layout(
        title={
            'text': 'Aggregated Profit/Loss by Trade',
            'y': 0.95,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top',
            'font': {'size': 24}
        },
        xaxis_tickangle=-45,
        margin=dict(t=150, l=50, r=50, b=100)
    )

    # Prepare the subtitle text with user information
    subtitle_text = (
        f"Username: {username} | Positions Value: {positions_value} | "
        f"Profit/Loss: {profit_loss} | Volume Traded: {volume_traded} | "
        f"Markets Traded: {markets_traded} | Wallet ID: {wallet_id}"
    )

    # Add the subtitle as an annotation
    fig.add_annotation(
        text=subtitle_text,
        xref="paper", yref="paper",
        x=0.5, y=1.02,
        xanchor='center', yanchor='top',
        showarrow=False,
        font=dict(size=14)
    )

    # Save the plot
    plot_dir = "./plots/user_trades"
    os.makedirs(plot_dir, exist_ok=True)
    sanitized_username = sanitize_filename(username)
    plot_file = os.path.join(plot_dir, f"{sanitized_username}_aggregated_profit_loss_by_trade.html")
    fig.write_html(plot_file)
    logger.info(f"Aggregated Profit/Loss by trade plot saved to {plot_file}")


def plot_shares_over_time(df, user_info):
    """
    Create a line plot to visualize the cumulative number of shares for each token over time.
    Buy orders add to the position, and sell orders subtract from it.

    Args:
        df (DataFrame): DataFrame containing trade data, including 'timeStamp_erc1155', 'shares',
            'market_slug', 'outcome', and 'transaction_type' ('buy' or 'sell').
        user_info (dict): Dictionary containing user information, such as username, wallet address,
            and other relevant details.
    """
    if 'shares' not in df.columns or df['shares'].isnull().all():
        logger.warning("No 'shares' data available for plotting. Skipping plot.")
        return

    username = user_info.get("username", "Unknown User")

    # Ensure 'timeStamp_erc1155' is a datetime type, just in case it needs to be converted
    if df['timeStamp_erc1155'].dtype != 'datetime64[ns]':
        df['timeStamp_erc1155'] = pd.to_datetime(df['timeStamp_erc1155'], errors='coerce')

    # Drop rows with NaN values in the columns needed for this plot
    df = df.dropna(subset=['timeStamp_erc1155', 'shares', 'market_slug', 'outcome', 'transaction_type'])

    # Sort the dataframe by time to ensure the line chart shows the data in chronological order
    df = df.sort_values(by='timeStamp_erc1155')

    # Combine 'market_slug' and 'outcome' to create a unique label for each token
    df['token_label'] = df['market_slug'] + " - " + df['outcome']

    # 'position_change' adds shares for buys and subtracts shares for sells
    df['position_change'] = df.apply(
        lambda row: row['shares'] if row['transaction_type'] == 'buy' else -row['shares'], axis=1)

    # Group by 'token_label' and calculate the cumulative position
    df['cumulative_position'] = df.groupby('token_label')['position_change'].cumsum()

    # Forward fill the cumulative position to maintain it between trades
    df['cumulative_position'] = df.groupby('token_label')['cumulative_position'].ffill()

    # Create the line plot, grouping by 'token_label' for separate lines per token ID
    fig = px.line(
        df,
        x='timeStamp_erc1155',
        y='cumulative_position',
        color='token_label',  # Each token (market_slug + outcome) gets its own line
        title=f'Cumulative Shares Over Time for {username}',
        labels={'timeStamp_erc1155': 'Trade Time', 'cumulative_position': 'Cumulative Position',
                'token_label': 'Token (Market Slug - Outcome)'},
        line_shape='linear'
    )

    # Update layout for better aesthetics
    fig.update_layout(
        title={
            'text': f"Cumulative Number of Shares Over Time for {username}",
            'y': 0.95,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top',
            'font': {'size': 20}
        },
        margin=dict(t=60),
        xaxis_title="Trade Time",
        yaxis_title="Cumulative Number of Shares",
        legend_title="Token (Market Slug - Outcome)"
    )

    # Save the plot
    plot_dir = "./plots/user_trades"
    os.makedirs(plot_dir, exist_ok=True)
    sanitized_username = sanitize_filename(username)
    plot_file = os.path.join(plot_dir, f"{sanitized_username}_shares_over_time.html")
    fig.write_html(plot_file)
    logger.info(f"Cumulative shares over time plot saved to {plot_file}")


def plot_user_trades(df, user_info):
    """Plot user trades and save plots, adjusting for trades that were lost."""
    username = user_info["username"]
    wallet_id = user_info["wallet_address"]

    # Sanitize only the filename, not the directory
    sanitized_username = sanitize_filename(username)

    info_text = (
        f"Username: {username} | Positions Value: {user_info['positions_value']} | "
        f"Profit/Loss: {user_info['profit_loss']} | Volume Traded: {user_info['volume_traded']} | "
        f"Markets Traded: {user_info['markets_traded']} | Wallet ID: {wallet_id}"
    )

    # Ensure the directory exists
    plot_dir = "./plots/user_trades"
    os.makedirs(plot_dir, exist_ok=True)

    # Flag loss trades: to_erc1155 is the zero address, type is sell, price_paid_per_token is NaN
    df['is_loss'] = df.apply(
        lambda row: (row['to_erc1155'] == '0x0000000000000000000000000000000000000000') and
                    (row['transaction_type'] == 'sell') and
                    pd.isna(row['price_paid_per_token']),
        axis=1)

    # Set shares and total purchase value to zero for loss trades
    df.loc[df['is_loss'], 'shares'] = 0
    df.loc[df['is_loss'], 'total_purchase_value'] = 0

    ### Total Purchase Value by Market (current holdings)
    df['total_purchase_value_adjusted'] = df.apply(
        lambda row: row['total_purchase_value'] if row['transaction_type'] == 'buy' else -row['total_purchase_value'],
        axis=1
    )

    grouped_df_value = df.groupby(['market_slug']).agg({
        'total_purchase_value_adjusted': 'sum',
        'shares': 'sum',
    }).reset_index()

    # Calculate the weighted average price_paid_per_token
    grouped_df_value['weighted_price_paid_per_token'] = (
        grouped_df_value['total_purchase_value_adjusted'] / grouped_df_value['shares']
    )

    # Sort by total_purchase_value in descending order (ignoring outcome)
    grouped_df_value = grouped_df_value.sort_values(by='total_purchase_value_adjusted', ascending=False)

    # Format the label for the bars (removing outcome)
    grouped_df_value['bar_label'] = (
        "Avg Price: $" + grouped_df_value['weighted_price_paid_per_token'].round(2).astype(str)
    )

    fig = px.bar(
        grouped_df_value,
        x='market_slug',
        y='total_purchase_value_adjusted',
        barmode='group',
        title=f"Current Total Purchase Value by Market for {username}",
        labels={'total_purchase_value_adjusted': 'Current Total Purchase Value', 'market_slug': 'Market'},
        text=grouped_df_value['bar_label'],
        hover_data={'weighted_price_paid_per_token': ':.2f'},
    )

    fig.update_layout(
        title={
            'text': f"Current Total Purchase Value by Market for {username}",
            'y': 0.95,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top',
            'font': {'size': 20}
        },
        margin=dict(t=60),
        showlegend=False  # Remove the legend
    )

    fig.add_annotation(
        text=info_text,
        xref="paper", yref="paper",
        showarrow=False,
        x=0.5, y=1.05,
        font=dict(size=12)
    )

    # Save the bar plot as an HTML file
    plot_file = os.path.join(plot_dir, f"{sanitized_username}_current_market_purchase_value.html")
    fig.write_html(plot_file)
    logger.info(f"Current market purchase value plot saved to {plot_file}")

    ### Trade Quantity by Market (current holdings)
    df['shares_adjusted'] = df.apply(
        lambda row: row['shares'] if row['transaction_type'] == 'buy' else -row['shares'], axis=1)

    grouped_df_quantity = df.groupby(['market_slug']).agg({
        'shares_adjusted': 'sum',
        'total_purchase_value': 'sum',
    }).reset_index()

    # Calculate the weighted average price_paid_per_token
    grouped_df_quantity['weighted_price_paid_per_token'] = (
        grouped_df_quantity['total_purchase_value'] / grouped_df_quantity['shares_adjusted']
    )

    grouped_df_quantity = grouped_df_quantity.sort_values(by='shares_adjusted', ascending=False)

    grouped_df_quantity['bar_label'] = (
        "Quantity: " + grouped_df_quantity['shares_adjusted'].round().astype(int).astype(str) + "<br>" +
        "Avg Price: $" + grouped_df_quantity['weighted_price_paid_per_token'].round(2).astype(str)
    )

    fig = px.bar(
        grouped_df_quantity,
        x='market_slug',
        y='shares_adjusted',
        barmode='group',
        title=f"Current Trade Quantity by Market for {username}",
        labels={'shares_adjusted': 'Current Trade Quantity', 'market_slug': 'Market'},
        text=grouped_df_quantity['bar_label'],
    )

    fig.update_layout(
        title={
            'text': f"Current Trade Quantity by Market for {username}",
            'y': 0.95,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top',
            'font': {'size': 20}
        },
        margin=dict(t=60),
        showlegend=False  # Remove the legend
    )

    fig.add_annotation(
        text=info_text,
        xref="paper", yref="paper",
        showarrow=False,
        x=0.5, y=1.05,
        font=dict(size=12)
    )

    # Save the trade quantity plot as an HTML file
    plot_file = os.path.join(plot_dir, f"{sanitized_username}_current_market_trade_quantity.html")
    fig.write_html(plot_file)
    logger.info(f"Current market trade quantity plot saved to {plot_file}")

    ### Total Purchase Value Timeline
    df['total_purchase_value_timeline_adjusted'] = df.apply(
        lambda row: row['total_purchase_value'] if row['transaction_type'] == 'buy' else -row['total_purchase_value'],
        axis=1
    )

    # Combine 'market_slug' and 'outcome' into a unique label
    df['market_outcome_label'] = df['market_slug'] + ' (' + df['outcome'] + ')'

    # Create the scatter plot, coloring by 'market_outcome_label'
    fig = px.scatter(
        df,
        x='timeStamp_erc1155',
        y='total_purchase_value_timeline_adjusted',
        color='market_outcome_label',  # Use the combined label for market and outcome
        title=f"Total Purchase Value Timeline for {username}",
        labels={
            'total_purchase_value_timeline_adjusted': 'Total Purchase Value',
            'timeStamp_erc1155': 'Transaction Time',
            'market_outcome_label': 'Market/Outcome'
        },
        hover_data=['market_slug', 'price_paid_per_token', 'outcome', 'hash'],
    )

    fig.update_layout(
        title={
            'text': f"Total Purchase Value Timeline for {username}",
            'y': 0.95,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top',
            'font': {'size': 20}
        },
        margin=dict(t=60)
    )

    fig.add_annotation(
        text=info_text,
        xref="paper", yref="paper",
        showarrow=False,
        x=0.5, y=1.05,
        font=dict(size=12)
    )

    # Save the updated plot
    plot_file = os.path.join(plot_dir, f"{sanitized_username}_total_purchase_value_timeline_adjusted.html")
    fig.write_html(plot_file)
    logger.info(f"Total purchase value timeline plot saved to {plot_file}")


def plot_total_purchase_value(df, user_info):
    """Create and save a scatter plot for total purchase value, accounting for buy and sell transactions."""
    # Ensure the directory exists
    plot_dir = "./plots/user_trades"
    os.makedirs(plot_dir, exist_ok=True)

    username = user_info["username"]
    wallet_id = user_info["wallet_address"]

    # Sanitize only the filename, not the directory
    sanitized_username = sanitize_filename(username)

    info_text = (
        f"Username: {username} | Positions Value: {user_info['positions_value']} | "
        f"Profit/Loss: {user_info['profit_loss']} | Volume Traded: {user_info['volume_traded']} | "
        f"Markets Traded: {user_info['markets_traded']} | Wallet ID: {wallet_id}"
    )

    # Flag loss trades: to_erc1155 is the zero address, type is sell, price_paid_per_token is NaN
    df['is_loss'] = df.apply(
        lambda row: (row['to_erc1155'] == '0x0000000000000000000000000000000000000000') and
                    (row['transaction_type'] == 'sell') and
                    pd.isna(row['price_paid_per_token']),
        axis=1)

    # Set shares and total purchase value to zero for loss trades
    df.loc[df['is_loss'], 'shares'] = 0
    df.loc[df['is_loss'], 'total_purchase_value'] = 0

    # Adjust the total purchase value based on the transaction type
    df['total_purchase_value_adjusted'] = df.apply(
        lambda row: row['total_purchase_value'] if row['transaction_type'] == 'buy' else -row['total_purchase_value'],
        axis=1
    )

    # Create the scatter plot for total purchase value over time
    fig = px.scatter(
        df,
        x='timeStamp_erc1155',
        y='total_purchase_value_adjusted',  # Adjusted values for buys and sells
        color='market_slug',
        title=f"Current Purchase Value Timeline for {username}",
        labels={'total_purchase_value_adjusted': 'Adjusted Purchase Value ($)',
                'timeStamp_erc1155': 'Transaction Time'},
        hover_data=['market_slug', 'price_paid_per_token', 'outcome', 'hash'],
    )

    # Adjust title positioning and font size
    fig.update_layout(
        title={
            'text': f"Current Purchase Value Timeline for {username}",
            'y': 0.95,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top',
            'font': {'size': 20}
        },
        margin=dict(t=60)
    )

    fig.add_annotation(
        text=info_text,
        xref="paper", yref="paper",
        showarrow=False,
        x=0.5, y=1.05,
        font=dict(size=12)
    )

    # Save the scatter plot as an HTML file
    plot_file = os.path.join(plot_dir, f"{sanitized_username}_current_purchase_value_timeline.html")
    fig.write_html(plot_file)
    logger.info(f"Current purchase value timeline plot saved to {plot_file}")


def create_and_save_pie_chart(df, user_info):
    """Create and save a pie chart for user's current holdings."""
    # Ensure the directory exists
    plot_dir = "./plots/user_trades"
    os.makedirs(plot_dir, exist_ok=True)

    username = user_info["username"]
    wallet_id = user_info["wallet_address"]
    sanitized_username = sanitize_filename(username)

    info_text = (
        f"Username: {username} | Positions Value: {user_info['positions_value']} | "
        f"Profit/Loss: {user_info['profit_loss']} | Volume Traded: {user_info['volume_traded']} | "
        f"Markets Traded: {user_info['markets_traded']} | Wallet ID: {wallet_id}"
    )

    # Flag loss trades: to_erc1155 is the zero address, type is sell, price_paid_per_token is NaN
    df['is_loss'] = df.apply(
        lambda row: (row['to_erc1155'] == '0x0000000000000000000000000000000000000000') and
                    (row['transaction_type'] == 'sell') and
                    pd.isna(row['price_paid_per_token']),
        axis=1)

    # Set shares to zero for loss trades
    df.loc[df['is_loss'], 'shares'] = 0

    df['shares_adjusted'] = df.apply(
        lambda row: row['shares'] if row['transaction_type'] == 'buy' else -row['shares'], axis=1)

    holdings = df.groupby('market_slug').agg({'shares_adjusted': 'sum'}).reset_index()
    holdings = holdings.sort_values('shares_adjusted', ascending=False)

    # Group slices under 2% of the total into a single 'Others' slice
    threshold = 0.02
    large_slices = holdings[holdings['shares_adjusted'] > holdings['shares_adjusted'].sum() * threshold]
    small_slices = holdings[holdings['shares_adjusted'] <= holdings['shares_adjusted'].sum() * threshold]

    if not small_slices.empty:
        other_sum = small_slices['shares_adjusted'].sum()
        others_df = pd.DataFrame([{'market_slug': 'Others', 'shares_adjusted': other_sum}])
        large_slices = pd.concat([large_slices, others_df], ignore_index=True)

    fig = px.pie(
        large_slices,
        names='market_slug',
        values='shares_adjusted',
        title=f"Current Holdings Distribution by Market for {username}",
    )

    fig.update_layout(
        title={
            'text': f"Current Holdings Distribution by Market for {username}",
            'y': 0.95,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top',
            'font': {'size': 20}
        },
        margin=dict(t=60)
    )

    fig.add_annotation(
        text=info_text,
        xref="paper", yref="paper",
        showarrow=False,
        x=0.5, y=1.05,
        font=dict(size=12)
    )

    # Save the pie chart as an HTML file
    plot_file = os.path.join(plot_dir, f"{sanitized_username}_current_holdings_pie_chart.html")
    fig.write_html(plot_file)
    logger.info(f"Current holdings pie chart saved to {plot_file}")


def create_and_save_treemap(df, user_info):
    """Create and save a treemap for user's current holdings."""
    plot_dir = './plots/user_trades'

    username = user_info["username"]
    wallet_id = user_info["wallet_address"]
    sanitized_username = sanitize_filename(username)

    info_text = (
        f"Username: {username} | Positions Value: {user_info['positions_value']} | "
        f"Profit/Loss: {user_info['profit_loss']} | Volume Traded: {user_info['volume_traded']} | "
        f"Markets Traded: {user_info['markets_traded']} | Wallet ID: {wallet_id}"
    )

    # Flag loss trades: to_erc1155 is the zero address, type is sell, price_paid_per_token is NaN
    df['is_loss'] = df.apply(
        lambda row: (row['to_erc1155'] == '0x0000000000000000000000000000000000000000') and
                    (row['transaction_type'] == 'sell') and
                    pd.isna(row['price_paid_per_token']),
        axis=1)

    # Set shares to zero for loss trades
    df.loc[df['is_loss'], 'shares'] = 0

    # Adjust shares based on transaction type (buy vs sell)
    df['shares_adjusted'] = df.apply(
        lambda row: row['shares'] if row['transaction_type'] == 'buy' else -row['shares'], axis=1)

    # Group by market_slug and outcome for treemap
    holdings = df.groupby(['market_slug', 'outcome']).agg({'shares_adjusted': 'sum'}).reset_index()

    # Create the treemap
    fig = px.treemap(
        holdings,
        path=['market_slug', 'outcome'],
        values='shares_adjusted',
        title=f"Current Holdings Distribution by Market and Outcome for {username}",
    )

    # Adjust title positioning and font size
    fig.update_layout(
        title={
            'text': f"Current Holdings Distribution by Market and Outcome for {username}",
            'y': 0.95,
            'x': 0.5,
            'xanchor': 'center',
            'yanchor': 'top',
            'font': {'size': 20}
        },
        margin=dict(t=60)
    )

    fig.add_annotation(
        text=info_text,
        xref="paper", yref="paper",
        showarrow=False,
        x=0.5, y=1.05,
        font=dict(size=12)
    )

    # Save the treemap as an HTML file
    plot_file = os.path.join(plot_dir, f"{sanitized_username}_current_holdings_treemap.html")
    fig.write_html(plot_file)
    logger.info(f"Current holdings treemap saved to {plot_file}")


def update_latest_prices(merged_df, market_lookup):
    """
    Fetch and update the latest prices for each contract and tokenID pair in the merged_df,
    and calculate profit/loss (pl) based on the live price.
    """
    # Ensure 'pl' column exists in the DataFrame
    if 'pl' not in merged_df.columns:
        merged_df['pl'] = np.nan

    # Ensure tokenID is a string and filter out NaN tokenIDs
    merged_df['tokenID'] = merged_df['tokenID'].astype(str)
    merged_df = merged_df[~merged_df['tokenID'].isnull() & (merged_df['tokenID'] != 'nan')]

    unique_contract_token_pairs = merged_df[['contractAddress_erc1155', 'tokenID']].drop_duplicates()

    for contract_address, token_id in unique_contract_token_pairs.itertuples(index=False):
        # Ensure token_id is a string
        token_id_str = str(token_id)
        if not token_id_str or token_id_str == 'nan':
            logger.warning("Encountered NaN or empty token_id. Skipping.")
            continue

        # Find market_slug and outcome using the market_lookup
        market_slug, outcome = find_market_info(token_id_str, market_lookup)

        if market_slug and outcome:
            # Update live price and pl in the DataFrame
            merged_df = update_live_price_and_pl(merged_df, token_id_str, market_slug=market_slug, outcome=outcome)
        else:
            logger.warning(f"Market info not found for token ID: {token_id_str}. "
                           f"Skipping PL calculation for these rows.")
            merged_df.loc[merged_df['tokenID'] == token_id_str, 'pl'] = np.nan

    return merged_df


def call_get_user_profile(wallet_id):
    """
    Call subprocess to get user profile data by wallet_id.
    """
    if not wallet_id:
        logger.error("No wallet ID provided.")
        return None

    try:
        logger.info(f"Calling subprocess to fetch user profile for wallet ID: {wallet_id}")

        # Execute get_user_profile.py using subprocess and pass wallet_id
        result = subprocess.run(
            ['python3', 'get_user_profile.py', wallet_id],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
            timeout=30  # Set a timeout for the subprocess
        )

        logger.debug(f"Subprocess stdout: {result.stdout}")
        logger.debug(f"Subprocess stderr: {result.stderr}")

        # Parse the JSON response from stdout
        user_data = json.loads(result.stdout)
        return user_data

    except subprocess.TimeoutExpired:
        logger.error(f"Subprocess timed out when fetching user profile for wallet ID: {wallet_id}")
        return None
    except subprocess.CalledProcessError as e:
        logger.error(f"Subprocess error when fetching user profile for wallet ID {wallet_id}: {e.stderr}")
        return None
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON from subprocess for wallet ID {wallet_id}: {e}")
        return None


def replace_hex_values(df, columns):
    """
    Replace specific hex values in the given columns with their corresponding names.

    Args:
    - df (pd.DataFrame): The DataFrame containing the transaction data.
    - columns (list): List of column names where the hex values should be replaced.

    Returns:
    - pd.DataFrame: The DataFrame with the replaced values.
    """
    # Mapping of hex values to their corresponding names
    replacement_dict = {
        '0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174': 'CTF_EXCHANGE',
        '0x4d97dcd97ec945f40cf65f87097ace5ea0476045': 'NEG_RISK_CTF_EXCHANGE',
        '0xC5d563A36AE78145C45a50134d48A1215220f80a': 'NEG_RISK_CTF_EXCHANGE_SPENDER',
        '0xd91E80cF2E7be2e162c6513ceD06f1dD0dA35296': 'NEG_RISK_ADAPTER',
        '0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E': 'CTF_EXCHANGE_SPENDER',
    }

    for column in columns:
        if column in df.columns:
            df[column] = df[column].replace(replacement_dict)

    return df


def process_wallet_data(wallet_addresses, api_key, plot=True, latest_price_mode=False):
    """
    Processes user wallet data to generate user transaction information.
    If `latest_price_mode` is set to True, the function will only retrieve
    the latest prices for tokens without generating user reports.

    Args:
    - wallet_addresses (list): List of wallet addresses to process.
    - api_key (str): The Polygonscan API key.
    - plot (bool): Whether to generate plots for the user data.
    - latest_price_mode (bool): If True, only retrieve the latest transaction prices for the given wallets.
    """
    # Load environment variables
    load_dotenv("keys.env")

    # Ensure the output directory exists
    output_dir = './data/user_trades/'
    os.makedirs(output_dir, exist_ok=True)

    # Load the market lookup JSON data
    market_lookup_path = './data/market_lookup.json'
    market_lookup = load_market_lookup(market_lookup_path)

    for wallet_address in wallet_addresses:
        # Fetch user info (username) based on wallet ID
        user_info = call_get_user_profile(wallet_address)
        username = user_info['username'] if user_info else "Unknown"

        # Sanitize the username to create a valid filename
        sanitized_username = sanitize_filename(username)

        logger.info(f"Processing wallet for user: {username}")

        # API URLs for ERC-20 and ERC-1155 transactions
        erc20_url = f"https://api.polygonscan.com/api?module=account&action=tokentx&address={wallet_address}&startblock=0&endblock=99999999&sort=asc&apikey={api_key}"
        erc1155_url = f"https://api.polygonscan.com/api?module=account&action=token1155tx&address={wallet_address}&startblock=0&endblock=99999999&sort=asc&apikey={api_key}"

        # Fetch ERC-20 and ERC-1155 transactions
        erc20_response = fetch_data(erc20_url)
        erc1155_response = fetch_data(erc1155_url)

        if erc20_response and erc1155_response and \
                erc20_response['status'] == '1' and erc1155_response['status'] == '1':
            erc20_data = erc20_response['result']
            erc1155_data = erc1155_response['result']

            # Convert data to DataFrames
            erc20_df = pd.DataFrame(erc20_data)
            erc1155_df = pd.DataFrame(erc1155_data)

            # Enrich ERC-1155 data with market_slug and outcome
            erc1155_df = enrich_erc1155_data(erc1155_df, market_lookup)

            # Add timestamps
            erc1155_df, erc20_df = add_timestamps(erc1155_df, erc20_df)

            # Merge and add financial columns
            merged_df = add_financial_columns(erc1155_df, erc20_df, wallet_address, market_lookup)

            if 'pl' in merged_df.columns:
                logger.info(f"'pl' column exists with {merged_df['pl'].count()} non-null values.")
            else:
                logger.error("'pl' column does not exist in merged_df after update_latest_prices.")

            # Replace hex values with the corresponding names
            columns_to_replace = ['contractAddress_erc1155', 'from_erc1155', 'to_erc1155']
            merged_df = replace_hex_values(merged_df, columns_to_replace)

            # Save the merged and enriched data
            output_file = f'{output_dir}{sanitized_username}_enriched_transactions.csv'
            merged_df.to_csv(output_file, index=False)
            logger.info(f"Enriched data saved to {output_file}")

            # Check if 'pl' column exists and has non-null values
            if 'pl' in merged_df.columns and merged_df['pl'].notnull().any():
                logger.info(f"'pl' column exists with {merged_df['pl'].count()} non-null values.")
                if not latest_price_mode:
                    # Generate and save the Profit/Loss by trade plot
                    plot_profit_loss_by_trade(merged_df, user_info)
            else:
                logger.warning(f"'pl' column is missing or empty for user {username}. Skipping PL plot.")

    logger.info("Data processing completed.")


def call_scrape_wallet_ids(top_volume=True, top_profit=True):
    """
    Scrape leaderboard and return wallet IDs based on top volume or top profit.

    Args:
    - top_volume (bool): Whether to fetch top volume users.
    - top_profit (bool): Whether to fetch top profit users.

    Returns:
    - List of wallet IDs.
    """
    wallet_ids = []

    # Construct the command to call get_leaderboard_wallet_ids.py with appropriate flags
    command = ['python3', 'get_leaderboard_wallet_ids.py']
    if top_volume:
        command.append('--top-volume')
    if top_profit:
        command.append('--top-profit')

    try:
        # Run the script with the constructed command
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            text=True
        )
        logger.debug(f"Leaderboard wallet script stdout: {result.stdout}")

        # Parse the output as JSON and extend the wallet_ids list
        wallet_ids.extend(json.loads(result.stdout))
    except subprocess.CalledProcessError as e:
        logger.error(f"Error running get_leaderboard_wallet_ids.py: {e.stderr}")
    except json.JSONDecodeError as json_err:
        logger.error(f"Failed to parse JSON from get_leaderboard_wallet_ids.py: {json_err}")

    # Log the combined wallet IDs
    logger.info(f"Fetched {len(wallet_ids)} wallet IDs based on volume/profit flags.")

    return wallet_ids


def process_and_plot_user_data(wallet_addresses, api_key, plot=True, latest_price_mode=False):
    """
    Process wallet data for each user, calculate financial data, and optionally generate plots.

    Args:
        wallet_addresses (list): List of wallet addresses.
        api_key (str): Polygonscan API key.
        plot (bool): Whether to generate plots for the user data.
        latest_price_mode (bool): If True, only retrieve the latest prices, no plotting.
    """
    # Load market lookup data
    market_lookup_path = './data/market_lookup.json'
    market_lookup = load_market_lookup(market_lookup_path)

    # Define the columns to keep
    columns_to_keep = [
        'timeStamp_erc1155', 'tokenID', 'tokenValue', 'market_slug', 'outcome',
        'value', 'tokenDecimal', 'transaction_type', 'price_paid_per_token',
        'total_purchase_value', 'shares', 'lost', 'won', 'pl', 'live_price'
    ]

    for wallet_address in wallet_addresses:
        # Fetch user info (username) based on wallet ID
        user_info = call_get_user_profile(wallet_address)
        username = user_info.get('username', "Unknown") if user_info else "Unknown"

        logger.info(f"Processing wallet for user: {username} ({wallet_address})")

        # Fetch ERC-20 and ERC-1155 transactions
        erc20_df, erc1155_df = fetch_user_transactions(wallet_address, api_key)

        if erc20_df is not None and erc1155_df is not None:
            # Enrich ERC-1155 data with market_slug and outcome
            erc1155_df = enrich_erc1155_data(erc1155_df, market_lookup)

            # Add timestamps
            erc1155_df, erc20_df = add_timestamps(erc1155_df, erc20_df)

            # Merge and add financial columns
            merged_df = add_financial_columns(erc1155_df, erc20_df, wallet_address, market_lookup)

            # Check for Profit/Loss data
            if 'pl' in merged_df.columns and merged_df['pl'].notnull().any():
                if not latest_price_mode and plot:
                    # Generate all plots for the user
                    generate_all_user_plots(merged_df, user_info)

                # Save the merged and enriched data
                sanitized_username = sanitize_filename(username)
                output_dir = './data/user_trades/'
                os.makedirs(output_dir, exist_ok=True)

                # Save to Parquet (default format)
                output_file_parquet = f'{output_dir}{sanitized_username}_enriched_transactions.parquet'
                merged_df.to_parquet(output_file_parquet, index=False)
                logger.info(f"Enriched data saved to {output_file_parquet}")

                # Save to CSV: keep only the specified columns and sort by timeStamp_erc1155
                merged_df = merged_df[columns_to_keep].sort_values(by='timeStamp_erc1155', ascending=True)
                output_file_csv = f'{output_dir}{sanitized_username}_enriched_transactions.csv'
                merged_df.to_csv(output_file_csv, index=False)
                logger.info(f"Enriched data saved to {output_file_csv}")
            else:
                logger.warning(f"Profit/Loss column missing or empty for user: {username}")
        else:
            logger.error(f"Failed to fetch transaction data for wallet: {wallet_address}")


def generate_all_user_plots(merged_df, user_info):
    """
    Generate all necessary plots for a user.

    Args:
        merged_df (DataFrame): The merged DataFrame with user transactions and financial info.
        user_info (dict): Dictionary containing user information.
    """
    # Generate Profit/Loss by Trade plot
    plot_profit_loss_by_trade(merged_df, user_info)

    # Generate Shares Over Time plot
    plot_shares_over_time(merged_df, user_info)

    # Generate Total Purchase Value by Market plot
    plot_user_trades(merged_df, user_info)

    # Generate Pie Chart for Holdings
    create_and_save_pie_chart(merged_df, user_info)

    # Generate Treemap for Holdings
    create_and_save_treemap(merged_df, user_info)

    logger.info(f"All plots generated for user: {user_info['username']}")


def fetch_user_transactions(wallet_address, api_key):
    """
    Fetch ERC-20 and ERC-1155 transaction data for a user with pagination.

    Args:
        wallet_address (str): Wallet address to fetch transactions for.
        api_key (str): Polygonscan API key.

    Returns:
        (DataFrame, DataFrame): DataFrames for ERC-20 and ERC-1155 transactions.
    """

    def fetch_paginated_data(url):
        """
        Fetch paginated data from the provided URL.

        Args:
            url (str): Base URL for the API request.

        Returns:
            DataFrame: DataFrame with all paginated results.
        """
        page = 1
        offset = 1000  # Set the offset/page size based on the API's limits (e.g., 1000)
        all_data = []

        while True:
            paginated_url = f"{url}&page={page}&offset={offset}"
            data = fetch_data(paginated_url)
            if data and data['status'] == '1' and len(data['result']) > 0:
                all_data.extend(data['result'])
                page += 1
            else:
                break  # Stop if no more data is returned

        return pd.DataFrame(all_data)

    # Fetch ERC-20 transactions with pagination
    erc20_url = (f"https://api.polygonscan.com/api"
                 f"?module=account"
                 f"&action=tokentx"
                 f"&address={wallet_address}"
                 f"&startblock=0"
                 f"&endblock=99999999"
                 f"&sort=desc"
                 f"&apikey={api_key}")
    erc20_df = fetch_paginated_data(erc20_url)

    # Fetch ERC-1155 transactions with pagination
    erc1155_url = (f"https://api.polygonscan.com/api"
                   f"?module=account"
                   f"&action=token1155tx"
                   f"&address={wallet_address}"
                   f"&startblock=0"
                   f"&endblock=99999999"
                   f"&sort=desc"
                   f"&apikey={api_key}")
    erc1155_df = fetch_paginated_data(erc1155_url)

    if not erc20_df.empty and not erc1155_df.empty:
        return erc20_df, erc1155_df
    else:
        return None, None


def fetch_wallet_addresses(skip_leaderboard, top_volume, top_profit):
    """
    Fetch wallet addresses based on leaderboard data or manual input.

    Args:
        skip_leaderboard (bool): Whether to skip leaderboard fetching.
        top_volume (bool): Fetch top volume users.
        top_profit (bool): Fetch top profit users.

    Returns:
        list: A list of wallet addresses to process.
    """
    # Manually specified wallet addresses
    manual_wallet_ids = [
        '0x76527252D7FEd00dC4D08d794aFa1cCC36069C2a',
        # Add more wallet IDs as needed
    ]

    if not skip_leaderboard:
        leaderboard_wallet_ids = call_scrape_wallet_ids(top_volume=top_volume, top_profit=top_profit)
        wallet_addresses = list(set(manual_wallet_ids + leaderboard_wallet_ids))  # Remove duplicates
    else:
        wallet_addresses = manual_wallet_ids

    return wallet_addresses


def main(wallet_addresses=None, skip_leaderboard=False, top_volume=False, top_profit=False,
         plot=True, latest_price_mode=False):
    """
    Main function to process wallet data and generate plots.

    Args:
        wallet_addresses (list): A list of wallet addresses to process (if provided).
        skip_leaderboard (bool): Whether to skip fetching leaderboard data.
        top_volume (bool): Whether to fetch top volume users.
        top_profit (bool): Whether to fetch top profit users.
        plot (bool): Whether to generate plots for the user data.
        latest_price_mode (bool): If True, only retrieve the latest prices, no plotting.
    """
    # Load environment variables
    load_dotenv("keys.env")
    api_key = os.getenv('POLYGONSCAN_API_KEY')

    if not wallet_addresses:
        # Fetch wallet addresses if not provided
        wallet_addresses = fetch_wallet_addresses(skip_leaderboard, top_volume, top_profit)

    # Process wallet data and optionally generate plots
    process_and_plot_user_data(wallet_addresses, api_key, plot=plot, latest_price_mode=latest_price_mode)


if __name__ == "__main__":
    # Use argparse to accept command-line arguments
    parser = argparse.ArgumentParser(description='Process wallet data for specific wallet addresses.')
    parser.add_argument(
        '--wallets',
        nargs='+',  # This will accept multiple wallet IDs
        help='List of wallet addresses to process.'
    )
    parser.add_argument('--skip-leaderboard', action='store_true', help='Skip leaderboard fetching.')
    parser.add_argument('--top-volume', action='store_true', help='Fetch top volume users.')
    parser.add_argument('--top-profit', action='store_true', help='Fetch top profit users.')
    parser.add_argument('--no-plot', action='store_true', help='Disable plot generation.')
    parser.add_argument('--latest-price-mode', action='store_true',
                        help='Only retrieve the latest prices, no plotting.')

    args = parser.parse_args()

    # Call the main function with the parsed arguments
    main(
        wallet_addresses=args.wallets,
        skip_leaderboard=args.skip_leaderboard,
        top_volume=args.top_volume,
        top_profit=args.top_profit,
        plot=not args.no_plot,
        latest_price_mode=args.latest_price_mode
    )
```
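Assuming you save the script above as `analyze_polymarket.py` (a hypothetical name) alongside a `keys.env` file containing `POLYGONSCAN_API_KEY`, you can run it from the command line, e.g. `python3 analyze_polymarket.py --wallets 0x76527252D7FEd00dC4D08d794aFa1cCC36069C2a --skip-leaderboard`, or call the entry point directly:

```python
# Hedged usage sketch; 'analyze_polymarket' is a hypothetical module name
# for the script above. Requires keys.env with POLYGONSCAN_API_KEY set.
from analyze_polymarket import main

main(
    wallet_addresses=['0x76527252D7FEd00dC4D08d794aFa1cCC36069C2a'],
    skip_leaderboard=True,  # analyze only the given wallets; skip the leaderboard scrape
    plot=True,              # generate the HTML charts described above
)
```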