← Back to index

Okay, I understand. You want a more comprehensive and "full" version of the Python simulation, building upon the previous example. This version will include:

Here's the code for this more advanced simulation:

import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, clear_output
import time

# --- Simulation Parameters ---
EXPLORATION_RATE = 0.3  # Increased exploration
LEARNING_RATE = 0.5   # Adjusted learning rate
DISCOUNT_FACTOR = 0.95
INITIAL_BALANCE = 10000
SIMULATION_STEPS = 5000
MAX_INVENTORY = 500  # Maximum inventory allowed
INVENTORY_PENALTY_THRESHOLD = 0.8 * MAX_INVENTORY # Threshold for applying inventory penalty
INVENTORY_REWARD_THRESHOLD = 0.2 * MAX_INVENTORY # Threshold for applying inventory reward

# --- Market Environment ---

class MarketEnvironment:
    def __init__(self, base_price=50, slope=2, price_fluctuation_range=0.05, order_book_depth=5):
        self.base_price = base_price
        self.slope = slope
        self.total_supply = 0
        self.price_history = []
        self.price_fluctuation_range = price_fluctuation_range
        self.order_book_depth = order_book_depth
        self.order_book = {"bids": [], "asks": []}  # Simplified order book
        self.transaction_history = [] # Keep a record of recent transactions
        self.event_schedule = [] # Schedule for dynamic bonding curve events
        self.current_time = 0 # Internal time tracker
        self.dynamic_event_probability = 0.05 # Probability of a dynamic event per step

        self.generate_initial_order_book()

    def generate_initial_order_book(self):
      # Generate an initial order book around the starting price
      current_price = self.get_price()
      for i in range(self.order_book_depth):
          bid_price = current_price * (1 - (i + 1) * 0.02)  # Example: Bids decrease by 2%
          ask_price = current_price * (1 + (i + 1) * 0.02)  # Example: Asks increase by 2%
          bid_amount = random.randint(10, 50)  # Random amount for each level
          ask_amount = random.randint(10, 50)
          self.order_book["bids"].append((bid_price, bid_amount))
          self.order_book["asks"].append((ask_price, ask_amount))

      # Sort bids and asks
      self.order_book["bids"].sort(key=lambda x: x[0], reverse=True)
      self.order_book["asks"].sort(key=lambda x: x[0])

    def add_dynamic_event(self, step, base_price_change, slope_change):
        self.event_schedule.append((step, base_price_change, slope_change))

    def apply_dynamic_events(self):
      if random.random() < self.dynamic_event_probability:
          # Generate a random event
          base_price_change = random.uniform(-0.1, 0.1)  # Up to +/- 10% change in base price
          slope_change = random.uniform(-0.2, 0.2)  # Up to +/- 20% change in slope
          event_step = self.current_time + random.randint(50, 200)  # Schedule the event in the future

          # Add the event to the schedule
          self.add_dynamic_event(event_step, base_price_change, slope_change)
          print(f"Scheduled dynamic event at step {event_step}: base_price_change={base_price_change:.2f}, slope_change={slope_change:.2f}")

      # Apply any scheduled events
      for event in self.event_schedule:
          if self.current_time >= event[0]:
              self.base_price *= (1 + event[1])
              self.slope *= (1 + event[2])
              self.event_schedule.remove(event)
              print(f"Applied dynamic event at step {self.current_time}: base_price={self.base_price:.2f}, slope={self.slope:.2f}")

    def get_price(self):
        # Base price with bonding curve
        price = self.base_price + self.slope * self.total_supply

        # Add random fluctuations
        fluctuation = random.uniform(
            -self.price_fluctuation_range, self.price_fluctuation_range
        )
        price *= 1 + fluctuation

        # Keep track of price history
        self.price_history.append(price)
        return price

    def get_state(self):
        # Determine price trend based on recent history
        if len(self.price_history) >= 5:
            # Calculate moving average to determine trend
            ma_current = np.mean(self.price_history[-3:])
            ma_previous = np.mean(self.price_history[-5:-2])

            if ma_current > ma_previous * 1.01: # 1% tolerance for trend
                trend = "Upward"
            elif ma_current < ma_previous * 0.99: # 1% tolerance for trend
                trend = "Downward"
            else:
                trend = "Stable"
        else:
            trend = "Stable"

        # Determine supply level
        if self.total_supply < INITIAL_BALANCE * 0.1: # 10% of initial balance as low supply
            supply_level = "Low"
        elif self.total_supply < INITIAL_BALANCE * 0.5: # 50% of initial balance as medium supply
            supply_level = "Medium"
        else:
            supply_level = "High"

        # Get best bid and ask prices from the order book
        best_bid = self.order_book["bids"][0][0] if self.order_book["bids"] else 0
        best_ask = self.order_book["asks"][0][0] if self.order_book["asks"] else float('inf')

        # Include recent transaction history
        recent_transactions = self.transaction_history[-5:]

        return {
            "trend": trend,
            "supply_level": supply_level,
            "best_bid": best_bid,
            "best_ask": best_ask,
            "recent_transactions": recent_transactions
        }

    def buy(self, amount, agent_balance):
      total_cost = 0
      remaining_amount = amount

      # Consume the order book
      asks_to_remove = []
      for i, (ask_price, ask_amount) in enumerate(self.order_book["asks"]):
          if agent_balance >= total_cost + ask_price * min(remaining_amount, ask_amount):
              if remaining_amount >= ask_amount:
                  total_cost += ask_price * ask_amount
                  remaining_amount -= ask_amount
                  asks_to_remove.append(i)
                  self.transaction_history.append((self.current_time, ask_price, ask_amount, 'buy'))
              else:
                  total_cost += ask_price * remaining_amount
                  self.order_book["asks"][i] = (ask_price, ask_amount - remaining_amount)
                  self.transaction_history.append((self.current_time, ask_price, remaining_amount, 'buy'))
                  remaining_amount = 0
                  break
          else:
              # Agent cannot afford to completely fill this order
              affordable_amount = agent_balance - total_cost // ask_price
              if affordable_amount > 0:
                  total_cost += ask_price * affordable_amount
                  self.order_book["asks"][i] = (ask_price, ask_amount - affordable_amount)
                  self.transaction_history.append((self.current_time, ask_price, affordable_amount, 'buy'))
                  remaining_amount -= affordable_amount
                  agent_balance -= total_cost # Update agent's balance
              break

      # Remove filled orders
      for index in reversed(asks_to_remove):
          del self.order_book["asks"][index]

      # Add new orders if the order book is depleted
      if not self.order_book["asks"] or len(self.order_book["asks"]) < self.order_book_depth:
          self.generate_initial_order_book() # Regenerate the order book based on the current price

      # Update total supply based on the amount actually bought
      self.total_supply += (amount - remaining_amount)

      return total_cost, amount - remaining_amount

    def sell(self, amount, agent_inventory):
        total_revenue = 0
        remaining_amount = amount

        # Consume the order book
        bids_to_remove = []
        for i, (bid_price, bid_amount) in enumerate(self.order_book["bids"]):
            if remaining_amount >= bid_amount:
                total_revenue += bid_price * bid_amount
                remaining_amount -= bid_amount
                bids_to_remove.append(i)
                self.transaction_history.append((self.current_time, bid_price, bid_amount, 'sell'))
            else:
                total_revenue += bid_price * remaining_amount
                self.order_book["bids"][i] = (bid_price, bid_amount - remaining_amount)
                self.transaction_history.append((self.current_time, bid_price, remaining_amount, 'sell'))
                remaining_amount = 0
                break

        # Remove filled orders
        for index in reversed(bids_to_remove):
            del self.order_book["bids"][index]

        # Add new orders if the order book is depleted
        if not self.order_book["bids"] or len(self.order_book["bids"]) < self.order_book_depth:
            self.generate_initial_order_book()

        # Update total supply based on the amount actually sold
        self.total_supply -= (amount - remaining_amount)

        return total_revenue, amount - remaining_amount

    def update_time(self):
      self.current_time += 1

