# main.py - FastAPI application for Pokemon Livestream
import asyncio
import os
import random
import time
import traceback
import logging
from typing import List, Dict, Optional, Set
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
# --- Imports for poke_env and agents ---
from poke_env.player import Player
from poke_env import AccountConfiguration, ServerConfiguration
from poke_env.environment.battle import Battle
# Import the actual agent classes
from agents import OpenAIAgent, GeminiAgent, MistralAgent
# --- Configuration ---
CUSTOM_SERVER_URL = "wss://jofthomas.com/showdown/websocket"
CUSTOM_ACTION_URL = 'https://play.pokemonshowdown.com/action.php?'
CUSTOM_BATTLE_VIEW_URL_TEMPLATE = "https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}"
custom_config = ServerConfiguration(CUSTOM_SERVER_URL, CUSTOM_ACTION_URL)
DEFAULT_BATTLE_FORMAT = "gen9randombattle"
# Define available agents with their corresponding classes
AGENT_CONFIGS = {
"OpenAIAgent": {"class": OpenAIAgent, "password_env_var": "OPENAI_AGENT_PASSWORD"},
"GeminiAgent": {"class": GeminiAgent, "password_env_var": "GEMINI_AGENT_PASSWORD"},
"MistralAgent": {"class": MistralAgent, "password_env_var": "MISTRAL_AGENT_PASSWORD"},
}
# Filter out agents with missing passwords
AVAILABLE_AGENT_NAMES = [
name for name, cfg in AGENT_CONFIGS.items()
if os.environ.get(cfg.get("password_env_var", ""))
]
if not AVAILABLE_AGENT_NAMES:
print("FATAL ERROR: No agent configurations have their required password environment variables set. Exiting.")
exit(1)
# --- Global State Variables ---
active_agent_name: Optional[str] = None
active_agent_instance: Optional[Player] = None
active_agent_task: Optional[asyncio.Task] = None
current_battle_instance: Optional[Battle] = None
background_task_handle: Optional[asyncio.Task] = None
# --- Create FastAPI app ---
app = FastAPI(title="Pokemon Battle Livestream")
# --- Helper Functions ---
def get_active_battle(agent: Player) -> Optional[Battle]:
"""Returns the first non-finished battle for an agent."""
if agent and agent._battles:
active_battles = [b for b in agent._battles.values() if not b.finished]
if active_battles:
# Ensure the battle object has a battle_tag before returning
if hasattr(active_battles[0], 'battle_tag') and active_battles[0].battle_tag:
# Check if the battle_tag has the expected format (starts with 'battle-')
if active_battles[0].battle_tag.startswith("battle-"):
return active_battles[0]
else:
# This handles cases where the battle object might exist but tag isn't ready
# print(f"DEBUG: Found active battle for {agent.username} but tag '{active_battles[0].battle_tag}' not ready.")
return None
else:
# print(f"DEBUG: Found active battle for {agent.username} but it has no battle_tag attribute yet.")
return None
return None
def create_battle_iframe(battle_id: str) -> str:
"""Creates JUST the HTML for the battle iframe tag."""
print("Creating iframe content for battle ID: ", battle_id)
# Use the official client URL unless you specifically need the test client
# battle_url = f"https://play.pokemonshowdown.com/{battle_id}"
battle_url = f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}" # Using your custom URL
# Return ONLY the iframe tag with a class for styling
return f"""
"""
def create_idle_html(status_message: str, instruction: str) -> str:
"""Creates a visually appealing idle screen HTML fragment."""
# Returns ONLY the content div, not the full HTML page
return f"""
{status_message}
{instruction}
"""
def create_error_html(error_msg: str) -> str:
"""Creates HTML fragment to display an error message."""
# Returns ONLY the content div, not the full HTML page
return f"""
"""
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
# Keep connection alive. Client doesn't send messages in this setup.
# FastAPI's WebSocket implementation handles ping/pong internally usually.
# If needed, you could implement explicit keepalive here.
data = await websocket.receive_text()
# We don't expect messages from the client in this design,
# but log if received for debugging.
print(f"Received unexpected message from client: {data}")
# Or simply keep listening:
# await asyncio.sleep(60) # Example keepalive interval if needed
except WebSocketDisconnect as e:
print(f"WebSocket disconnected: Code {e.code}, Reason: {getattr(e, 'reason', 'N/A')}")
await manager.disconnect(websocket) # Use await here
except Exception as e:
# Catch other potential errors on the connection
print(f"WebSocket error: {e}")
traceback.print_exc()
await manager.disconnect(websocket) # Ensure disconnect on error
@app.on_event("startup")
async def startup_event():
"""Start background tasks when the application starts."""
global background_task_handle
# Mount static files directory (make sure 'static' folder exists)
# Place your 'pokemon_huggingface.png' inside this 'static' folder
static_dir = "static"
if not os.path.exists(static_dir):
os.makedirs(static_dir)
print(f"Created static directory at: {os.path.abspath(static_dir)}")
print("!!! Please add 'pokemon_huggingface.png' to this directory! !!!")
app.mount("/static", StaticFiles(directory=static_dir), name="static")
print(f"Mounted static directory '{static_dir}' at '/static'")
print("π Starting background tasks")
# Start the main lifecycle manager task
background_task_handle = asyncio.create_task(manage_agent_lifecycle(), name="LifecycleManager")
# Add the exception logging callback
background_task_handle.add_done_callback(log_task_exception)
print("β Background tasks started")
@app.on_event("shutdown")
async def shutdown_event():
"""Clean up tasks when shutting down."""
global background_task_handle, active_agent_instance
print("\nπ Shutting down application. Cleaning up...")
# 1. Cancel the main lifecycle manager task
if background_task_handle and not background_task_handle.done():
print("Cancelling background task...")
background_task_handle.cancel()
try:
await asyncio.wait_for(background_task_handle, timeout=5.0)
print("Background task cancelled successfully.")
except asyncio.CancelledError:
print("Background task cancellation confirmed (CancelledError).")
except asyncio.TimeoutError:
print("Background task did not finish cancelling within timeout.")
except Exception as e:
print(f"Error during background task cancellation: {e}")
# 2. Deactivate and disconnect any currently active agent
# Use a copy of the instance in case it gets cleared elsewhere during shutdown.
agent_to_disconnect = active_agent_instance
if agent_to_disconnect:
agent_name = agent_to_disconnect.username if hasattr(agent_to_disconnect, 'username') else 'Unknown Agent'
print(f"Disconnecting active agent '{agent_name}'...")
try:
# Check websocket status before disconnecting
if hasattr(agent_to_disconnect, '_websocket') and agent_to_disconnect._websocket and agent_to_disconnect._websocket.open:
await agent_to_disconnect.disconnect()
print(f"Agent '{agent_name}' disconnected.")
else:
print(f"Agent '{agent_name}' already disconnected or websocket not available.")
except Exception as e:
print(f"Error during agent disconnect on shutdown for '{agent_name}': {e}")
# 3. Close all active WebSocket connections cleanly
print(f"Closing {len(manager.active_connections)} client WebSocket connections...")
# Create tasks to close all connections concurrently
close_tasks = [
conn.close(code=1000, reason="Server shutting down") # 1000 = Normal Closure
for conn in list(manager.active_connections) # Iterate over a copy
]
if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True) # Log potential errors during close
print("β Cleanup complete. Application shutdown.")
# For direct script execution
if __name__ == "__main__":
import uvicorn
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Reduce noise from poke_env's default INFO logging if desired
logging.getLogger('poke_env').setLevel(logging.WARNING)
logging.getLogger('websockets.client').setLevel(logging.INFO) # Show websocket connection attempts
print("Starting Pokemon Battle Livestream Server...")
print("="*60)
if not AVAILABLE_AGENT_NAMES:
print("βββββββββββββββββββββ FATAL ERROR βββββββββββββββββββββ")
print(" No agents found with configured passwords!")
print(" Please set the required environment variables:")
for name, cfg in AGENT_CONFIGS.items():
print(f" - {cfg.get('password_env_var', 'N/A')} (for agent: {name})")
print("="*60)
exit("Exiting due to missing agent passwords.")
else:
print("β¨ Available Agents Found:")
for name in AVAILABLE_AGENT_NAMES:
print(f" - {name}")
print("="*60)
print(f"Server will run on http://0.0.0.0:7860")
print("="*60)
# Run with uvicorn
uvicorn.run(
"main:app", # Point to the FastAPI app instance
host="0.0.0.0",
port=7860,
reload=False, # Disable reload for production/stable testing
log_level="info" # Uvicorn's log level
)