Building Stateful MCP Environments
A comprehensive tutorial for creating stateful MCP environments for reinforcement learning
This tutorial teaches you how to build stateful environments using the Model Context Protocol (MCP). You'll learn how to create environments that maintain state between agent interactions, enabling complex multi-step tasks for reinforcement learning.
What You'll Build
By the end of this tutorial, you'll understand how to:
- Create an MCP server that maintains per-session state
- Build a gym-style environment wrapper that communicates with your MCP server
- Generate training datasets for your custom environment
- Deploy and test your stateful environment locally
Prerequisites
- Basic understanding of reinforcement learning concepts (states, actions, rewards)
- Familiarity with Python and async programming
- Understanding of REST APIs and HTTP concepts
Understanding the Architecture
A stateful MCP environment consists of four key components:
1. The MCP Server (State Owner)
The MCP server owns all state and exposes tools that agents can call. Unlike stateless APIs, it maintains session-specific data across multiple tool invocations.
Key responsibilities:
- Store per-session state (scores, history, resources, etc.)
- Expose tools for agent actions (submit, query, manipulate state)
- Provide reward calculations and observations
- Manage session lifecycle and cleanup
2. The Environment Wrapper
A minimal gym-style wrapper that translates between the RL harness and your MCP server.
Key responsibilities:
- Initialize with server connection details
- Forward agent actions to MCP tools
- Extract rewards and observations from MCP responses
- Signal episode termination
3. The Dataset Generator
Creates training tasks with initial prompts and state parameters.
Key responsibilities:
- Generate JSONL files with task specifications
- Provide agent instructions and hints
- Create curriculum variations (difficulty levels, parameters)
4. The MCP Client Config
Registers your server so the runtime can discover and connect to it.
Key responsibilities:
- Map server names to connection endpoints
- Specify transport types (HTTP, stdio)
- Configure authentication if needed
Tutorial: Building a Number Guessing Game
Let's build a complete stateful environment step by step. We'll create a game where agents guess numbers within a range and receive feedback.
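To make the feedback loop concrete before building the server, here is a minimal sketch of a reference player that uses the "too low" / "too high" hints to run a binary search. The function names `binary_search_guesses` and `worst_case_attempts` are illustrative and not part of any MCP or harness API.

```python
import math
from typing import List


def binary_search_guesses(target: int, low: int, high: int) -> List[int]:
    """Return the sequence of guesses a binary-search player would make."""
    guesses = []
    while low <= high:
        guess = (low + high) // 2
        guesses.append(guess)
        if guess < target:      # feedback would be "too low"
            low = guess + 1
        elif guess > target:    # feedback would be "too high"
            high = guess - 1
        else:                   # feedback would be "correct"
            break
    return guesses


def worst_case_attempts(low: int, high: int) -> int:
    """Upper bound on guesses needed for a range of (high - low + 1) values."""
    return math.floor(math.log2(high - low + 1)) + 1
```

For a range of 1 to 1000 the bound is 10 guesses, so an attempt budget of 10 is just enough for a player that follows the hints perfectly.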
Step 1: Define Your Session State
First, define what data you need to track per session:
from dataclasses import dataclass, field
from typing import List, Dict, Any
from datetime import datetime

@dataclass
class SessionState:
    """Tracks state for a single agent session."""
    # Game configuration
    min_range: int = 1
    max_range: int = 100
    target_number: int = 50
    # Tracking submissions
    submissions: List[int] = field(default_factory=list)
    best_score: float = 0.0
    total_attempts: int = 0
    # History for observation
    recent_history: List[Dict[str, Any]] = field(default_factory=list)
    max_history: int = 25
    # Session metadata
    created_at: datetime = field(default_factory=datetime.now)

    def add_submission(self, number: int) -> Dict[str, Any]:
        """Record a guess and calculate feedback."""
        self.total_attempts += 1
        self.submissions.append(number)
        # Calculate distance-based score
        distance = abs(self.target_number - number)
        # Guard against a zero-width range (min_range == max_range)
        max_distance = max(1, self.max_range - self.min_range)
        score = max(0.0, 1.0 - (distance / max_distance))
        # Update best score
        if score > self.best_score:
            self.best_score = score
        # Create feedback
        if number < self.target_number:
            hint = "too low"
        elif number > self.target_number:
            hint = "too high"
        else:
            hint = "correct"
        # Add to history
        entry = {
            "attempt": self.total_attempts,
            "guess": number,
            "hint": hint,
            "score": score,
            "distance": distance
        }
        self.recent_history.append(entry)
        # Trim history to prevent unbounded growth
        if len(self.recent_history) > self.max_history:
            self.recent_history = self.recent_history[-self.max_history:]
        return entry

    def to_public_dict(self) -> Dict[str, Any]:
        """Return state visible to the agent (no target!)."""
        return {
            "range": [self.min_range, self.max_range],
            "attempts": self.total_attempts,
            "best_score": self.best_score,
            "recent_history": self.recent_history[-5:]  # Last 5 only
        }

    def to_private_dict(self) -> Dict[str, Any]:
        """Return full state for debugging."""
        return {
            "target": self.target_number,
            "range": [self.min_range, self.max_range],
            "attempts": self.total_attempts,
            "submissions": self.submissions,
            "best_score": self.best_score,
            "history_length": len(self.recent_history)
        }

Design tips:
- Keep public vs. private state separate (don't leak the target!)
- Limit history size to prevent memory bloat
- Store both raw data and computed metrics
- Make state serializable for debugging
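The last tip can be sketched as follows. This is a minimal example under stated assumptions: `DemoState` is a hypothetical stand-in for a session dataclass, and `dump_state` is an illustrative helper name. It shows one way to turn a dataclass that contains a `datetime` into JSON for logs.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, List


@dataclass
class DemoState:
    """Hypothetical stand-in for a per-session state object."""
    target_number: int = 50
    submissions: List[int] = field(default_factory=list)
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


def _encode(value: Any) -> str:
    """Fallback encoder for values that json cannot serialize natively."""
    if isinstance(value, datetime):
        return value.isoformat()
    return str(value)


def dump_state(state: DemoState) -> str:
    """Serialize the full (private) state to JSON for logs or debugging."""
    return json.dumps(asdict(state), default=_encode, sort_keys=True)
```

Because this dump includes the target, it belongs in server-side logs only and should never be returned to the agent.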
Step 2: Build the MCP Server
Now create the MCP server that manages sessions and exposes tools:
import asyncio
import json
import random
from typing import Any, Dict, Optional
from threading import Lock
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

# SessionState is the dataclass defined in Step 1

# Session storage
_session_state: Dict[str, SessionState] = {}
_session_lock = Lock()

def _extract_session_id(request_context: Any) -> str:
    """Extract stable session ID from request metadata."""
    # Assumes the runtime passes a sessionId in the request metadata.
    # meta may be missing, None, a dict, or an object, so handle each case.
    meta = getattr(request_context, "meta", None)
    if isinstance(meta, dict):
        return meta.get("sessionId", "default")
    return getattr(meta, "sessionId", None) or "default"

def _get_session(session_id: str, init_params: Optional[Dict] = None) -> SessionState:
    """Get or create session state."""
    with _session_lock:
        if session_id not in _session_state:
            # Initialize new session
            params = init_params or {}
            min_val = params.get("min_range", 1)
            max_val = params.get("max_range", 100)
            target = params.get("target")
            if target is None:
                target = random.randint(min_val, max_val)
            _session_state[session_id] = SessionState(
                min_range=min_val,
                max_range=max_val,
                target_number=target
            )
        return _session_state[session_id]

# Create the MCP server
app = Server("number-guessing-game")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """List available tools for agents."""
    return [
        Tool(
            name="submit-number",
            description="Submit a number guess and receive feedback",
            inputSchema={
                "type": "object",
                "properties": {
                    "number": {
                        "type": "integer",
                        "description": "Your guess"
                    },
                    "init": {
                        "type": "object",
                        "description": "Optional initialization params for new sessions",
                        "properties": {
                            "min_range": {"type": "integer"},
                            "max_range": {"type": "integer"},
                            "target": {"type": "integer"}
                        }
                    }
                },
                "required": ["number"]
            }
        ),
        Tool(
            name="get-reward",
            description="Get current reward and observation",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Handle tool calls from agents."""
    # The SDK invokes this handler with (name, arguments) only; the
    # per-request context is read from the server object instead.
    session_id = _extract_session_id(app.request_context)
    arguments = arguments or {}

    if name == "submit-number":
        # Extract arguments
        number = arguments.get("number")
        init_params = arguments.get("init")
        if number is None:
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": "Missing required parameter: number"
                })
            )]
        # Get session and record submission
        session = _get_session(session_id, init_params)
        result = session.add_submission(number)
        # Return human-readable and structured response
        response = {
            "message": f"You guessed {number}. That's {result['hint']}!",
            "score": result["score"],
            "distance": result["distance"],
            "attempt": result["attempt"],
            "state": session.to_public_dict()
        }
        return [TextContent(
            type="text",
            text=json.dumps(response, indent=2)
        )]

    elif name == "get-reward":
        # Retrieve current session state
        session = _get_session(session_id)
        # Return reward and full observation
        response = {
            "reward": session.best_score,
            "total_attempts": session.total_attempts,
            "state": session.to_public_dict(),
            "recent_history": session.recent_history[-10:],
            "done": session.best_score >= 1.0  # Perfect guess ends episode
        }
        return [TextContent(
            type="text",
            text=json.dumps(response, indent=2)
        )]

    else:
        return [TextContent(
            type="text",
            text=json.dumps({"error": f"Unknown tool: {name}"})
        )]

async def main():
    """Run the MCP server."""
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Key patterns:
- Session management: Use _extract_session_id() to get a stable session key from request metadata
- Thread safety: Protect the shared session dictionary with a lock
- Tool schema: Define clear input schemas so agents know what parameters to send
- Structured responses: Return both human-readable text and machine-parseable JSON
- Lazy initialization: Create sessions on first access with optional parameters
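The thread-safety and lazy-initialization patterns above can be isolated into a small class that is easy to test on its own. This is a sketch, not part of the MCP SDK: `SessionRegistry` and its `factory` argument are hypothetical names chosen for illustration.

```python
import threading
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class SessionRegistry(Generic[T]):
    """Creates one state object per session ID, guarded by a lock."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._sessions: Dict[str, T] = {}
        self._lock = threading.Lock()

    def get_or_create(self, session_id: str) -> T:
        # The check and the insert happen under one lock, so two threads
        # asking for the same new session cannot both create it.
        with self._lock:
            if session_id not in self._sessions:
                self._sessions[session_id] = self._factory()
            return self._sessions[session_id]

    def __len__(self) -> int:
        with self._lock:
            return len(self._sessions)
```

Wrapping the dictionary this way keeps the lock private, so callers cannot forget to take it.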
Step 3: Create the Environment Wrapper
Build a minimal gym-style wrapper that connects to your MCP server:
import json
from typing import Any, Dict, Tuple

class NumberGuessingEnv:
    """Gym-style wrapper for the number guessing MCP server."""

    def __init__(self):
        self.mcp = None  # Set by harness during setup
        self.server_id = None
        self.reward_tool_name = None
        self.action_tool_name = None
        self.done = False

    def setup(self, **state: Any) -> str:
        """Initialize environment from task state.

        Args:
            state: Task configuration containing:
                - server_id: Which MCP server to use
                - reward_tool: Tool name for getting rewards
                - action_tool: Tool name for submitting actions
                - prompt: Initial instruction for the agent

        Returns:
            The initial observation (prompt text)
        """
        # Extract required configuration
        self.server_id = state.get("server_id", "NumberGuessingGame")
        self.reward_tool_name = state.get("reward_tool", "get-reward")
        self.action_tool_name = state.get("action_tool", "submit-number")
        # Get initial prompt
        prompt = state.get("prompt", "Guess the target number!")
        # Optionally initialize server with parameters
        init_params = state.get("init_params", {})
        if init_params:
            # Make a dummy call to initialize the session.
            # Note: this midpoint guess is recorded as the first attempt.
            midpoint = (init_params.get("min_range", 1) + init_params.get("max_range", 100)) // 2
            self.mcp.call_tool(
                self.server_id,
                self.action_tool_name,
                {"number": midpoint, "init": init_params}
            )
        self.done = False
        return prompt

    def step(self, action: str) -> Tuple[str, float, bool, Dict[str, Any]]:
        """Execute one environment step.

        Args:
            action: The agent's action (typically a tool call in text form)

        Returns:
            observation: Text observation for next step
            reward: Scalar reward value
            done: Whether episode is complete
            info: Auxiliary information for debugging
        """
        # The agent has already called action tools (submit-number)
        # Now we just need to get the reward
        result = self.mcp.call_tool(
            self.server_id,
            self.reward_tool_name,
            {}
        )
        # Parse the structured response
        try:
            data = json.loads(result.get("content", [{}])[0].get("text", "{}"))
            reward = data.get("reward", 0.0)
            done = data.get("done", False)
            observation = json.dumps(data.get("state", {}), indent=2)
            # Include debugging info
            info = {
                "raw_result": result,
                "action": action,
                "total_attempts": data.get("total_attempts", 0),
                "recent_history": data.get("recent_history", [])
            }
            self.done = done
            return observation, reward, done, info
        except (json.JSONDecodeError, KeyError, IndexError, TypeError, AttributeError) as e:
            # Handle malformed or empty responses gracefully
            return f"Error: {str(e)}", 0.0, True, {"error": str(e), "raw_result": result}

Design principles:
- Minimal wrapper: The wrapper doesn't duplicate state logic; that lives in the MCP server
- Single-step episodes: This example scores the episode in a single step() call, but you can extend it for multi-step episodes
- Error handling: Gracefully handle JSON parsing errors and missing fields
- Info dict: Return debugging details to help diagnose issues
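The error-handling principle can be factored into a standalone parser. The sketch below assumes tool results arrive as a dictionary shaped like `{"content": [{"type": "text", "text": "<json>"}]}`, which is the shape the wrapper above reads; `parse_tool_payload` is an illustrative name, and your harness may deliver results in a different form.

```python
import json
from typing import Any, Dict


def parse_tool_payload(result: Any) -> Dict[str, Any]:
    """Extract the JSON object from a tool result, or return {"error": ...}."""
    try:
        text = result["content"][0]["text"]
        payload = json.loads(text)
    except (KeyError, IndexError, TypeError, json.JSONDecodeError) as exc:
        # Missing keys, empty content, non-dict results, and bad JSON
        # all end up here instead of crashing the training loop.
        return {"error": f"{type(exc).__name__}: {exc}"}
    if not isinstance(payload, dict):
        return {"error": "payload is not a JSON object"}
    return payload
```

Returning an error dictionary rather than raising lets `step()` decide how to end the episode and what to log.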
Step 4: Generate Training Data
Create a dataset generator that produces JSONL tasks:
import json
import random
from pathlib import Path
from typing import Sequence

def generate_number_guessing_dataset(
    output_path: str,
    num_tasks: int = 100,
    # A tuple avoids the shared mutable default argument pitfall
    difficulty_levels: Sequence[str] = ("easy", "medium", "hard"),
):
    """Generate training tasks for the number guessing environment.

    Args:
        output_path: Where to save the JSONL file
        num_tasks: Number of tasks to generate
        difficulty_levels: Mix of difficulty settings
    """
    tasks = []
    for i in range(num_tasks):
        # Vary difficulty
        difficulty = random.choice(difficulty_levels)
        if difficulty == "easy":
            min_range, max_range = 1, 10
        elif difficulty == "medium":
            min_range, max_range = 1, 100
        else:  # hard
            min_range, max_range = 1, 1000
        # Create task specification
        task = {
            "id": f"number_guess_{i:04d}",
            "server_id": "NumberGuessingGame",
            "reward_tool": "get-reward",
            "action_tool": "submit-number",
            "init_params": {
                "min_range": min_range,
                "max_range": max_range
            },
            "prompt": f"Guess a number between {min_range} and {max_range}. Use the submit-number tool to make guesses. Try to get as close as possible!",
            "hints": [
                "Start with the midpoint of the range",
                "Use binary search strategy for efficiency",
                "Pay attention to 'too high' and 'too low' feedback"
            ],
            "difficulty": difficulty,
            "max_attempts": 10
        }
        tasks.append(task)
    # Write JSONL
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w") as f:
        for task in tasks:
            f.write(json.dumps(task) + "\n")
    print(f"Generated {len(tasks)} tasks to {output_path}")

if __name__ == "__main__":
    generate_number_guessing_dataset(
        "datasets/number_guessing_train.jsonl",
        num_tasks=500
    )

Dataset design tips:
- Curriculum learning: Mix easy and hard tasks for better training
- Explicit hints: Guide agents toward good strategies
- Metadata: Include task IDs and difficulty for analysis
- Variation: Randomize parameters to improve generalization
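One possible way to act on the curriculum tip is to shift the difficulty mix over the course of the dataset instead of sampling uniformly. The sketch below is an assumption about how you might do that, not something the harness requires: `difficulty_weights` and `sample_difficulties` are hypothetical helpers, and the linear schedule is an arbitrary choice.

```python
import random
from typing import Dict, List


def difficulty_weights(progress: float) -> Dict[str, float]:
    """Shift sampling weight from easy to hard as progress goes 0 -> 1."""
    p = min(1.0, max(0.0, progress))  # clamp to [0, 1]
    return {
        "easy": 1.0 - 0.8 * p,
        "medium": 1.0,
        "hard": 0.2 + 0.8 * p,
    }


def sample_difficulties(num_tasks: int, seed: int = 0) -> List[str]:
    """Sample one difficulty label per task, with later tasks skewing harder."""
    rng = random.Random(seed)  # seeded for reproducible datasets
    labels = []
    for i in range(num_tasks):
        progress = i / max(num_tasks - 1, 1)
        weights = difficulty_weights(progress)
        label = rng.choices(list(weights), weights=list(weights.values()))[0]
        labels.append(label)
    return labels
```

Every difficulty keeps a nonzero weight throughout, so early batches still contain some hard tasks and late batches still contain some easy ones.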
Step 5: Configure the MCP Client
Register your server in the MCP client configuration:
{
  "mcpServers": {
    "NumberGuessingGame": {
      "url": "http://localhost:8080/mcp",
      "transport": "http"
    }
  }
}

For stdio transport:

{
  "mcpServers": {
    "NumberGuessingGame": {
      "command": "python",
      "args": ["-m", "number_guessing.server"],
      "transport": "stdio"
    }
  }
}

Step 6: Run Your Environment Locally
Start the MCP Server (HTTP mode)
For HTTP deployment, wrap your server with the HTTP session manager:
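import contextlib

import uvicorn
from starlette.applications import Starlette
from starlette.routing import Mount
# Import path as found in recent MCP Python SDK releases;
# verify it against your installed version.
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager

def run_http_server(host: str = "127.0.0.1", port: int = 8080) -> None:
    """Run server over HTTP."""
    manager = StreamableHTTPSessionManager(
        app=app,
        stateless=False  # Critical for session persistence!
    )

    async def handle_mcp(scope, receive, send):
        await manager.handle_request(scope, receive, send)

    @contextlib.asynccontextmanager
    async def lifespan(_asgi_app):
        # The session manager must be running while requests are served
        async with manager.run():
            yield

    asgi_app = Starlette(routes=[Mount("/mcp", app=handle_mcp)], lifespan=lifespan)
    # uvicorn.run() starts its own event loop, so call it from sync code
    uvicorn.run(asgi_app, host=host, port=port)

if __name__ == "__main__":
    import sys
    if "--http" in sys.argv:
        run_http_server()
    else:
        asyncio.run(main())  # stdio mode

Run it: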
# HTTP mode
python -m number_guessing.server --http

# stdio mode
python -m number_guessing.server

Test with the MCP CLI
Before wiring into the RL harness, test your server:
# Install MCP CLI
# (command syntax may differ between CLI versions; check its --help output)
pip install mcp-cli

# Test tool listing
mcp-cli call http://localhost:8080/mcp list_tools

# Test a guess
mcp-cli call http://localhost:8080/mcp call_tool \
  --name submit-number \
  --args '{"number": 50}'

# Get reward
mcp-cli call http://localhost:8080/mcp call_tool \
  --name get-reward

Run a Training Job
Once everything works:
from runrl import RunRL

client = RunRL(api_key="your-key")
run = client.runs.create(
    model="gpt-4",
    dataset="datasets/number_guessing_train.jsonl",
    algorithm="ppo",
    config={
        "environment": "NumberGuessingEnv",
        "mcp_servers_config": "mcp_servers.json"
    }
)
print(f"Started run: {run.id}")

Advanced Patterns
Multi-Step Episodes
Extend the wrapper to support multiple steps before episode termination:
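import json

class MultiStepEnv(NumberGuessingEnv):
    """Extends the Step 3 wrapper with a per-episode step budget."""

    def __init__(self, max_steps: int = 10):
        super().__init__()
        self.max_steps = max_steps
        self.current_step = 0

    def setup(self, **state):
        self.current_step = 0  # Reset the budget for each new episode
        return super().setup(**state)

    def step(self, action: str):
        self.current_step += 1
        # Get reward from MCP
        result = self.mcp.call_tool(self.server_id, self.reward_tool_name, {})
        data = json.loads(result["content"][0]["text"])
        observation = json.dumps(data.get("state", {}), indent=2)
        reward = data.get("reward", 0.0)
        # End episode on max steps or success
        done = (
            self.current_step >= self.max_steps or
            data.get("done", False)
        )
        info = {"step": self.current_step, "action": action}
        self.done = done
        return observation, reward, done, info

Shaped Rewards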
Return incremental rewards to guide learning:
def calculate_reward(self, state: SessionState) -> float:
    """Return dense reward signal."""
    if not state.submissions:
        return 0.0
    # Guard against a zero-width range
    span = max(1, state.max_range - state.min_range)
    last_guess = state.submissions[-1]
    distance = abs(state.target_number - last_guess)
    # Dense reward based on improvement
    if len(state.submissions) > 1:
        prev_distance = abs(state.target_number - state.submissions[-2])
        improvement = prev_distance - distance
        reward = improvement / span
    else:
        # First guess: reward based on distance
        reward = 1.0 - (distance / span)
    # Bonus for correct guess
    if distance == 0:
        reward += 10.0
    return reward

Async Tool Calls
For concurrent environments, use async MCP calls:
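async def async_step(self, action: str):
    """Async version of step for parallel environments."""
    # call_tool_async is assumed to be the async counterpart of
    # call_tool on your harness's MCP client
    result = await self.mcp.call_tool_async(
        self.server_id,
        self.reward_tool_name,
        {}
    )
    # Process result the same way as the synchronous step()
    data = json.loads(result["content"][0]["text"])
    observation = json.dumps(data.get("state", {}), indent=2)
    reward = data.get("reward", 0.0)
    done = data.get("done", False)
    info = {"action": action, "raw_result": result}
    return observation, reward, done, info

Session Cleanup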
Add session cleanup to prevent memory leaks:
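# Check whether your MCP SDK version exposes a session-teardown hook.
# If it does not, call this function from your own teardown path
# (disconnect callback, idle timeout, or shutdown handler).
async def cleanup_session(session_id: str) -> None:
    """Clean up session when agent disconnects."""
    with _session_lock:
        state = _session_state.pop(session_id, None)
    if state is not None:
        print(f"Cleaned up session {session_id} after {state.total_attempts} attempts")

Debugging Tips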
1. Check Session Persistence
Verify sessions persist across tool calls:
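# Call 1 - records the first guess
result1 = mcp.call_tool("NumberGuessingGame", "submit-number", {"number": 25})

# Call 2 - should report total_attempts = 1 if the session persisted
# (0 means the second call landed in a fresh session)
result2 = mcp.call_tool("NumberGuessingGame", "get-reward", {})

2. Inspect MCP Responses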
Log raw MCP responses to understand structure:
result = self.mcp.call_tool(server, tool, args)
print(f"Raw MCP result: {json.dumps(result, indent=2)}")

3. Validate Session IDs
Ensure the server receives stable session IDs:
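def _extract_session_id(request_context: Any) -> str:
    # meta may be None, a dict, or an object, so handle each case
    meta = getattr(request_context, "meta", None)
    if isinstance(meta, dict):
        session_id = meta.get("sessionId", "fallback")
    else:
        session_id = getattr(meta, "sessionId", None) or "fallback"
    print(f"Extracted session ID: {session_id}")
    return session_id

4. Monitor Memory Usage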
Track session count and history size:
def get_stats() -> Dict:
    with _session_lock:
        return {
            "active_sessions": len(_session_state),
            "total_attempts": sum(s.total_attempts for s in _session_state.values()),
            "avg_history_size": sum(len(s.recent_history) for s in _session_state.values()) / max(len(_session_state), 1)
        }

Best Practices
Security
- Validate inputs: Check tool arguments before processing
- Sanitize outputs: Don't leak sensitive state (like target numbers)
- Rate limiting: Add request limits to prevent abuse
- CORS policies: Configure allowed origins for HTTP servers
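As an illustration of the input-validation point, the following sketch checks a guess before it reaches the session state. `validate_guess` is a hypothetical helper; it assumes arguments arrive as a dictionary with a `number` key, as in the tool schema used in this tutorial.

```python
from typing import Any, Optional, Tuple


def validate_guess(
    arguments: Any, min_range: int, max_range: int
) -> Tuple[Optional[int], Optional[str]]:
    """Return (number, None) when valid, or (None, error message) otherwise."""
    if not isinstance(arguments, dict):
        return None, "arguments must be an object"
    number = arguments.get("number")
    # bool is a subclass of int in Python, so reject it explicitly
    if isinstance(number, bool) or not isinstance(number, int):
        return None, "number must be an integer"
    if not min_range <= number <= max_range:
        return None, f"number must be between {min_range} and {max_range}"
    return number, None
```

Rejecting out-of-range guesses up front also keeps the distance-based score within its intended bounds.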
Performance
- Trim history: Limit stored history to prevent unbounded growth
- Lazy initialization: Create sessions on-demand
- Async operations: Use async/await for I/O-bound operations
- Session cleanup: Remove inactive sessions periodically
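Periodic removal of inactive sessions can be sketched with a last-seen timestamp per session. `SessionExpiry` is a hypothetical helper, not an MCP SDK feature; it only tracks which IDs are stale, and it assumes the caller serializes access, for example by holding the same lock that guards the session dictionary.

```python
import time
from typing import Dict, List, Optional


class SessionExpiry:
    """Tracks last activity per session and reports which ones are stale."""

    def __init__(self, ttl_seconds: float) -> None:
        self.ttl_seconds = ttl_seconds
        self._last_seen: Dict[str, float] = {}

    def touch(self, session_id: str, now: Optional[float] = None) -> None:
        """Record activity; pass `now` explicitly in tests."""
        self._last_seen[session_id] = time.monotonic() if now is None else now

    def expired(self, now: Optional[float] = None) -> List[str]:
        """Return IDs whose last activity is older than the TTL."""
        current = time.monotonic() if now is None else now
        return [
            sid for sid, seen in self._last_seen.items()
            if current - seen > self.ttl_seconds
        ]

    def forget(self, session_id: str) -> None:
        """Stop tracking a session after its state has been removed."""
        self._last_seen.pop(session_id, None)
```

A background task could call `expired()` every few minutes, remove those IDs from the session dictionary, and then call `forget()` for each one.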
Testing
- Unit test state logic: Test SessionState methods independently
- Integration test tools: Verify tool calls return expected structures
- Load test sessions: Ensure server handles concurrent sessions
- Validate dataset: Check generated tasks are well-formed
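Dataset validation can be done with a short checker run before training. The sketch below assumes the task fields produced by the generator in Step 4 (`id`, `server_id`, `reward_tool`, `action_tool`, `prompt`, `init_params`); `validate_task` and `validate_jsonl` are illustrative names.

```python
import json
from typing import Any, Dict, List

REQUIRED_FIELDS = ("id", "server_id", "reward_tool", "action_tool", "prompt")


def validate_task(task: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in one task (empty if well-formed)."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in task]
    params = task.get("init_params", {})
    if not isinstance(params, dict):
        problems.append("init_params must be an object")
        return problems
    lo, hi = params.get("min_range"), params.get("max_range")
    if isinstance(lo, int) and isinstance(hi, int) and lo >= hi:
        problems.append("min_range must be less than max_range")
    return problems


def validate_jsonl(text: str) -> Dict[int, List[str]]:
    """Map 1-based line numbers to problems; well-formed lines are omitted."""
    report: Dict[int, List[str]] = {}
    for line_no, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            task = json.loads(line)
        except json.JSONDecodeError as exc:
            report[line_no] = [f"invalid JSON: {exc.msg}"]
            continue
        if not isinstance(task, dict):
            report[line_no] = ["task must be a JSON object"]
            continue
        problems = validate_task(task)
        if problems:
            report[line_no] = problems
    return report
```

An empty report means every line parsed and carried the fields the wrapper reads in `setup()`.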
Common Pitfalls
1. Forgetting stateless=False
For HTTP servers, you must use:
manager = StreamableHTTPSessionManager(app, stateless=False)

In stateless mode, sessions won't persist between requests!
2. Not Trimming History
Always limit history size:
if len(self.recent_history) > self.max_history:
    self.recent_history = self.recent_history[-self.max_history:]

3. Leaking Private State
Never include secret state in public observations:
# BAD - leaks target!
return {"target": self.target_number, ...}

# GOOD - only public info
return {"range": [self.min_range, self.max_range], ...}

4. Blocking Async Operations
Use async properly:
# BAD - blocks event loop
time.sleep(1)

# GOOD - yields to event loop
await asyncio.sleep(1)

Next Steps
Now that you understand stateful MCP environments, you can:
- Build complex tasks: Multi-step games, resource management, dialogue
- Add persistence: Save sessions to database for long-running episodes
- Implement multiplayer: Support multi-agent interactions in shared sessions
- Create benchmarks: Design evaluation suites for your environments
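As a starting point for the persistence idea, here is a minimal sketch that stores each session's state as a JSON blob in SQLite. `SessionStore` is a hypothetical class and the single-table schema is an assumption; it expects state that is already JSON-serializable, such as a private state dictionary.

```python
import json
import sqlite3
from typing import Any, Dict, Optional


class SessionStore:
    """Saves and loads session state dictionaries in a SQLite table."""

    def __init__(self, path: str = ":memory:") -> None:
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS sessions ("
            "session_id TEXT PRIMARY KEY, state TEXT NOT NULL)"
        )

    def save(self, session_id: str, state: Dict[str, Any]) -> None:
        # "with self._db" commits on success and rolls back on error
        with self._db:
            self._db.execute(
                "INSERT OR REPLACE INTO sessions (session_id, state) VALUES (?, ?)",
                (session_id, json.dumps(state)),
            )

    def load(self, session_id: str) -> Optional[Dict[str, Any]]:
        row = self._db.execute(
            "SELECT state FROM sessions WHERE session_id = ?", (session_id,)
        ).fetchone()
        return json.loads(row[0]) if row else None
```

Passing a file path instead of `":memory:"` keeps sessions across server restarts; a connection created this way should be used from one thread only.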
Getting Help
If you encounter issues:
- Check server logs for errors
- Verify MCP client configuration
- Test tools with MCP CLI before integrating
- Review session ID extraction logic
- Contact support with session dumps and error logs