# --- Agent ---

class Agent:
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.q_table = {}  # Q-table (state_tuple, action_tuple) -> value
        self.balance = INITIAL_BALANCE
        self.inventory = 0
        self.last_action = None
        self.last_market_state = None
        self.transaction_history = []

    def choose_action(self, state, valid_actions):
        if random.random() < EXPLORATION_RATE:
            # Explore
            action_type = random.choice(valid_actions)
            if action_type == "buy":
                amount = random.randint(1, min(50, int(self.balance / state["best_ask"]))) # Buy up to 50, or whatever the balance allows
            elif action_type == "sell":
                amount = random.randint(1, min(50, self.inventory)) # Sell up to 50 or whatever the inventory has
            else:
                amount = 0 # Hold
            return (action_type, amount)
        else:
            # Exploit
            state_tuple = self.get_state_tuple(state)
            q_values = {}

            for action_type in valid_actions:
              if action_type == "buy":
                for amount in range(1, min(50, int(self.balance / state["best_ask"])) + 1):
                  action_tuple = (action_type, amount)
                  q_values[action_tuple] = self.q_table.get(state_tuple, {}).get(action_tuple, 0.0)
              elif action_type == "sell":
                for amount in range(1, min(50, self.inventory) + 1):
                  action_tuple = (action_type, amount)
                  q_values[action_tuple] = self.q_table.get(state_tuple, {}).get(action_tuple, 0.0)
              else:
                action_tuple = (action_type, 0)
                q_values[action_tuple] = self.q_table.get(state_tuple, {}).get(action_tuple, 0.0)

            # Find the action with the highest Q-value
            best_action = max(q_values, key=q_values.get)
            return best_action

    def get_state_tuple(self, state):
      # Convert the state dictionary into a hashable tuple
      # Flatten the recent transactions into a tuple
      recent_transactions_flat = tuple(item for sublist in state["recent_transactions"] for item in sublist)

      return (
          state["trend"],
          state["supply_level"],
          state["best_bid"],
          state["best_ask"],
          recent_transactions_flat
      )

    def get_valid_actions(self):
      valid_actions = ["hold"]
      if self.balance > 0:
          valid_actions.append("buy")
      if self.inventory > 0:
          valid_actions.append("sell")
      return valid_actions

    def update_q_table(self, current_state, action, reward, new_state):
        current_state_tuple = self.get_state_tuple(current_state)
        new_state_tuple = self.get_state_tuple(new_state)
        action_tuple = action

        if current_state_tuple not in self.q_table:
            self.q_table[current_state_tuple] = {}
        if action_tuple not in self.q_table[current_state_tuple]:
            self.q_table[current_state_tuple][action_tuple] = 0.0

        if new_state_tuple not in self.q_table:
          self.q_table[new_state_tuple] = {}
          # Initialize possible actions in the new state
          for next_action_type in ["buy", "sell", "hold"]:
            if next_action_type == "buy":
              for amount in range(1, 51):
                self.q_table[new_state_tuple][(next_action_type, amount)] = 0.0
            elif next_action_type == "sell":
              for amount in range(1, 51):
                self.q_table[new_state_tuple][(next_action_type, amount)] = 0.0
            else:
              self.q_table[new_state_tuple][(next_action_type, 0)] = 0.0

        current_q = self.q_table[current_state_tuple][action_tuple]

        # Find the maximum Q-value for the next state among all possible actions
        max_next_q = max(self.q_table[new_state_tuple].values(), default=0.0)

        new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (
            reward + DISCOUNT_FACTOR * max_next_q
        )
        self.q_table[current_state_tuple][action_tuple] = new_q

    def calculate_reward(self, action_type, amount, cost=0, revenue=0):
        reward = 0

        if action_type == "buy":
            if cost > 0:
                reward -= cost  # Penalty for buying (spending money)
            else:
                reward = -50  # Penalty if buying wasn't possible (e.g., not enough balance)
        elif action_type == "sell":
            if revenue > 0:
                reward += revenue  # Reward for selling (getting money)
            else:
                reward = -50 # Penalty if selling wasn't possible (e.g., not enough inventory)
        elif action_type == "hold":
            # Small reward/penalty for holding to encourage some activity
            reward = 0

        # Inventory Penalty/Reward
        if self.inventory > INVENTORY_PENALTY_THRESHOLD:
          reward -= (self.inventory - INVENTORY_PENALTY_THRESHOLD) * 0.1 # Penalty for exceeding the inventory threshold
        elif self.inventory < INVENTORY_REWARD_THRESHOLD:
          reward += (INVENTORY_REWARD_THRESHOLD - self.inventory) * 0.05 # Reward for having low inventory

        return reward

