Module spoon_ai.agents.base
ThreadSafeOutputQueue Objects
class ThreadSafeOutputQueue()
Thread-safe output queue with fair access and timeout protection.
get
async def get(timeout: Optional[float] = 30.0) -> Any
Get an item from the queue with timeout protection and fair access.
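The timeout-plus-fairness behaviour described for get can be illustrated with plain asyncio primitives. The following is a minimal sketch, not the spoon_ai implementation: the class name TimeoutQueue, its put method, and the use of an asyncio.Lock to order consumers are all assumptions made for illustration.

```python
import asyncio
from typing import Any, Optional


class TimeoutQueue:
    """Hypothetical stand-in for a queue with a timeout-bounded get."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        # asyncio.Lock wakes waiters in the order they started waiting,
        # so routing consumers through it gives first-come, first-served access.
        self._lock = asyncio.Lock()

    async def put(self, item: Any) -> None:
        await self._queue.put(item)

    async def get(self, timeout: Optional[float] = 30.0) -> Any:
        async def _locked_get() -> Any:
            async with self._lock:
                return await self._queue.get()

        # wait_for raises asyncio.TimeoutError once the deadline passes;
        # timeout=None waits indefinitely.
        return await asyncio.wait_for(_locked_get(), timeout)


async def demo() -> tuple:
    q = TimeoutQueue()
    await q.put("token")
    first = await q.get(timeout=1.0)
    try:
        await q.get(timeout=0.05)  # queue is empty, so this times out
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return first, timed_out
```

Calling asyncio.run(demo()) exercises both paths: one successful get and one that times out on an empty queue. Callers of the real method would presumably handle the timeout in a similar way, though the exception type raised by ThreadSafeOutputQueue.get is not stated here.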
BaseAgent Objects
class BaseAgent(BaseModel, ABC)
Thread-safe base class for all agents with proper concurrency handling.
add_message
async def add_message(role: Literal["user", "assistant", "tool"],
                      content: str,
                      tool_call_id: Optional[str] = None,
                      tool_calls: Optional[List[ToolCall]] = None,
                      tool_name: Optional[str] = None,
                      timeout: Optional[float] = None) -> None
Thread-safe message addition with timeout protection.
state_context
@asynccontextmanager
async def state_context(new_state: AgentState,
                        timeout: Optional[float] = None)
Thread-safe state context manager with deadlock prevention. Acquires the state lock only to perform quick transitions, not for the duration of the work inside the context, avoiding long-held locks and false timeouts during network calls.
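The locking pattern described above (hold the lock only for the transition, release it before the caller's work runs) might look roughly like the sketch below. Everything here is assumed for illustration: the AgentState values, the StateHolder class, the _set_state helper, and the choice to restore the previous state on exit are not taken from the spoon_ai source.

```python
import asyncio
from contextlib import asynccontextmanager
from enum import Enum
from typing import Optional


class AgentState(Enum):
    # Hypothetical values; the real enum may differ.
    IDLE = "idle"
    RUNNING = "running"


class StateHolder:
    """Hypothetical stand-in for the state-handling part of an agent."""

    def __init__(self) -> None:
        self.state = AgentState.IDLE
        self._state_lock = asyncio.Lock()

    async def _set_state(self, new_state: AgentState,
                         timeout: Optional[float]) -> AgentState:
        # The lock is held only for this short assignment.
        await asyncio.wait_for(self._state_lock.acquire(), timeout)
        try:
            previous = self.state
            self.state = new_state
            return previous
        finally:
            self._state_lock.release()

    @asynccontextmanager
    async def state_context(self, new_state: AgentState,
                            timeout: Optional[float] = None):
        previous = await self._set_state(new_state, timeout)
        try:
            yield  # the lock is NOT held while the caller's work runs
        finally:
            # Assumption: restore the earlier state when the block exits.
            await self._set_state(previous, timeout)


async def demo() -> tuple:
    holder = StateHolder()
    async with holder.state_context(AgentState.RUNNING, timeout=1.0):
        inside = holder.state
        lock_free = not holder._state_lock.locked()
    return inside, lock_free, holder.state
```

Because the lock is released before the yield, a slow network call inside the context cannot cause another coroutine's lock acquisition to time out, which is the failure mode the description refers to.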
run
async def run(request: Optional[str] = None,
              timeout: Optional[float] = None) -> str
Thread-safe run method with proper concurrency control and callback support.
step
async def step(run_id: Optional[uuid.UUID] = None) -> str
Override this method in subclasses. Steps run with step-level locking and callback support.
is_stuck
async def is_stuck() -> bool
Thread-safe stuck detection.
handle_stuck_state
async def handle_stuck_state()
Thread-safe stuck state handling.
add_documents
def add_documents(documents) -> None
Store documents on the agent so that the CLI load-docs works without the RAG mixin.
This default implementation keeps the documents in memory under self._loaded_documents. Agents that support retrieval should override this method to index documents into their vector store.
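As a rough illustration of the override pattern described above, the sketch below pairs an in-memory default with a subclass that also indexes the documents. The class names, the toy keyword index standing in for a vector store, and the assumption that each call replaces the previously stored documents are all hypothetical; none of them come from spoon_ai.

```python
from typing import Any, Dict, List


class InMemoryDocsAgent:
    """Hypothetical stand-in for the default in-memory behaviour."""

    def __init__(self) -> None:
        self._loaded_documents: List[Any] = []

    def add_documents(self, documents) -> None:
        # Assumption: each call replaces the previously stored documents.
        self._loaded_documents = list(documents)


class KeywordIndexAgent(InMemoryDocsAgent):
    """Hypothetical retrieval-capable agent; a dict stands in for a vector store."""

    def __init__(self) -> None:
        super().__init__()
        self._index: Dict[str, List[int]] = {}

    def add_documents(self, documents) -> None:
        super().add_documents(documents)
        # Rebuild a toy index: word -> positions of documents containing it.
        self._index.clear()
        for position, doc in enumerate(self._loaded_documents):
            for word in set(str(doc).lower().split()):
                self._index.setdefault(word, []).append(position)


agent = KeywordIndexAgent()
agent.add_documents(["Agents run steps", "Steps call tools"])
```

A real override would presumably embed the documents and write them to the agent's vector store instead of building a keyword dictionary.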
save_chat_history
def save_chat_history()
Thread-safe chat history saving.
stream
async def stream(timeout: Optional[float] = None)
Thread-safe streaming with proper cleanup and timeout.
process_mcp_message
async def process_mcp_message(content: Any,
                              sender: str,
                              message: Dict[str, Any],
                              agent_id: str,
                              timeout: Optional[float] = None)
Thread-safe MCP message processing with timeout protection.
shutdown
async def shutdown(timeout: float = 30.0)
Graceful shutdown with cleanup of active operations.
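One common way to implement a graceful shutdown of this kind is to give in-flight tasks a grace period and cancel whatever remains. The sketch below shows that approach under stated assumptions: OperationTracker, its start method, and the returned count of cancelled tasks are hypothetical and are not part of the documented BaseAgent API.

```python
import asyncio
from typing import Set


class OperationTracker:
    """Hypothetical stand-in that tracks active tasks for shutdown."""

    def __init__(self) -> None:
        self._active: Set[asyncio.Task] = set()

    def start(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._active.add(task)
        # Forget the task automatically once it finishes.
        task.add_done_callback(self._active.discard)
        return task

    async def shutdown(self, timeout: float = 30.0) -> int:
        if not self._active:
            return 0
        # Give in-flight work up to `timeout` seconds to finish on its own.
        _, pending = await asyncio.wait(set(self._active), timeout=timeout)
        for task in pending:
            task.cancel()
        # Wait until the cancellations have actually been processed.
        await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)


async def demo() -> tuple:
    tracker = OperationTracker()
    quick = tracker.start(asyncio.sleep(0.01))
    slow = tracker.start(asyncio.sleep(10))
    cancelled = await tracker.shutdown(timeout=0.2)
    finished_normally = quick.done() and not quick.cancelled()
    return cancelled, finished_normally, slow.cancelled()
```

In this sketch the short task completes within the grace period while the long one is cancelled, so shutdown returns promptly instead of waiting for slow operations.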
get_diagnostics
def get_diagnostics() -> Dict[str, Any]
Get diagnostic information about the agent's state.