Sessions
A session is a running instance of an agent within an environment. Sessions maintain state across multiple interactions — conversation history, file system, and tool outputs persist until the session is archived or deleted.
Session lifecycle

    create → idle ⇄ running → terminated
                       ↓
                  rescheduling

| Status | Description |
|---|---|
| idle | Waiting for input. Ready to receive user events. |
| running | Agent is actively executing (processing a message, calling tools). |
| rescheduling | Transient error occurred, retrying automatically. |
| terminated | Unrecoverable error. Session has ended. |
Valid transitions
| From | To | Trigger |
|---|---|---|
| idle | running | User sends an event (e.g. user.message) |
| running | idle | Agent finishes its turn |
| running | rescheduling | Transient error (rate limit, timeout) |
| rescheduling | running | Automatic retry succeeds |
| running | terminated | Unrecoverable error |
| rescheduling | terminated | All retries exhausted |
Terminal state
Once a session reaches terminated, it cannot accept new events. You must create a new session to continue the conversation. The terminated session's event history remains accessible for debugging.
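
The transition table can also be encoded directly, which is handy for client-side checks before sending an event. The following is a minimal sketch and not part of the API: VALID_TRANSITIONS, can_transition, and is_terminal are hypothetical names, and the status strings are taken from the tables above.

```python
# Allowed status transitions, copied from the "Valid transitions" table.
# Hypothetical client-side helper; the API does not expose this mapping.
VALID_TRANSITIONS = {
    "idle": {"running"},
    "running": {"idle", "rescheduling", "terminated"},
    "rescheduling": {"running", "terminated"},
    "terminated": set(),  # terminal state: no outgoing transitions
}


def can_transition(current: str, new: str) -> bool:
    """Return True if the table allows moving from `current` to `new`."""
    return new in VALID_TRANSITIONS.get(current, set())


def is_terminal(status: str) -> bool:
    """Return True if a known status has no outgoing transitions."""
    return status in VALID_TRANSITIONS and not VALID_TRANSITIONS[status]
```

With this mapping, can_transition("terminated", "idle") is False, which matches the rule that a terminated session cannot be resumed.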
Start a session

    session = client.beta.sessions.create(
        agent="agent_01HqR2k7vXbZ9mNpL3wYcT8f",
        environment_id="env_cloud_default",
        title="Research task"
    )

Or with curl:

    curl -X POST https://api.sandbase.ai/v1/sessions \
      -H "Authorization: Bearer sk-sb-YOUR_KEY" \
      -H "Content-Type: application/json" \
      -d '{"agent": "agent_01HqR2k7...", "environment_id": "env_cloud_default"}'

Send messages
Send a user message to trigger agent execution:

    result = client.beta.sessions.events.create(
        session_id=session.id,
        events=[{
            "type": "user.message",
            "content": "Analyze the sales data and create a summary report."
        }]
    )

The agent transitions to running, executes tools as needed, then returns to idle when done.
Event types
User events (you send)
| Type | Description |
|---|---|
| user.message | Send a text message to the agent |
| user.interrupt | Stop the agent mid-execution |
| user.custom_tool_result | Return result for a custom tool call |
| user.tool_confirmation | Approve or deny a tool call (for always_ask policy) |
Agent events (agent produces)
| Type | Description |
|---|---|
| agent.message | Agent's text response |
| agent.tool_use | Agent invokes a built-in tool |
| agent.tool_result | Result of tool execution |
| agent.custom_tool_use | Agent invokes a custom tool (client-side) |
| agent.mcp_tool_use | Agent invokes an MCP tool |
Session events (system status)
| Type | Description |
|---|---|
| session.status_idle | Agent finished, waiting for input |
| session.status_running | Agent is actively executing |
| session.error | Unrecoverable error occurred |
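
The three groups above share a naming pattern: the part before the first dot says who produced the event. A small sketch of routing on that prefix follows. The helper names event_source and is_status_event are hypothetical, and the code assumes only the type strings listed in the tables.

```python
def event_source(event_type: str) -> str:
    """Classify an event type string by its prefix: user, agent, or session."""
    prefix, sep, _ = event_type.partition(".")
    if sep and prefix in {"user", "agent", "session"}:
        return prefix
    return "unknown"


def is_status_event(event_type: str) -> bool:
    """Return True for session.status_* events (status changes, not errors)."""
    return event_type.startswith("session.status_")
```

Routing on the prefix keeps a stream consumer tolerant of event types it does not know yet, since they still fall into one of the three groups or into "unknown".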
Stop reasons
When the agent finishes a turn (session.status_idle), the event includes a stop_reason:
| Reason | Meaning |
|---|---|
| end_turn | Agent completed its response naturally |
| requires_action | Agent needs client-side input (custom tool result or confirmation) |
| user_interrupt | Agent was interrupted by the user |
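
A client usually branches on the stop reason to decide what to do next. The sketch below assumes the event is a dict with a top-level stop_reason key; the exact payload shape is not shown on this page, so treat that as an assumption. The function name next_action and the returned labels are hypothetical.

```python
def next_action(event: dict) -> str:
    """Map a session.status_idle event to a client-side action label."""
    if event.get("type") != "session.status_idle":
        return "ignore"  # only idle events carry a stop reason
    reason = event.get("stop_reason")  # assumed top-level field
    if reason == "end_turn":
        return "await_user_input"
    if reason == "requires_action":
        # The agent is waiting for user.custom_tool_result
        # or user.tool_confirmation.
        return "respond_to_agent_request"
    if reason == "user_interrupt":
        return "redirect_or_stop"
    return "unknown"
```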
Stream events in real-time
Use Server-Sent Events (SSE) to receive agent responses as they're produced:

    import json

    import httpx

    with httpx.stream(
        "GET",
        f"https://api.sandbase.ai/v1/sessions/{session.id}/events/stream",
        headers={"Authorization": "Bearer sk-sb-YOUR_KEY"},
        timeout=None,  # disable httpx's default timeout so a quiet stream stays open
    ) as response:
        for line in response.iter_lines():
            if line.startswith("data: ") and line != "data: [DONE]":
                event = json.loads(line[6:])
                if event["type"] == "agent.message":
                    print(event["content"]["text"], end="")

Interrupt a running agent
If the agent is taking too long or going in the wrong direction:

    client.beta.sessions.events.create(
        session_id=session.id,
        events=[{"type": "user.interrupt"}]
    )

The agent stops execution and returns to idle.
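
Since the session has to be idle before it accepts the next message, it can help to confirm the status after interrupting. Here is a minimal sketch with the API calls injected as callables so it runs without a client. interrupt_and_wait is a hypothetical helper, and how quickly an interrupt takes effect is not stated on this page.

```python
import time
from typing import Callable


def interrupt_and_wait(
    send_interrupt: Callable[[], None],
    get_status: Callable[[], str],
    timeout: float = 30.0,
    poll_interval: float = 1.0,
) -> str:
    """Send an interrupt, then poll until the session is idle or terminated."""
    send_interrupt()
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in ("idle", "terminated"):
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"Session still not idle {timeout}s after interrupt")
```

In real use, send_interrupt would wrap the events.create call shown above and get_status would return the status field of client.beta.sessions.get(session_id=...).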
Pin to a specific agent version
By default, sessions use the latest agent version. Pin to a specific version for reproducibility:

    session = client.beta.sessions.create(
        agent={"type": "agent", "id": "agent_01HqR2k7...", "version": 2},
        environment_id="env_cloud_default"
    )

Error recovery patterns
Sessions can encounter errors at different stages. Here's how to handle each scenario.
Detecting errors
Listen for session.error events via SSE or poll the session status:

    import json

    import httpx

    def monitor_session(session_id: str, api_key: str):
        """Monitor session and detect errors."""
        # handle_error and handle_turn_complete are application-defined callbacks.
        with httpx.stream(
            "GET",
            f"https://api.sandbase.ai/v1/sessions/{session_id}/events/stream",
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=None,  # keep the stream open while the agent is quiet
        ) as response:
            for line in response.iter_lines():
                if not line.startswith("data: ") or line == "data: [DONE]":
                    continue
                event = json.loads(line[6:])
                if event["type"] == "session.error":
                    handle_error(session_id, event)
                elif event["type"] == "session.status_idle":
                    handle_turn_complete(event)

Handling terminated sessions
When a session reaches terminated, create a new session and optionally replay context:

    def recover_from_termination(old_session_id: str, agent_id: str):
        """Create a new session after termination."""
        # 1. Retrieve conversation history from the old session
        events = client.beta.sessions.events.list(session_id=old_session_id)

        # 2. Create a fresh session
        new_session = client.beta.sessions.create(
            agent=agent_id,
            environment_id="env_cloud_default",
            title="Recovered session",
            metadata={"recovered_from": old_session_id}
        )

        # 3. Send a summary of prior context as the first message.
        #    build_context_summary is an application-defined helper.
        context_summary = build_context_summary(events)
        client.beta.sessions.events.create(
            session_id=new_session.id,
            events=[{
                "type": "user.message",
                "content": f"Continue from where we left off. Context: {context_summary}"
            }]
        )
        return new_session

Retrying failed messages
If sending an event fails due to a transient error (network issue, 5xx response), retry with exponential backoff:

    import time

    import httpx

    def send_with_retry(session_id: str, message: str, max_retries: int = 3):
        """Send a message with exponential backoff on transient failures."""
        # Assumes the client surfaces httpx exceptions; adjust to your SDK's error types.
        for attempt in range(max_retries):
            try:
                result = client.beta.sessions.events.create(
                    session_id=session_id,
                    events=[{"type": "user.message", "content": message}]
                )
                return result
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 409:
                    # Session not idle: wait for current execution to finish
                    time.sleep(2 ** attempt)
                    continue
                elif e.response.status_code >= 500:
                    # Server error: retry
                    time.sleep(2 ** attempt)
                    continue
                else:
                    raise  # Client error, don't retry
            except httpx.TransportError:
                # Network issue (connection failure, timeout): retry
                time.sleep(2 ** attempt)
        raise RuntimeError(f"Failed to send message after {max_retries} attempts")

Handling conflict errors
A 409 Conflict means the session isn't in the expected state:
| Error | Cause | Resolution |
|---|---|---|
| session is not idle | Agent is still running | Wait for session.status_idle event, then retry |
| session is terminated | Session ended | Create a new session |
| session is running, interrupt first | Trying to delete a running session | Send user.interrupt, wait for idle, then delete |
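
The table translates naturally into a lookup that picks a recovery step from the error text. This sketch assumes the 409 response body contains the message strings from the Error column, which this page does not confirm. resolve_conflict and its return labels are hypothetical.

```python
def resolve_conflict(message: str) -> str:
    """Pick a recovery step for a 409 error message (assumed message text)."""
    text = message.lower()
    if "interrupt first" in text:
        return "interrupt_wait_then_delete"
    if "terminated" in text:
        return "create_new_session"
    if "not idle" in text:
        return "wait_for_idle_then_retry"
    return "unknown"
```

If the API exposes a structured error code, matching on that would be more robust than matching on message text.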

To wait for a session to return to idle before retrying, poll its status:

    import time

    def wait_for_idle(session_id: str, timeout: int = 60):
        """Poll until session returns to idle state."""
        # Use a monotonic clock so system clock changes do not affect the timeout
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            session = client.beta.sessions.get(session_id=session_id)
            if session.status == "idle":
                return session
            if session.status == "terminated":
                raise RuntimeError("Session terminated unexpectedly")
            time.sleep(1)
        raise TimeoutError(f"Session did not return to idle within {timeout}s")

Recovering from tool failures
When a tool call fails, the agent may enter rescheduling state and retry automatically. If it ultimately fails:
- Check the event history for agent.tool_result events with error content
- Send a follow-up message guiding the agent to try an alternative approach

    def handle_tool_failure(session_id: str):
        """Guide the agent after a tool failure."""
        events = client.beta.sessions.events.list(session_id=session_id)

        # Find the last tool error
        last_error = None
        for event in reversed(events.data):
            if event.type == "agent.tool_result" and "error" in str(event.content):
                last_error = event
                break

        if last_error:
            client.beta.sessions.events.create(
                session_id=session_id,
                events=[{
                    "type": "user.message",
                    "content": "The previous tool call failed. Please try an alternative approach."
                }]
            )

Timeout handling
For long-running agent tasks, use the interrupt mechanism as a timeout:

    import threading

    def send_with_timeout(session_id: str, message: str, timeout_seconds: int = 120):
        """Send a message and interrupt if the agent takes too long."""
        # Send the message
        client.beta.sessions.events.create(
            session_id=session_id,
            events=[{"type": "user.message", "content": message}]
        )

        # Set up a timeout interrupt
        def interrupt_on_timeout():
            session = client.beta.sessions.get(session_id=session_id)
            if session.status == "running":
                client.beta.sessions.events.create(
                    session_id=session_id,
                    events=[{"type": "user.interrupt"}]
                )

        timer = threading.Timer(timeout_seconds, interrupt_on_timeout)
        timer.start()
        try:
            # Wait for completion (wait_for_idle is defined under "Handling conflict errors")
            return wait_for_idle(session_id, timeout=timeout_seconds + 10)
        finally:
            # Cancel the timer even if wait_for_idle raises
            timer.cancel()

Usage tracking
Every session tracks token usage cumulatively across all turns. Use this data to monitor costs and optimize agent behavior.
Session-level usage
The usage field on a session object contains cumulative token counts:

    session = client.beta.sessions.get(session_id=session.id)
    print(session.usage)
    # {
    #   "input_tokens": 4520,
    #   "output_tokens": 1830
    # }

Per-event usage
Each event includes granular usage data when available:
| Field | Type | Description |
|---|---|---|
model_used | string | Which model processed this event |
tokens_in | int | Input tokens for this event |
tokens_out | int | Output tokens for this event |
duration_ms | int | Processing time in milliseconds |

    events = client.beta.sessions.events.list(session_id=session.id)
    total_tokens = 0
    for event in events.data:
        # Usage fields are only present "when available", so treat missing values as 0
        tokens_in = event.tokens_in or 0
        tokens_out = event.tokens_out or 0
        if tokens_in or tokens_out:
            total_tokens += tokens_in + tokens_out
            print(f"  {event.type}: {tokens_in} in / {tokens_out} out "
                  f"({event.duration_ms}ms) — model: {event.model_used}")
    print(f"Total tokens: {total_tokens}")

Cost estimation
Combine per-event token counts with model pricing to estimate session costs:

    # Model pricing (per million tokens)
    PRICING = {
        "claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
        "gpt-4o": {"input": 2.50, "output": 10.00},
    }

    def estimate_session_cost(session_id: str) -> float:
        """Estimate total cost for a session based on token usage."""
        events = client.beta.sessions.events.list(session_id=session_id)
        total_cost = 0.0
        for event in events.data:
            # Events without a known model are skipped rather than guessed
            if event.model_used and event.model_used in PRICING:
                pricing = PRICING[event.model_used]
                # Token counts may be missing on some events; treat them as 0
                input_cost = ((event.tokens_in or 0) / 1_000_000) * pricing["input"]
                output_cost = ((event.tokens_out or 0) / 1_000_000) * pricing["output"]
                total_cost += input_cost + output_cost
        return total_cost

Tracking usage with metadata
Use session metadata to tag sessions for cost attribution:

    session = client.beta.sessions.create(
        agent=agent_id,
        environment_id="env_cloud_default",
        title="Customer support — ticket #4821",
        metadata={
            "user_id": "usr_abc123",
            "team": "support",
            "task_type": "ticket_resolution",
            "budget_cents": 50
        }
    )

Then aggregate costs by metadata fields in your billing system.
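
The aggregation step might look like the sketch below. It assumes you have already collected one record per session as a plain dict with metadata and cost_usd keys. That record shape and the name aggregate_cost_by are hypothetical and live in your own billing code, not in the API.

```python
from collections import defaultdict
from typing import Dict, Iterable


def aggregate_cost_by(records: Iterable[dict], field: str) -> Dict[str, float]:
    """Sum cost_usd per value of one metadata field."""
    totals: Dict[str, float] = defaultdict(float)
    for record in records:
        # Sessions created without the field are grouped under "unknown"
        key = str(record.get("metadata", {}).get(field, "unknown"))
        totals[key] += record["cost_usd"]
    return dict(totals)


# Example records, e.g. built from estimate_session_cost results
records = [
    {"metadata": {"team": "support"}, "cost_usd": 0.5},
    {"metadata": {"team": "support"}, "cost_usd": 0.25},
    {"metadata": {"team": "research"}, "cost_usd": 1.0},
    {"metadata": {}, "cost_usd": 0.125},
]
by_team = aggregate_cost_by(records, "team")
```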
Budget guardrails
Monitor token usage during a session and interrupt if it exceeds a budget:

    def check_budget(session_id: str, max_tokens: int = 50000):
        """Interrupt session if token usage exceeds budget."""
        session = client.beta.sessions.get(session_id=session_id)
        usage = session.usage or {}
        total = usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
        if total > max_tokens:
            client.beta.sessions.events.create(
                session_id=session_id,
                events=[{"type": "user.interrupt"}]
            )
            return True  # Budget exceeded
        return False

Session lifecycle best practices
Creating sessions
- Set descriptive titles — makes it easy to find sessions in the dashboard and API listings
- Use metadata for tracking — attach user IDs, task types, request IDs, or budget limits
- Pin agent versions in production — avoid unexpected behavior changes from new agent versions
- Choose the right environment — match the environment to the tools and resources the agent needs
During execution
- Stream events for real-time feedback — use SSE instead of polling for responsive UIs
- Monitor token usage — check cumulative usage periodically to avoid runaway costs
- Set timeouts — interrupt agents that run too long rather than waiting indefinitely
- Handle requires_action stops — when the agent needs a custom tool result or confirmation, respond promptly to avoid stale context
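
For the streaming bullet, the line-parsing step of an SSE consumer can be isolated into a pure function, which makes it easy to unit test without a network connection. This is a sketch that assumes the same framing as the streaming examples earlier on this page (a "data: " prefix and a "[DONE]" sentinel). parse_sse_data is a hypothetical helper name.

```python
import json
from typing import Optional


def parse_sse_data(line: str) -> Optional[dict]:
    """Return the decoded event for a `data: ` line, or None for anything else."""
    prefix = "data: "
    if not line.startswith(prefix):
        return None  # blank separators, comments, or other SSE fields
    payload = line[len(prefix):]
    if payload == "[DONE]":
        return None  # end-of-stream sentinel
    return json.loads(payload)
```

A consumer loop can then call parse_sse_data on every line from response.iter_lines() and skip the None results.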
Cleaning up
- Archive when done — archived sessions can't receive new events but history is preserved for auditing
- Delete for privacy — permanently removes all session data including event history
- Don't delete running sessions — interrupt first, wait for idle, then delete
Multi-session patterns
- One session per task — keep sessions focused on a single objective for cleaner history
- Fan-out for parallel work — create multiple sessions from the same agent for concurrent tasks
- Chain sessions for long workflows — when a task exceeds context limits, summarize and start a new session
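
The fan-out pattern can be sketched with a thread pool. Here run_one stands in for a hypothetical function of your own that creates a session, sends the task as a user.message, waits for idle, and returns the result. Whether the client object is safe to share across threads is not stated on this page, so creating one client per worker may be the safer choice.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def fan_out(tasks: Iterable[T], run_one: Callable[[T], R], max_workers: int = 4) -> List[R]:
    """Run `run_one` for each task concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order and re-raises the first failure
        return list(pool.map(run_one, tasks))
```

Capping max_workers keeps the number of concurrent sessions bounded, which also limits how fast a batch can consume tokens.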
Next steps
- Send Events API — Full endpoint reference
- Stream Events API — SSE streaming reference
- Agent Setup — Configure the agent that powers sessions
- MCP Tools — Attach tools to your agent's environment