# --- Simulation Loop ---

def run_simulation(agent, environment, num_steps=SIMULATION_STEPS):
    results = []
    for step in range(num_steps):
        environment.update_time() # Update the internal time of the environment
        environment.apply_dynamic_events() # Apply scheduled dynamic events
        state = environment.get_state()
        valid_actions = agent.get_valid_actions()
        action_tuple = agent.choose_action(state, valid_actions)
        action_type, amount = action_tuple

        reward = 0
        cost = 0
        revenue = 0

        if action_type == "buy":
            cost, actual_amount_bought = environment.buy(amount, agent.balance)
            if cost > 0:
                agent.balance -= cost
                agent.inventory += actual_amount_bought
                reward = agent.calculate_reward(action_type, actual_amount_bought, cost=cost)
            else:
                reward = agent.calculate_reward(action_type, amount, cost=0) # Not enough balance or order book issues

        elif action_type == "sell":
            revenue, actual_amount_sold = environment.sell(amount, agent.inventory)
            if revenue > 0:
                agent.balance += revenue
                agent.inventory -= actual_amount_sold
                reward = agent.calculate_reward(action_type, actual_amount_sold, revenue=revenue)
            else:
                reward = agent.calculate_reward(action_type, amount, revenue=0) # Not enough inventory or order book issues

        elif action_type == "hold":
            reward = agent.calculate_reward(action_type, amount) # No cost/revenue for hold

        new_state = environment.get_state()
        agent.update_q_table(state, action_tuple, reward, new_state)

        # Store results for analysis
        results.append(
            {
                "step": step,
                "state": state,
                "action_type": action_type,
                "amount_requested": amount,
                "amount_executed": amount - (amount - actual_amount_sold if action_type == "sell" else amount - actual_amount_bought),
                "reward": reward,
                "balance": agent.balance,
                "inventory": agent.inventory,
                "price": environment.get_price(),
                "total_supply": environment.total_supply,
                "base_price": environment.base_price,
                "slope": environment.slope
            }
        )

        # Update last state and action
        agent.last_action = action_tuple
        agent.last_market_state = state

        # Print current status every 50 steps
        if (step + 1) % 50 == 0:
          clear_output(wait=True)
          df_results = pd.DataFrame(results)
          display(df_results.tail(50))
          print(f"Step: {step+1}, Balance: {agent.balance:.2f}, Inventory: {agent.inventory}, Price: {environment.get_price():.2f}, Total Supply: {environment.total_supply}")
          time.sleep(0.1)

    return pd.DataFrame(results)

