RunRL

Building Stateful MCP Environments

A comprehensive tutorial for creating stateful MCP environments for reinforcement learning

This tutorial teaches you how to build stateful environments using the Model Context Protocol (MCP). You'll learn how to create environments that maintain state between agent interactions, enabling complex multi-step tasks for reinforcement learning.

What You'll Build

By the end of this tutorial, you'll understand how to:

  • Create an MCP server that maintains per-session state
  • Build a gym-style environment wrapper that communicates with your MCP server
  • Generate training datasets for your custom environment
  • Deploy and test your stateful environment locally

Prerequisites

  • Basic understanding of reinforcement learning concepts (states, actions, rewards)
  • Familiarity with Python and async programming
  • Understanding of REST APIs and HTTP concepts

Understanding the Architecture

A stateful MCP environment consists of four key components:

1. The MCP Server (State Owner)

The MCP server owns all gameplay state and exposes tools that agents can call. Unlike stateless APIs, it maintains session-specific data across multiple tool invocations.

Key responsibilities:

  • Store per-session state (scores, history, resources, etc.)
  • Expose tools for agent actions (submit, query, manipulate state)
  • Provide reward calculations and observations
  • Manage session lifecycle and cleanup

2. The Environment Wrapper

A minimal gym-style wrapper that translates between the RL harness and your MCP server.

Key responsibilities:

  • Initialize with server connection details
  • Forward agent actions to MCP tools
  • Extract rewards and observations from MCP responses
  • Signal episode termination

3. The Dataset Generator

Creates training tasks with initial prompts and state parameters.

Key responsibilities:

  • Generate JSONL files with task specifications
  • Include server IDs and tool names
  • Provide agent instructions and hints
  • Create curriculum variations (difficulty levels, parameters)

4. The MCP Client Config

Registers your server so the runtime can discover and connect to it.

Key responsibilities:

  • Map server names to connection endpoints
  • Specify transport types (HTTP, stdio)
  • Configure authentication if needed

Tutorial: Building a Number Guessing Game

Let's build a complete stateful environment step by step. We'll create a game where agents guess numbers within a range and receive feedback.

Step 1: Define Your Session State

First, define what data you need to track per session:

from dataclasses import dataclass, field
from typing import List, Dict, Any
from datetime import datetime

@dataclass
class SessionState:
    """Tracks state for a single agent session."""

    # Game configuration
    min_range: int = 1
    max_range: int = 100
    target_number: int = 50

    # Tracking submissions
    submissions: List[int] = field(default_factory=list)
    best_score: float = 0.0
    total_attempts: int = 0

    # History for observation
    recent_history: List[Dict[str, Any]] = field(default_factory=list)
    max_history: int = 25

    # Session metadata
    created_at: datetime = field(default_factory=datetime.now)

    def add_submission(self, number: int) -> Dict[str, Any]:
        """Record a guess and calculate feedback."""
        self.total_attempts += 1
        self.submissions.append(number)

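        # Calculate distance-based score (guard against a one-value range,
        # where max_range == min_range would otherwise divide by zero)
        distance = abs(self.target_number - number)
        max_distance = max(self.max_range - self.min_range, 1)
        score = max(0.0, 1.0 - (distance / max_distance))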

        # Update best score
        if score > self.best_score:
            self.best_score = score

        # Create feedback
        if number < self.target_number:
            hint = "too low"
        elif number > self.target_number:
            hint = "too high"
        else:
            hint = "correct"

        # Add to history
        entry = {
            "attempt": self.total_attempts,
            "guess": number,
            "hint": hint,
            "score": score,
            "distance": distance
        }

        self.recent_history.append(entry)

        # Trim history to prevent unbounded growth
        if len(self.recent_history) > self.max_history:
            self.recent_history = self.recent_history[-self.max_history:]

        return entry

    def to_public_dict(self) -> Dict[str, Any]:
        """Return state visible to the agent (no target!)."""
        return {
            "range": [self.min_range, self.max_range],
            "attempts": self.total_attempts,
            "best_score": self.best_score,
            "recent_history": self.recent_history[-5:]  # Last 5 only
        }

    def to_private_dict(self) -> Dict[str, Any]:
        """Return full state for debugging."""
        return {
            "target": self.target_number,
            "range": [self.min_range, self.max_range],
            "attempts": self.total_attempts,
            "submissions": self.submissions,
            "best_score": self.best_score,
            "history_length": len(self.recent_history)
        }

Design tips:

  • Keep public vs. private state separate (don't leak the target!)
  • Limit history size to prevent memory bloat
  • Store both raw data and computed metrics
  • Make state serializable for debugging
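
To make the scoring rule concrete, here is a small standalone sketch of the same distance-based formula that add_submission uses. The helper name distance_score is an illustrative assumption and is not part of the server code in this tutorial.

```python
def distance_score(guess: int, target: int, min_range: int, max_range: int) -> float:
    """Score in [0, 1]: 1.0 for an exact hit, falling linearly with distance."""
    span = max_range - min_range
    if span <= 0:
        # Degenerate one-value range: only an exact match can score.
        return 1.0 if guess == target else 0.0
    return max(0.0, 1.0 - abs(target - guess) / span)


# Worked example on the default 1..100 range with target 50.
for guess in (25, 50, 100):
    print(guess, round(distance_score(guess, 50, 1, 100), 4))
```

Because the score depends only on distance, a guess of 25 and a guess of 75 earn the same score against a target of 50. The "too low" / "too high" hint is what tells the agent which direction to move.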

Step 2: Build the MCP Server

Now create the MCP server that manages sessions and exposes tools:

import asyncio
import json
import random
from typing import Any, Dict, Optional
from threading import Lock

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

# Session storage
_session_state: Dict[str, SessionState] = {}
_session_lock = Lock()

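def _extract_session_id(request_context: Any) -> str:
    """Extract stable session ID from request metadata."""
    # Assumes the runtime attaches a sessionId to the request metadata.
    # meta may be missing, None, a dict, or an object with attributes.
    meta = getattr(request_context, 'meta', None)
    if isinstance(meta, dict):
        session_id = meta.get('sessionId')
    else:
        session_id = getattr(meta, 'sessionId', None)
    # Caution: every request without an ID shares the 'default' session.
    return session_id or 'default'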

def _get_session(session_id: str, init_params: Optional[Dict] = None) -> SessionState:
    """Get or create session state."""
    with _session_lock:
        if session_id not in _session_state:
            # Initialize new session
            params = init_params or {}
            min_val = params.get('min_range', 1)
            max_val = params.get('max_range', 100)
            target = params.get('target')

            if target is None:
                target = random.randint(min_val, max_val)

            _session_state[session_id] = SessionState(
                min_range=min_val,
                max_range=max_val,
                target_number=target
            )

        return _session_state[session_id]

# Create the MCP server
app = Server("number-guessing-game")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """List available tools for agents."""
    return [
        Tool(
            name="submit-number",
            description="Submit a number guess and receive feedback",
            inputSchema={
                "type": "object",
                "properties": {
                    "number": {
                        "type": "integer",
                        "description": "Your guess"
                    },
                    "init": {
                        "type": "object",
                        "description": "Optional initialization params for new sessions",
                        "properties": {
                            "min_range": {"type": "integer"},
                            "max_range": {"type": "integer"},
                            "target": {"type": "integer"}
                        }
                    }
                },
                "required": ["number"]
            }
        ),
        Tool(
            name="get-reward",
            description="Get current reward and observation",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        )
    ]

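@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Handle tool calls from agents."""
    # The SDK's low-level Server invokes this handler with (name, arguments)
    # only; the per-request context is read from app.request_context.
    session_id = _extract_session_id(app.request_context)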

    if name == "submit-number":
        # Extract arguments
        number = arguments.get("number")
        init_params = arguments.get("init")

        if number is None:
            return [TextContent(
                type="text",
                text=json.dumps({
                    "error": "Missing required parameter: number"
                })
            )]

        # Get session and record submission
        session = _get_session(session_id, init_params)
        result = session.add_submission(number)

        # Return human-readable and structured response
        response = {
            "message": f"You guessed {number}. That's {result['hint']}!",
            "score": result["score"],
            "distance": result["distance"],
            "attempt": result["attempt"],
            "state": session.to_public_dict()
        }

        return [TextContent(
            type="text",
            text=json.dumps(response, indent=2)
        )]

    elif name == "get-reward":
        # Retrieve current session state
        session = _get_session(session_id)

        # Return reward and full observation
        response = {
            "reward": session.best_score,
            "total_attempts": session.total_attempts,
            "state": session.to_public_dict(),
            "recent_history": session.recent_history[-10:],
            "done": session.best_score >= 1.0  # Perfect guess ends episode
        }

        return [TextContent(
            type="text",
            text=json.dumps(response, indent=2)
        )]

    else:
        return [TextContent(
            type="text",
            text=json.dumps({"error": f"Unknown tool: {name}"})
        )]

async def main():
    """Run the MCP server."""
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Key patterns:

  • Session management: Use _extract_session_id() to get a stable session key from request metadata
  • Thread safety: Protect shared session dictionary with a lock
  • Tool schema: Define clear input schemas so agents know what parameters to send
  • Structured responses: Return both human-readable text and machine-parseable JSON
  • Lazy initialization: Create sessions on first access with optional parameters
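
The session management, thread safety, and lazy initialization patterns above can be sketched in isolation as a small store class. This is a minimal illustration, not the tutorial's server code: SessionStore and its factory argument are hypothetical names, and plain dicts stand in for SessionState.

```python
from threading import Lock
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class SessionStore(Generic[T]):
    """Maps session IDs to state objects, creating each one on first access."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._items: Dict[str, T] = {}
        self._lock = Lock()

    def get(self, session_id: str) -> T:
        # Hold the lock for the whole check-and-create so two concurrent
        # first calls for the same ID cannot build two different objects.
        with self._lock:
            if session_id not in self._items:
                self._items[session_id] = self._factory()
            return self._items[session_id]

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)


store = SessionStore(factory=dict)  # plain dicts stand in for SessionState
store.get("agent-a")["attempts"] = 1
print(store.get("agent-a"), store.get("agent-b"), len(store))
```

Note that the lock here only protects the mapping itself. If several threads can mutate the same session object at once, that object needs its own synchronization.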

Step 3: Create the Environment Wrapper

Build a minimal gym-style wrapper that connects to your MCP server:

import json
from typing import Any, Dict, Tuple

class NumberGuessingEnv:
    """Gym-style wrapper for the number guessing MCP server."""

    def __init__(self):
        self.mcp = None  # Set by harness during setup
        self.server_id = None
        self.reward_tool_name = None
        self.action_tool_name = None
        self.done = False

    def setup(self, **state: Any) -> str:
        """Initialize environment from task state.

        Args:
            state: Task configuration containing:
                - server_id: Which MCP server to use
                - reward_tool: Tool name for getting rewards
                - action_tool: Tool name for submitting actions
                - prompt: Initial instruction for the agent

        Returns:
            The initial observation (prompt text)
        """
        # Extract required configuration
        self.server_id = state.get("server_id", "NumberGuessingGame")
        self.reward_tool_name = state.get("reward_tool", "get-reward")
        self.action_tool_name = state.get("action_tool", "submit-number")

        # Get initial prompt
        prompt = state.get("prompt", "Guess the target number!")

        # Optionally initialize server with parameters
        init_params = state.get("init_params", {})
        if init_params:
            # The server has no dedicated init tool, so the first
            # submit-number call carries the init params. Note that this
            # midpoint guess is recorded as the session's first attempt.
            midpoint = (init_params.get("min_range", 1) + init_params.get("max_range", 100)) // 2
            self.mcp.call_tool(
                self.server_id,
                self.action_tool_name,
                {"number": midpoint, "init": init_params}
            )

        self.done = False
        return prompt

    def step(self, action: str) -> Tuple[str, float, bool, Dict[str, Any]]:
        """Execute one environment step.

        Args:
            action: The agent's action (typically a tool call in text form)

        Returns:
            observation: Text observation for next step
            reward: Scalar reward value
            done: Whether episode is complete
            info: Auxiliary information for debugging
        """
        # The agent has already called action tools (submit-number)
        # Now we just need to get the reward

        result = self.mcp.call_tool(
            self.server_id,
            self.reward_tool_name,
            {}
        )

        # Parse the structured response
        try:
            data = json.loads(result.get("content", [{}])[0].get("text", "{}"))

            reward = data.get("reward", 0.0)
            done = data.get("done", False)
            observation = json.dumps(data.get("state", {}), indent=2)

            # Include debugging info
            info = {
                "raw_result": result,
                "action": action,
                "total_attempts": data.get("total_attempts", 0),
                "recent_history": data.get("recent_history", [])
            }

            self.done = done

            return observation, reward, done, info

        except (json.JSONDecodeError, KeyError, IndexError, AttributeError, TypeError) as e:
            # Handle errors gracefully
            return f"Error: {str(e)}", 0.0, True, {"error": str(e), "raw_result": result}

    def reset(self) -> str:
        """Reset environment (handled by session cleanup in server)."""
        self.done = False
        return "Environment reset. Start a new session."

Design principles:

  • Minimal wrapper: The wrapper doesn't duplicate state logic—that lives in the MCP server
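  • Single-step episodes: In this example the agent makes its tool calls first and step() only collects the resulting reward. step() itself ends the episode only when the server reports done, so add a step cap if you extend it to multi-step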
  • Error handling: Gracefully handle JSON parsing errors and missing fields
  • Info dict: Return debugging details to help diagnose issues
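
One way to exercise the wrapper's parsing logic without a running server is a small test double. The sketch below assumes the response shape that step() reads (a dict with a "content" list whose first item has a "text" field); FakeMCP and parse_reward are hypothetical names introduced only for illustration.

```python
import json
from typing import Any, Dict, List, Tuple


class FakeMCP:
    """Hypothetical test double that mimics the response shape read by step()."""

    def __init__(self, payload: Dict[str, Any]) -> None:
        self.payload = payload
        self.calls: List[Tuple[str, str, Dict[str, Any]]] = []

    def call_tool(self, server_id: str, tool: str, args: Dict[str, Any]) -> Dict[str, Any]:
        # Record the call so a test can check what the wrapper asked for.
        self.calls.append((server_id, tool, args))
        return {"content": [{"type": "text", "text": json.dumps(self.payload)}]}


def parse_reward(result: Dict[str, Any]) -> Tuple[float, bool, Dict[str, Any]]:
    """Pull (reward, done, state) out of a get-reward style response."""
    content = result.get("content") or [{}]
    data = json.loads(content[0].get("text", "{}"))
    return float(data.get("reward", 0.0)), bool(data.get("done", False)), data.get("state", {})


fake = FakeMCP({"reward": 0.75, "done": False, "state": {"attempts": 2}})
reward, done, state = parse_reward(fake.call_tool("NumberGuessingGame", "get-reward", {}))
print(reward, done, state)
```

Assigning an instance of such a double to the wrapper's mcp attribute lets you unit test step() before any server or harness is involved.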

Step 4: Generate Training Data

Create a dataset generator that produces JSONL tasks:

import json
import random
from pathlib import Path
from typing import Sequence

def generate_number_guessing_dataset(
    output_path: str,
    num_tasks: int = 100,
    # A tuple avoids the shared mutable default argument pitfall
    difficulty_levels: Sequence[str] = ("easy", "medium", "hard")
):
    """Generate training tasks for the number guessing environment.

    Args:
        output_path: Where to save the JSONL file
        num_tasks: Number of tasks to generate
        difficulty_levels: Mix of difficulty settings
    """

    tasks = []

    for i in range(num_tasks):
        # Vary difficulty
        difficulty = random.choice(difficulty_levels)

        if difficulty == "easy":
            min_range, max_range = 1, 10
        elif difficulty == "medium":
            min_range, max_range = 1, 100
        else:  # hard
            min_range, max_range = 1, 1000

        # Create task specification
        task = {
            "id": f"number_guess_{i:04d}",
            "server_id": "NumberGuessingGame",
            "reward_tool": "get-reward",
            "action_tool": "submit-number",
            "init_params": {
                "min_range": min_range,
                "max_range": max_range
            },
            "prompt": f"Guess a number between {min_range} and {max_range}. Use the submit-number tool to make guesses. Try to get as close as possible!",
            "hints": [
                "Start with the midpoint of the range",
                "Use binary search strategy for efficiency",
                "Pay attention to 'too high' and 'too low' feedback"
            ],
            "difficulty": difficulty,
            "max_attempts": 10
        }

        tasks.append(task)

    # Write JSONL
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with output_path.open("w") as f:
        for task in tasks:
            f.write(json.dumps(task) + "\n")

    print(f"Generated {len(tasks)} tasks to {output_path}")

if __name__ == "__main__":
    generate_number_guessing_dataset(
        "datasets/number_guessing_train.jsonl",
        num_tasks=500
    )

Dataset design tips:

  • Curriculum learning: Mix easy and hard tasks for better training
  • Explicit hints: Guide agents toward good strategies
  • Metadata: Include task IDs and difficulty for analysis
  • Variation: Randomize parameters to improve generalization
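
The binary search hint in the task specification can be checked with a scripted baseline. The following is a sketch under stated assumptions: binary_search_guesses and make_feedback are hypothetical helpers, and make_feedback is a local stand-in for the server's "too low" / "too high" / "correct" hint rather than a real tool call.

```python
from typing import Callable, List


def binary_search_guesses(min_range: int, max_range: int,
                          feedback: Callable[[int], str]) -> List[int]:
    """Guess the midpoint, then narrow the range using the hint."""
    low, high = min_range, max_range
    guesses: List[int] = []
    while low <= high:
        guess = (low + high) // 2
        guesses.append(guess)
        hint = feedback(guess)
        if hint == "correct":
            break
        if hint == "too low":
            low = guess + 1
        else:  # "too high"
            high = guess - 1
    return guesses


def make_feedback(target: int) -> Callable[[int], str]:
    """Local stand-in for the server's hint logic."""
    def feedback(guess: int) -> str:
        if guess < target:
            return "too low"
        if guess > target:
            return "too high"
        return "correct"
    return feedback


# Worst case over every possible target in the "hard" range.
worst = max(len(binary_search_guesses(1, 1000, make_feedback(t)))
            for t in range(1, 1001))
print(f"worst case on 1..1000: {worst} guesses")
```

A baseline like this is useful for sanity-checking the max_attempts value in each task: the budget should be large enough that a sensible strategy can succeed on the hardest range you generate.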

Step 5: Configure the MCP Client

Register your server in the MCP client configuration:

{
  "mcpServers": {
    "NumberGuessingGame": {
      "url": "http://localhost:8080/mcp",
      "transport": "http"
    }
  }
}

For stdio transport:

{
  "mcpServers": {
    "NumberGuessingGame": {
      "command": "python",
      "args": ["-m", "number_guessing.server"],
      "transport": "stdio"
    }
  }
}
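
Before starting a run, it can help to sanity-check the configuration file. The sketch below validates only the fields shown in the two examples above (transport, url, command); check_mcp_config is a hypothetical helper, and a real runtime may accept more fields or transports than this check knows about.

```python
import json
from typing import Any, Dict, List


def check_mcp_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config looks usable."""
    problems: List[str] = []
    servers = config.get("mcpServers", {})
    if not servers:
        problems.append("no servers registered under 'mcpServers'")
    for name, entry in servers.items():
        transport = entry.get("transport")
        if transport == "http":
            if not str(entry.get("url", "")).startswith(("http://", "https://")):
                problems.append(f"{name}: http transport needs an http(s) url")
        elif transport == "stdio":
            if not entry.get("command"):
                problems.append(f"{name}: stdio transport needs a command")
        else:
            problems.append(f"{name}: unknown transport {transport!r}")
    return problems


raw = """
{
  "mcpServers": {
    "NumberGuessingGame": {
      "url": "http://localhost:8080/mcp",
      "transport": "http"
    }
  }
}
"""
print(check_mcp_config(json.loads(raw)))
```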

Step 6: Run Your Environment Locally

Start the MCP Server (HTTP mode)

For HTTP deployment, wrap your server with the HTTP session manager:

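import contextlib

import uvicorn
from starlette.applications import Starlette
from starlette.routing import Mount
# Import path in recent MCP Python SDK releases; check your installed version.
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager

async def run_http_server(host: str = "127.0.0.1", port: int = 8080):
    """Run server over HTTP."""
    manager = StreamableHTTPSessionManager(
        app,
        stateless=False  # Critical for session persistence!
    )

    async def handle_mcp(scope, receive, send):
        # Forward every request under /mcp to the session manager
        await manager.handle_request(scope, receive, send)

    @contextlib.asynccontextmanager
    async def lifespan(_):
        # The manager must be running while the HTTP server accepts requests
        async with manager.run():
            yield

    asgi_app = Starlette(routes=[Mount("/mcp", app=handle_mcp)], lifespan=lifespan)

    # uvicorn.run() starts its own event loop, so inside a coroutine
    # we await Server.serve() instead
    config = uvicorn.Config(asgi_app, host=host, port=port)
    await uvicorn.Server(config).serve()

if __name__ == "__main__":
    import sys
    if "--http" in sys.argv:
        asyncio.run(run_http_server())
    else:
        asyncio.run(main())  # stdio mode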

Run it:

# HTTP mode
python -m number_guessing.server --http

# stdio mode
python -m number_guessing.server

Test with the MCP CLI

Before wiring into the RL harness, test your server:

# Install MCP CLI
pip install mcp-cli

# Test tool listing
mcp-cli call http://localhost:8080/mcp list_tools

# Test a guess
mcp-cli call http://localhost:8080/mcp call_tool \
  --name submit-number \
  --args '{"number": 50}'

# Get reward
mcp-cli call http://localhost:8080/mcp call_tool \
  --name get-reward

Run a Training Job

Once everything works:

from runrl import RunRL

client = RunRL(api_key="your-key")

run = client.runs.create(
    model="gpt-4",
    dataset="datasets/number_guessing_train.jsonl",
    algorithm="ppo",
    config={
        "environment": "NumberGuessingEnv",
        "mcp_servers_config": "mcp_servers.json"
    }
)

print(f"Started run: {run.id}")

Advanced Patterns

Multi-Step Episodes

Extend the wrapper to support multiple steps before episode termination:

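import json

class MultiStepEnv(NumberGuessingEnv):
    """Extends the Step 3 wrapper with a cap on steps per episode."""

    def __init__(self, max_steps: int = 10):
        super().__init__()
        self.max_steps = max_steps
        self.current_step = 0

    def setup(self, **state):
        self.current_step = 0  # Restart the step counter for each task
        return super().setup(**state)

    def step(self, action: str):
        self.current_step += 1

        # Get reward from MCP
        result = self.mcp.call_tool(self.server_id, self.reward_tool_name, {})
        data = json.loads(result["content"][0]["text"])

        observation = json.dumps(data.get("state", {}), indent=2)
        reward = data.get("reward", 0.0)
        info = {"step": self.current_step, "raw_result": result}

        # End episode on max steps or success
        done = (
            self.current_step >= self.max_steps or
            data.get("done", False)
        )
        self.done = done

        return observation, reward, done, info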

Shaped Rewards

Return incremental rewards to guide learning:

def calculate_reward(self, state: SessionState) -> float:
    """Return dense reward signal."""
    if not state.submissions:
        return 0.0

    last_guess = state.submissions[-1]
    distance = abs(state.target_number - last_guess)

    # Dense reward based on improvement
    if len(state.submissions) > 1:
        prev_distance = abs(state.target_number - state.submissions[-2])
        improvement = prev_distance - distance
        reward = improvement / (state.max_range - state.min_range)
    else:
        # First guess: reward based on distance
        reward = 1.0 - (distance / (state.max_range - state.min_range))

    # Bonus for correct guess
    if distance == 0:
        reward += 10.0

    return reward
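
To see how this shaping behaves over a whole episode, the sketch below applies the same rule to a fixed sequence of guesses. shaped_rewards is a hypothetical standalone helper that takes plain integers instead of a SessionState, and it assumes max_range is greater than min_range.

```python
from typing import List, Optional


def shaped_rewards(guesses: List[int], target: int,
                   min_range: int, max_range: int) -> List[float]:
    """Per-guess rewards: distance-based for the first guess, improvement-based after."""
    span = max_range - min_range  # assumed to be positive
    rewards: List[float] = []
    prev_distance: Optional[int] = None
    for guess in guesses:
        distance = abs(target - guess)
        if prev_distance is None:
            reward = 1.0 - distance / span
        else:
            # Positive when the guess moved closer, negative when it moved away.
            reward = (prev_distance - distance) / span
        if distance == 0:
            reward += 10.0  # bonus for a correct guess
        rewards.append(reward)
        prev_distance = distance
    return rewards


rewards = shaped_rewards([25, 75, 62, 50], target=50, min_range=1, max_range=100)
print([round(r, 3) for r in rewards])
```

In this example the second guess (75) is exactly as far from the target as the first (25), so it earns zero reward: the improvement term rewards progress, not position. Guesses that move away from the target receive a negative reward.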

Async Tool Calls

For concurrent environments, use async MCP calls:

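async def async_step(self, action: str):
    """Async version of step for parallel environments."""
    result = await self.mcp.call_tool_async(
        self.server_id,
        self.reward_tool_name,
        {}
    )

    # Parse the result the same way as the synchronous step()
    data = json.loads(result["content"][0]["text"])
    observation = json.dumps(data.get("state", {}), indent=2)
    reward = data.get("reward", 0.0)
    done = data.get("done", False)
    info = {"action": action, "raw_result": result}

    return observation, reward, done, info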

Session Cleanup

Add session cleanup to prevent memory leaks:

async def cleanup_session(session_id: str):
    """Clean up session state; call this when an agent disconnects."""
    # How a disconnect is signaled depends on your transport and runtime,
    # so wire this function into whatever hook your setup provides.
    with _session_lock:
        if session_id in _session_state:
            state = _session_state.pop(session_id)
            print(f"Cleaned up session {session_id} after {state.total_attempts} attempts")
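
If your runtime does not signal disconnects reliably, a complementary approach is to evict sessions that have been idle for too long. The sketch below is one possible shape for that, not part of the tutorial's server: ExpiringSessions, touch, and evict_idle are hypothetical names, and the injectable clock exists only so the behavior can be tested without waiting.

```python
import time
from threading import Lock
from typing import Callable, Dict, List


class ExpiringSessions:
    """Tracks last-access times and evicts sessions idle longer than ttl_seconds."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._last_seen: Dict[str, float] = {}
        self._lock = Lock()

    def touch(self, session_id: str) -> None:
        """Record activity; call this on every tool invocation."""
        with self._lock:
            self._last_seen[session_id] = self._clock()

    def evict_idle(self) -> List[str]:
        """Forget idle sessions and return their IDs so the caller can drop their state."""
        now = self._clock()
        with self._lock:
            expired = [sid for sid, seen in self._last_seen.items()
                       if now - seen > self._ttl]
            for sid in expired:
                del self._last_seen[sid]
        return expired


# Simulated timeline using a fake clock instead of real waiting.
fake_now = [0.0]
sessions = ExpiringSessions(ttl_seconds=60, clock=lambda: fake_now[0])
sessions.touch("a")
fake_now[0] = 30.0
sessions.touch("b")
fake_now[0] = 75.0
evicted = sessions.evict_idle()
print(evicted)
```

The IDs returned by evict_idle() are the ones whose entries you would then remove from the session dictionary, for example from a periodic background task.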

Debugging Tips

1. Check Session Persistence

Verify sessions persist across tool calls:

# Call 1
result1 = mcp.call_tool("NumberGuessingGame", "submit-number", {"number": 25})

# Call 2 - should report total_attempts = 1 (get-reward does not add an attempt)
result2 = mcp.call_tool("NumberGuessingGame", "get-reward", {})

2. Inspect MCP Responses

Log raw MCP responses to understand structure:

result = self.mcp.call_tool(server, tool, args)
print(f"Raw MCP result: {json.dumps(result, indent=2)}")

3. Validate Session IDs

Ensure the server receives stable session IDs:

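def _extract_session_id(request_context: Any) -> str:
    meta = getattr(request_context, 'meta', None)  # may be None or missing
    if isinstance(meta, dict):
        session_id = meta.get('sessionId', 'fallback')
    else:
        session_id = getattr(meta, 'sessionId', 'fallback')
    # Seeing 'fallback' here means requests are sharing a single session
    print(f"Extracted session ID: {session_id}")
    return session_id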

4. Monitor Memory Usage

Track session count and history size:

def get_stats() -> Dict:
    with _session_lock:
        return {
            "active_sessions": len(_session_state),
            "total_attempts": sum(s.total_attempts for s in _session_state.values()),
            "avg_history_size": sum(len(s.recent_history) for s in _session_state.values()) / max(len(_session_state), 1)
        }

Best Practices

Security

  • Validate inputs: Check tool arguments before processing
  • Sanitize outputs: Don't leak sensitive state (like target numbers)
  • Rate limiting: Add request limits to prevent abuse
  • CORS policies: Configure allowed origins for HTTP servers
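
As an illustration of the input validation point, here is a minimal sketch of a check that a submit-number handler could run before recording a guess. validate_guess is a hypothetical helper; the server in this tutorial only checks that the number is present.

```python
from typing import Any, Optional


def validate_guess(value: Any, min_range: int, max_range: int) -> Optional[str]:
    """Return an error message for a bad guess, or None if it is acceptable."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        return "number must be an integer"
    if not min_range <= value <= max_range:
        return f"number must be between {min_range} and {max_range}"
    return None


for candidate in (50, "50", True, 500):
    print(repr(candidate), "->", validate_guess(candidate, 1, 100))
```

Returning a message instead of raising keeps the handler in control of the response format, so it can send the error back as structured JSON like the other tool errors.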

Performance

  • Trim history: Limit stored history to prevent unbounded growth
  • Lazy initialization: Create sessions on-demand
  • Async operations: Use async/await for I/O-bound operations
  • Session cleanup: Remove inactive sessions periodically

Testing

  • Unit test state logic: Test SessionState methods independently
  • Integration test tools: Verify tool calls return expected structures
  • Load test sessions: Ensure server handles concurrent sessions
  • Validate dataset: Check generated tasks are well-formed
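
For the dataset validation point, a small checker can catch malformed tasks before a run starts. This sketch assumes the task fields produced by the generator in Step 4; validate_tasks and REQUIRED_KEYS are hypothetical names, and the exact set of required fields depends on your harness.

```python
import json
from typing import Iterable, List

REQUIRED_KEYS = {"id", "server_id", "reward_tool", "action_tool", "init_params", "prompt"}


def validate_tasks(lines: Iterable[str]) -> List[str]:
    """Return a list of problems found; an empty list means the tasks look well-formed."""
    problems: List[str] = []
    seen_ids = set()
    for line_no, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # tolerate blank lines
        try:
            task = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append(f"line {line_no}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(task, dict):
            problems.append(f"line {line_no}: task is not a JSON object")
            continue
        missing = REQUIRED_KEYS - set(task)
        if missing:
            problems.append(f"line {line_no}: missing {sorted(missing)}")
            continue
        if task["id"] in seen_ids:
            problems.append(f"line {line_no}: duplicate id {task['id']}")
        seen_ids.add(task["id"])
        params = task["init_params"]
        if not isinstance(params, dict):
            problems.append(f"line {line_no}: init_params is not an object")
        elif params.get("min_range", 1) > params.get("max_range", 100):
            problems.append(f"line {line_no}: min_range exceeds max_range")
    return problems


sample = json.dumps({
    "id": "number_guess_0000", "server_id": "NumberGuessingGame",
    "reward_tool": "get-reward", "action_tool": "submit-number",
    "init_params": {"min_range": 1, "max_range": 10},
    "prompt": "Guess a number between 1 and 10.",
})
print(validate_tasks([sample, sample, "{not json"]))
```

To check a generated file, pass an open file handle, since iterating over it yields one line at a time.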

Common Pitfalls

1. Forgetting stateless=False

For HTTP servers, you must use:

manager = StreamableHTTPSessionManager(app, stateless=False)

Without this, sessions won't persist!

2. Not Trimming History

Always limit history size:

if len(self.recent_history) > self.max_history:
    self.recent_history = self.recent_history[-self.max_history:]

3. Leaking Private State

Never include secret state in public observations:

# BAD - leaks target!
return {"target": self.target_number, ...}

# GOOD - only public info
return {"range": [self.min_range, self.max_range], ...}

4. Blocking Async Operations

Use async properly:

# BAD - blocks event loop
time.sleep(1)

# GOOD - yields to event loop
await asyncio.sleep(1)
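
When the blocking work cannot be replaced with an async equivalent (for example, a synchronous database or HTTP client), one option is to push it onto a worker thread. The sketch below uses asyncio.to_thread, which is available in Python 3.9 and later; slow_score and score_all are hypothetical names used only for illustration.

```python
import asyncio
import time


def slow_score(guess: int) -> int:
    """Stand-in for blocking work such as a synchronous database or HTTP call."""
    time.sleep(0.05)
    return guess * 2


async def score_all(guesses):
    # asyncio.to_thread runs each blocking call in a worker thread, so the
    # event loop stays free to serve other sessions in the meantime.
    return await asyncio.gather(*(asyncio.to_thread(slow_score, g) for g in guesses))


results = asyncio.run(score_all([1, 2, 3]))
print(results)
```

asyncio.gather preserves the order of its inputs, so the results line up with the guesses even though the calls run concurrently.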

Next Steps

Now that you understand stateful MCP environments, you can:

  • Build complex tasks: Multi-step games, resource management, dialogue
  • Add persistence: Save sessions to database for long-running episodes
  • Implement multiplayer: Support multi-agent interactions in shared sessions
  • Create benchmarks: Design evaluation suites for your environments

Getting Help

If you encounter issues:

  1. Check server logs for errors
  2. Verify MCP client configuration
  3. Test tools with MCP CLI before integrating
  4. Review session ID extraction logic
  5. Contact support with session dumps and error logs