Okay, I understand. You want a more comprehensive and "full" version of the Python simulation, building upon the previous example. This version will include:
More Realistic Market Dynamics:
Enhanced Agent Capabilities:
BASE_AMOUNT.Advanced Analysis and Visualization:
Here's the code for this more advanced simulation:
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, clear_output
import time
# --- Simulation Parameters ---
EXPLORATION_RATE = 0.3 # Increased exploration
LEARNING_RATE = 0.5 # Adjusted learning rate
DISCOUNT_FACTOR = 0.95
INITIAL_BALANCE = 10000
SIMULATION_STEPS = 5000
MAX_INVENTORY = 500 # Maximum inventory allowed
INVENTORY_PENALTY_THRESHOLD = 0.8 * MAX_INVENTORY # Threshold for applying inventory penalty
INVENTORY_REWARD_THRESHOLD = 0.2 * MAX_INVENTORY # Threshold for applying inventory reward
# --- Market Environment ---
class MarketEnvironment:
def __init__(self, base_price=50, slope=2, price_fluctuation_range=0.05, order_book_depth=5):
self.base_price = base_price
self.slope = slope
self.total_supply = 0
self.price_history = []
self.price_fluctuation_range = price_fluctuation_range
self.order_book_depth = order_book_depth
self.order_book = {"bids": [], "asks": []} # Simplified order book
self.transaction_history = [] # Keep a record of recent transactions
self.event_schedule = [] # Schedule for dynamic bonding curve events
self.current_time = 0 # Internal time tracker
self.dynamic_event_probability = 0.05 # Probability of a dynamic event per step
self.generate_initial_order_book()
def generate_initial_order_book(self):
# Generate an initial order book around the starting price
current_price = self.get_price()
for i in range(self.order_book_depth):
bid_price = current_price * (1 - (i + 1) * 0.02) # Example: Bids decrease by 2%
ask_price = current_price * (1 + (i + 1) * 0.02) # Example: Asks increase by 2%
bid_amount = random.randint(10, 50) # Random amount for each level
ask_amount = random.randint(10, 50)
self.order_book["bids"].append((bid_price, bid_amount))
self.order_book["asks"].append((ask_price, ask_amount))
# Sort bids and asks
self.order_book["bids"].sort(key=lambda x: x[0], reverse=True)
self.order_book["asks"].sort(key=lambda x: x[0])
def add_dynamic_event(self, step, base_price_change, slope_change):
self.event_schedule.append((step, base_price_change, slope_change))
def apply_dynamic_events(self):
if random.random() < self.dynamic_event_probability:
# Generate a random event
base_price_change = random.uniform(-0.1, 0.1) # Up to +/- 10% change in base price
slope_change = random.uniform(-0.2, 0.2) # Up to +/- 20% change in slope
event_step = self.current_time + random.randint(50, 200) # Schedule the event in the future
# Add the event to the schedule
self.add_dynamic_event(event_step, base_price_change, slope_change)
print(f"Scheduled dynamic event at step {event_step}: base_price_change={base_price_change:.2f}, slope_change={slope_change:.2f}")
# Apply any scheduled events
for event in self.event_schedule:
if self.current_time >= event[0]:
self.base_price *= (1 + event[1])
self.slope *= (1 + event[2])
self.event_schedule.remove(event)
print(f"Applied dynamic event at step {self.current_time}: base_price={self.base_price:.2f}, slope={self.slope:.2f}")
def get_price(self):
# Base price with bonding curve
price = self.base_price + self.slope * self.total_supply
# Add random fluctuations
fluctuation = random.uniform(
-self.price_fluctuation_range, self.price_fluctuation_range
)
price *= 1 + fluctuation
# Keep track of price history
self.price_history.append(price)
return price
def get_state(self):
# Determine price trend based on recent history
if len(self.price_history) >= 5:
# Calculate moving average to determine trend
ma_current = np.mean(self.price_history[-3:])
ma_previous = np.mean(self.price_history[-5:-2])
if ma_current > ma_previous * 1.01: # 1% tolerance for trend
trend = "Upward"
elif ma_current < ma_previous * 0.99: # 1% tolerance for trend
trend = "Downward"
else:
trend = "Stable"
else:
trend = "Stable"
# Determine supply level
if self.total_supply < INITIAL_BALANCE * 0.1: # 10% of initial balance as low supply
supply_level = "Low"
elif self.total_supply < INITIAL_BALANCE * 0.5: # 50% of initial balance as medium supply
supply_level = "Medium"
else:
supply_level = "High"
# Get best bid and ask prices from the order book
best_bid = self.order_book["bids"][0][0] if self.order_book["bids"] else 0
best_ask = self.order_book["asks"][0][0] if self.order_book["asks"] else float('inf')
# Include recent transaction history
recent_transactions = self.transaction_history[-5:]
return {
"trend": trend,
"supply_level": supply_level,
"best_bid": best_bid,
"best_ask": best_ask,
"recent_transactions": recent_transactions
}
def buy(self, amount, agent_balance):
total_cost = 0
remaining_amount = amount
# Consume the order book
asks_to_remove = []
for i, (ask_price, ask_amount) in enumerate(self.order_book["asks"]):
if agent_balance >= total_cost + ask_price * min(remaining_amount, ask_amount):
if remaining_amount >= ask_amount:
total_cost += ask_price * ask_amount
remaining_amount -= ask_amount
asks_to_remove.append(i)
self.transaction_history.append((self.current_time, ask_price, ask_amount, 'buy'))
else:
total_cost += ask_price * remaining_amount
self.order_book["asks"][i] = (ask_price, ask_amount - remaining_amount)
self.transaction_history.append((self.current_time, ask_price, remaining_amount, 'buy'))
remaining_amount = 0
break
else:
# Agent cannot afford to completely fill this order
affordable_amount = agent_balance - total_cost // ask_price
if affordable_amount > 0:
total_cost += ask_price * affordable_amount
self.order_book["asks"][i] = (ask_price, ask_amount - affordable_amount)
self.transaction_history.append((self.current_time, ask_price, affordable_amount, 'buy'))
remaining_amount -= affordable_amount
agent_balance -= total_cost # Update agent's balance
break
# Remove filled orders
for index in reversed(asks_to_remove):
del self.order_book["asks"][index]
# Add new orders if the order book is depleted
if not self.order_book["asks"] or len(self.order_book["asks"]) < self.order_book_depth:
self.generate_initial_order_book() # Regenerate the order book based on the current price
# Update total supply based on the amount actually bought
self.total_supply += (amount - remaining_amount)
return total_cost, amount - remaining_amount
def sell(self, amount, agent_inventory):
total_revenue = 0
remaining_amount = amount
# Consume the order book
bids_to_remove = []
for i, (bid_price, bid_amount) in enumerate(self.order_book["bids"]):
if remaining_amount >= bid_amount:
total_revenue += bid_price * bid_amount
remaining_amount -= bid_amount
bids_to_remove.append(i)
self.transaction_history.append((self.current_time, bid_price, bid_amount, 'sell'))
else:
total_revenue += bid_price * remaining_amount
self.order_book["bids"][i] = (bid_price, bid_amount - remaining_amount)
self.transaction_history.append((self.current_time, bid_price, remaining_amount, 'sell'))
remaining_amount = 0
break
# Remove filled orders
for index in reversed(bids_to_remove):
del self.order_book["bids"][index]
# Add new orders if the order book is depleted
if not self.order_book["bids"] or len(self.order_book["bids"]) < self.order_book_depth:
self.generate_initial_order_book()
# Update total supply based on the amount actually sold
self.total_supply -= (amount - remaining_amount)
return total_revenue, amount - remaining_amount
def update_time(self):
self.current_time += 1
# --- Agent ---
class Agent:
def __init__(self, agent_id):
self.agent_id = agent_id
self.q_table = {} # Q-table (state_tuple, action_tuple) -> value
self.balance = INITIAL_BALANCE
self.inventory = 0
self.last_action = None
self.last_market_state = None
self.transaction_history = []
def choose_action(self, state, valid_actions):
if random.random() < EXPLORATION_RATE:
# Explore
action_type = random.choice(valid_actions)
if action_type == "buy":
amount = random.randint(1, min(50, int(self.balance / state["best_ask"]))) # Buy up to 50, or whatever the balance allows
elif action_type == "sell":
amount = random.randint(1, min(50, self.inventory)) # Sell up to 50 or whatever the inventory has
else:
amount = 0 # Hold
return (action_type, amount)
else:
# Exploit
state_tuple = self.get_state_tuple(state)
q_values = {}
for action_type in valid_actions:
if action_type == "buy":
for amount in range(1, min(50, int(self.balance / state["best_ask"])) + 1):
action_tuple = (action_type, amount)
q_values[action_tuple] = self.q_table.get(state_tuple, {}).get(action_tuple, 0.0)
elif action_type == "sell":
for amount in range(1, min(50, self.inventory) + 1):
action_tuple = (action_type, amount)
q_values[action_tuple] = self.q_table.get(state_tuple, {}).get(action_tuple, 0.0)
else:
action_tuple = (action_type, 0)
q_values[action_tuple] = self.q_table.get(state_tuple, {}).get(action_tuple, 0.0)
# Find the action with the highest Q-value
best_action = max(q_values, key=q_values.get)
return best_action
def get_state_tuple(self, state):
# Convert the state dictionary into a hashable tuple
# Flatten the recent transactions into a tuple
recent_transactions_flat = tuple(item for sublist in state["recent_transactions"] for item in sublist)
return (
state["trend"],
state["supply_level"],
state["best_bid"],
state["best_ask"],
recent_transactions_flat
)
def get_valid_actions(self):
valid_actions = ["hold"]
if self.balance > 0:
valid_actions.append("buy")
if self.inventory > 0:
valid_actions.append("sell")
return valid_actions
def update_q_table(self, current_state, action, reward, new_state):
current_state_tuple = self.get_state_tuple(current_state)
new_state_tuple = self.get_state_tuple(new_state)
action_tuple = action
if current_state_tuple not in self.q_table:
self.q_table[current_state_tuple] = {}
if action_tuple not in self.q_table[current_state_tuple]:
self.q_table[current_state_tuple][action_tuple] = 0.0
if new_state_tuple not in self.q_table:
self.q_table[new_state_tuple] = {}
# Initialize possible actions in the new state
for next_action_type in ["buy", "sell", "hold"]:
if next_action_type == "buy":
for amount in range(1, 51):
self.q_table[new_state_tuple][(next_action_type, amount)] = 0.0
elif next_action_type == "sell":
for amount in range(1, 51):
self.q_table[new_state_tuple][(next_action_type, amount)] = 0.0
else:
self.q_table[new_state_tuple][(next_action_type, 0)] = 0.0
current_q = self.q_table[current_state_tuple][action_tuple]
# Find the maximum Q-value for the next state among all possible actions
max_next_q = max(self.q_table[new_state_tuple].values(), default=0.0)
new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (
reward + DISCOUNT_FACTOR * max_next_q
)
self.q_table[current_state_tuple][action_tuple] = new_q
def calculate_reward(self, action_type, amount, cost=0, revenue=0):
reward = 0
if action_type == "buy":
if cost > 0:
reward -= cost # Penalty for buying (spending money)
else:
reward = -50 # Penalty if buying wasn't possible (e.g., not enough balance)
elif action_type == "sell":
if revenue > 0:
reward += revenue # Reward for selling (getting money)
else:
reward = -50 # Penalty if selling wasn't possible (e.g., not enough inventory)
elif action_type == "hold":
# Small reward/penalty for holding to encourage some activity
reward = 0
# Inventory Penalty/Reward
if self.inventory > INVENTORY_PENALTY_THRESHOLD:
reward -= (self.inventory - INVENTORY_PENALTY_THRESHOLD) * 0.1 # Penalty for exceeding the inventory threshold
elif self.inventory < INVENTORY_REWARD_THRESHOLD:
reward += (INVENTORY_REWARD_THRESHOLD - self.inventory) * 0.05 # Reward for having low inventory
return reward
# --- Simulation Loop ---
def run_simulation(agent, environment, num_steps=SIMULATION_STEPS):
results = []
for step in range(num_steps):
environment.update_time() # Update the internal time of the environment
environment.apply_dynamic_events() # Apply scheduled dynamic events
state = environment.get_state()
valid_actions = agent.get_valid_actions()
action_tuple = agent.choose_action(state, valid_actions)
action_type, amount = action_tuple
reward = 0
cost = 0
revenue = 0
if action_type == "buy":
cost, actual_amount_bought = environment.buy(amount, agent.balance)
if cost > 0:
agent.balance -= cost
agent.inventory += actual_amount_bought
reward = agent.calculate_reward(action_type, actual_amount_bought, cost=cost)
else:
reward = agent.calculate_reward(action_type, amount, cost=0) # Not enough balance or order book issues
elif action_type == "sell":
revenue, actual_amount_sold = environment.sell(amount, agent.inventory)
if revenue > 0:
agent.balance += revenue
agent.inventory -= actual_amount_sold
reward = agent.calculate_reward(action_type, actual_amount_sold, revenue=revenue)
else:
reward = agent.calculate_reward(action_type, amount, revenue=0) # Not enough inventory or order book issues
elif action_type == "hold":
reward = agent.calculate_reward(action_type, amount) # No cost/revenue for hold
new_state = environment.get_state()
agent.update_q_table(state, action_tuple, reward, new_state)
# Store results for analysis
results.append(
{
"step": step,
"state": state,
"action_type": action_type,
"amount_requested": amount,
"amount_executed": amount - (amount - actual_amount_sold if action_type == "sell" else amount - actual_amount_bought),
"reward": reward,
"balance": agent.balance,
"inventory": agent.inventory,
"price": environment.get_price(),
"total_supply": environment.total_supply,
"base_price": environment.base_price,
"slope": environment.slope
}
)
# Update last state and action
agent.last_action = action_tuple
agent.last_market_state = state
# Print current status every 50 steps
if (step + 1) % 50 == 0:
clear_output(wait=True)
df_results = pd.DataFrame(results)
display(df_results.tail(50))
print(f"Step: {step+1}, Balance: {agent.balance:.2f}, Inventory: {agent.inventory}, Price: {environment.get_price():.2f}, Total Supply: {environment.total_supply}")
time.sleep(0.1)
return pd.DataFrame(results)
# --- Run the Simulation ---
# Create an agent and environment
agent = Agent(agent_id="simulated_agent_advanced")
environment = MarketEnvironment()
# Run the simulation
df_results = run_simulation(agent, environment, num_steps=SIMULATION_STEPS)
# --- Analysis and Visualization ---
# 1. Plot Balance and Inventory
plt.figure(figsize=(12, 6))
plt.plot(df_results["step"], df_results["balance"], label="Balance")
plt.plot(df_results["step"], df_results["inventory"], label="Inventory")
plt.xlabel("Step")
plt.ylabel("Value")
plt.title("Agent's Balance and Inventory Over Time")
plt.legend()
plt.show()
# 2. Plot Market Price
plt.figure(figsize=(12, 6))
plt.plot(df_results["step"], df_results["price"], label="Price")
plt.xlabel("Step")
plt.ylabel("Price")
plt.title("Market Price Over Time")
plt.legend()
plt.show()
# 3. Plot Actions
plt.figure(figsize=(12, 6))
plt.scatter(
df_results[df_results["action_type"] == "buy"]["step"],
df_results[df_results["action_type"] == "buy"]["amount_executed"],
color="green",
label="Buy",
alpha=0.5,
)
plt.scatter(
df_results[df_results["action_type"] == "sell"]["step"],
df_results[df_results["action_type"] == "sell"]["amount_executed"],
color="red",
label="Sell",
alpha=0.5,
)
plt.xlabel("Step")
plt.ylabel("Amount")
plt.title("Agent's Buy/Sell Actions Over Time")
plt.legend()
plt.show()
# 4. Plot Rewards
plt.figure(figsize=(12, 6))
plt.plot(df_results["step"], df_results["reward"], label="Reward", color="purple")
plt.xlabel("Step")
plt.ylabel("Reward")
plt.title("Agent's Reward Over Time")
plt.legend()
plt.show()
# 5. Plot Base Price and Slope (Dynamic Bonding Curve)
plt.figure(figsize=(12, 6))
plt.plot(df_results["step"], df_results["base_price"], label="Base Price", color="blue")
plt.plot(df_results["step"], df_results["slope"], label="Slope", color="orange")
plt.xlabel("Step")
plt.ylabel("Value")
plt.title("Dynamic Bonding Curve Parameters Over Time")
plt.legend()
plt.show()
# 6. Heatmap of Q-table (for a specific action, e.g., "buy")
# This is more complex to visualize for the full state space. We'll focus on a subset.
def plot_q_table_heatmap(agent, action_type, amount):
q_subset = {}
for state_tuple, actions in agent.q_table.items():
if (action_type, amount) in actions:
# Extract relevant state information (e.g., trend and supply level)
simplified_state = (state_tuple[0], state_tuple[1]) # Trend, Supply Level
q_subset[simplified_state] = actions[(action_type, amount)]
if not q_subset:
print(f"No Q-values found for action '{action_type}' with amount {amount}.")
return
q_df = pd.DataFrame.from_dict(q_subset, orient="index")
q_df.index = pd.MultiIndex.from_tuples(q_df.index, names=["Trend", "Supply Level"])
q_df.columns = [f"{action_type} ({amount})"]
plt.figure(figsize=(8, 6))
sns.heatmap(q_df, annot=True, cmap="coolwarm", cbar=True)
plt.title(f"Q-table Heatmap for {action_type} (Amount: {amount})")
plt.show()
# Example: Plot Q-values for "buy" action with amount 10
plot_q_table_heatmap(agent, "buy", 10)
# --- Performance Metrics ---
# Calculate total return
total_return = (df_results["balance"].iloc[-1] - INITIAL_BALANCE) / INITIAL_BALANCE
print(f"Total Return: {total_return * 100:.2f}%")
# Calculate Sharpe Ratio (simplified, assuming risk-free rate = 0)
daily_returns = df_results["balance"].pct_change()
sharpe_ratio = daily_returns.mean() / daily_returns.std() * np.sqrt(SIMULATION_STEPS)
print(f"Sharpe Ratio (annualized): {sharpe_ratio:.2f}")
# Calculate Maximum Drawdown
cumulative_returns = (1 + daily_returns).cumprod()
peak = cumulative_returns.expanding(min_periods=1).max()
drawdown = (cumulative_returns - peak) / peak
max_drawdown = drawdown.min()
print(f"Maximum Drawdown: {max_drawdown * 100:.2f}%")
# ---
# Display the final Q-table
print("\nFinal Q-Table:")
q_table_df = pd.DataFrame.from_dict(agent.q_table, orient='index')
display(q_table_df)
Key Improvements and Explanations:
Market Environment:
generate_initial_order_book(): Creates a basic order book around the initial price with some random bids and asks.add_dynamic_event(): Allows you to schedule changes to the bonding curve parameters (base price and slope) at specific simulation steps. This can simulate external events impacting the market.apply_dynamic_events(): Applies the scheduled events and also introduces a probability of random dynamic events occurring at each step.get_price():get_state():buy() and sell():order_book.total_supply is updated based on the actual amount of tokens bought or sold.update_time(): Added to keep track of the current simulation step in the MarketEnvironment.Agent:
q_table: The Q-table now uses a tuple representation of the state get_state_tuple() and action (action_type, amount) to handle the more complex state and action spaces.choose_action():buy, sell, or hold.valid_actions are determined based on the agent's current balance and inventory.get_state_tuple(): Converts the state dictionary into a tuple, making it hashable so it can be used as a key in the Q-table.get_valid_actions(): Determines which actions are valid based on the agent's current balance and inventory.update_q_table():calculate_reward():Simulation Loop:
environment.update_time() and environment.apply_dynamic_events() to advance the simulation time and handle dynamic events.agent.get_valid_actions() method to determine the valid actions for the agent in each step.(action_type, amount).buy() and sell() methods of the environment are updated to return the actual amount of tokens bought or sold, which may be different from the requested amount due to order book limitations.amount_requested, amount_executed, base_price, and slope.Analysis and Visualization:
base_price and slope if dynamic events are used.How to Use and Experiment:
environment.add_dynamic_event() calls in the __main__ block to create different scenarios with changing bonding curve parameters.order_book_depth and the logic in generate_initial_order_book() to create different order book structures.calculate_reward().get_state(), such as volume, volatility indicators, or even external data.Further Development:
This more comprehensive simulation provides a much richer environment for testing and developing your autonomous trading agent. Remember that this is still a simplification of the real world, but it's a significant step up from the initial version and should give you a much better idea of how your agent might perform on the actual Solana blockchain. Good luck!