# --- Run the Simulation ---

# Create an agent and environment
agent = Agent(agent_id="simulated_agent_advanced")
environment = MarketEnvironment()

# Run the simulation
df_results = run_simulation(agent, environment, num_steps=SIMULATION_STEPS)

# --- Analysis and Visualization ---

# 1. Plot Balance and Inventory
plt.figure(figsize=(12, 6))
plt.plot(df_results["step"], df_results["balance"], label="Balance")
plt.plot(df_results["step"], df_results["inventory"], label="Inventory")
plt.xlabel("Step")
plt.ylabel("Value")
plt.title("Agent's Balance and Inventory Over Time")
plt.legend()
plt.show()

# 2. Plot Market Price
plt.figure(figsize=(12, 6))
plt.plot(df_results["step"], df_results["price"], label="Price")
plt.xlabel("Step")
plt.ylabel("Price")
plt.title("Market Price Over Time")
plt.legend()
plt.show()

# 3. Plot Actions
plt.figure(figsize=(12, 6))
plt.scatter(
    df_results[df_results["action_type"] == "buy"]["step"],
    df_results[df_results["action_type"] == "buy"]["amount_executed"],
    color="green",
    label="Buy",
    alpha=0.5,
)
plt.scatter(
    df_results[df_results["action_type"] == "sell"]["step"],
    df_results[df_results["action_type"] == "sell"]["amount_executed"],
    color="red",
    label="Sell",
    alpha=0.5,
)
plt.xlabel("Step")
plt.ylabel("Amount")
plt.title("Agent's Buy/Sell Actions Over Time")
plt.legend()
plt.show()

# 4. Plot Rewards
plt.figure(figsize=(12, 6))
plt.plot(df_results["step"], df_results["reward"], label="Reward", color="purple")
plt.xlabel("Step")
plt.ylabel("Reward")
plt.title("Agent's Reward Over Time")
plt.legend()
plt.show()

# 5. Plot Base Price and Slope (Dynamic Bonding Curve)
plt.figure(figsize=(12, 6))
plt.plot(df_results["step"], df_results["base_price"], label="Base Price", color="blue")
plt.plot(df_results["step"], df_results["slope"], label="Slope", color="orange")
plt.xlabel("Step")
plt.ylabel("Value")
plt.title("Dynamic Bonding Curve Parameters Over Time")
plt.legend()
plt.show()

# 6. Heatmap of Q-table (for a specific action, e.g., "buy")
# This is more complex to visualize for the full state space. We'll focus on a subset.
def plot_q_table_heatmap(agent, action_type, amount):
    q_subset = {}
    for state_tuple, actions in agent.q_table.items():
        if (action_type, amount) in actions:
            # Extract relevant state information (e.g., trend and supply level)
            simplified_state = (state_tuple[0], state_tuple[1])  # Trend, Supply Level
            q_subset[simplified_state] = actions[(action_type, amount)]

    if not q_subset:
      print(f"No Q-values found for action '{action_type}' with amount {amount}.")
      return

    q_df = pd.DataFrame.from_dict(q_subset, orient="index")
    q_df.index = pd.MultiIndex.from_tuples(q_df.index, names=["Trend", "Supply Level"])
    q_df.columns = [f"{action_type} ({amount})"]

    plt.figure(figsize=(8, 6))
    sns.heatmap(q_df, annot=True, cmap="coolwarm", cbar=True)
    plt.title(f"Q-table Heatmap for {action_type} (Amount: {amount})")
    plt.show()

# Example: Plot Q-values for "buy" action with amount 10
plot_q_table_heatmap(agent, "buy", 10)

# --- Performance Metrics ---

# Calculate total return
total_return = (df_results["balance"].iloc[-1] - INITIAL_BALANCE) / INITIAL_BALANCE
print(f"Total Return: {total_return * 100:.2f}%")

# Calculate Sharpe Ratio (simplified, assuming risk-free rate = 0)
daily_returns = df_results["balance"].pct_change()
sharpe_ratio = daily_returns.mean() / daily_returns.std() * np.sqrt(SIMULATION_STEPS)
print(f"Sharpe Ratio (annualized): {sharpe_ratio:.2f}")

# Calculate Maximum Drawdown
cumulative_returns = (1 + daily_returns).cumprod()
peak = cumulative_returns.expanding(min_periods=1).max()
drawdown = (cumulative_returns - peak) / peak
max_drawdown = drawdown.min()
print(f"Maximum Drawdown: {max_drawdown * 100:.2f}%")

# ---

# Display the final Q-table
print("\nFinal Q-Table:")
q_table_df = pd.DataFrame.from_dict(agent.q_table, orient='index')
display(q_table_df)

Key Improvements and Explanations:

Market Environment:

Agent:

Simulation Loop:

Analysis and Visualization:

How to Use and Experiment:

  1. Copy and Run: Copy the code into a Kaggle notebook or a local Python environment and run it.
  2. Observe: Watch the simulation progress (balance, inventory, price, etc.) and examine the final plots and performance metrics.
  3. Experiment:

Further Development:

This more comprehensive simulation provides a much richer environment for testing and developing your autonomous trading agent. Remember that this is still a simplification of the real world, but it's a significant step up from the initial version and should give you a much better idea of how your agent might perform on the actual Solana blockchain. Good luck!