Module spoon_ai.agents.base

ThreadSafeOutputQueue Objects

class ThreadSafeOutputQueue()

Thread-safe output queue with fair access and timeout protection

get

async def get(timeout: Optional[float] = 30.0) -> Any

Get item with timeout and fair access
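A minimal sketch of how a timeout-guarded `get` can behave, assuming the queue wraps an `asyncio.Queue`. `SketchOutputQueue` and its `put` method are hypothetical stand-ins for illustration and are not taken from `spoon_ai`; fairness handling is omitted.

```python
import asyncio
from typing import Any, Optional, Tuple


class SketchOutputQueue:
    """Hypothetical stand-in; not the spoon_ai implementation."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put(self, item: Any) -> None:
        await self._queue.put(item)

    async def get(self, timeout: Optional[float] = 30.0) -> Any:
        # Assumption: timeout=None means "wait indefinitely".
        if timeout is None:
            return await self._queue.get()
        # asyncio.wait_for raises asyncio.TimeoutError when the wait expires.
        return await asyncio.wait_for(self._queue.get(), timeout=timeout)


async def demo() -> Tuple[Any, bool]:
    queue = SketchOutputQueue()
    await queue.put("chunk-1")
    first = await queue.get(timeout=1.0)
    try:
        await queue.get(timeout=0.05)  # queue is empty, so this times out
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return first, timed_out
```

Callers that consume output in a loop would typically catch the timeout and decide whether to retry or stop.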

BaseAgent Objects

class BaseAgent(BaseModel, ABC)

Thread-safe base class for all agents with proper concurrency handling.

add_message

async def add_message(role: Literal["user", "assistant", "tool"],
                      content: str,
                      tool_call_id: Optional[str] = None,
                      tool_calls: Optional[List[ToolCall]] = None,
                      tool_name: Optional[str] = None,
                      timeout: Optional[float] = None) -> None

Thread-safe message addition with timeout protection
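The following sketch illustrates one way "message addition with timeout protection" can work, assuming an `asyncio.Lock` guards the message list. `SketchMemoryAgent`, `_memory_lock`, and the dictionary message shape are hypothetical; the real class stores messages through its own memory types.

```python
import asyncio
from typing import Dict, List, Literal, Optional, Tuple


class SketchMemoryAgent:
    """Hypothetical stand-in; not the spoon_ai BaseAgent."""

    def __init__(self) -> None:
        self.messages: List[Dict[str, Optional[str]]] = []
        self._memory_lock = asyncio.Lock()

    async def add_message(self,
                          role: Literal["user", "assistant", "tool"],
                          content: str,
                          tool_call_id: Optional[str] = None,
                          timeout: Optional[float] = None) -> None:
        if role not in ("user", "assistant", "tool"):
            raise ValueError(f"Unsupported role: {role}")

        async def _append() -> None:
            # The lock is released automatically, even if the wait is cancelled.
            async with self._memory_lock:
                self.messages.append(
                    {"role": role, "content": content, "tool_call_id": tool_call_id}
                )

        # Bounds the time spent waiting for the lock; timeout=None waits indefinitely.
        await asyncio.wait_for(_append(), timeout=timeout)


async def demo() -> Tuple[int, bool]:
    agent = SketchMemoryAgent()
    await asyncio.gather(
        agent.add_message("user", "hello", timeout=1.0),
        agent.add_message("assistant", "hi there", timeout=1.0),
    )
    # Simulate a stuck writer holding the lock: the next call should time out.
    async with agent._memory_lock:
        try:
            await agent.add_message("user", "blocked", timeout=0.05)
            timed_out = False
        except asyncio.TimeoutError:
            timed_out = True
    return len(agent.messages), timed_out
```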

state_context

@asynccontextmanager
async def state_context(new_state: AgentState,
                        timeout: Optional[float] = None)

Thread-safe state context manager with deadlock prevention. Acquires the state lock only to perform quick transitions, not for the duration of the work inside the context, avoiding long-held locks and false timeouts during network calls.
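The locking pattern described above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `SketchState`, `SketchAgent`, `_state_lock`, and `_swap_state` are hypothetical names, and restoring the previous state on exit is an assumption about the exit behaviour.

```python
import asyncio
from contextlib import asynccontextmanager
from enum import Enum
from typing import Optional, Tuple


class SketchState(str, Enum):
    IDLE = "idle"
    RUNNING = "running"


class SketchAgent:
    """Hypothetical stand-in; not the spoon_ai BaseAgent."""

    def __init__(self) -> None:
        self.state = SketchState.IDLE
        self._state_lock = asyncio.Lock()

    async def _swap_state(self, new_state: SketchState,
                          timeout: Optional[float]) -> SketchState:
        async def _locked_swap() -> SketchState:
            # The lock is held only for this quick assignment.
            async with self._state_lock:
                previous, self.state = self.state, new_state
                return previous

        return await asyncio.wait_for(_locked_swap(), timeout=timeout)

    @asynccontextmanager
    async def state_context(self, new_state: SketchState,
                            timeout: Optional[float] = None):
        previous = await self._swap_state(new_state, timeout)
        try:
            yield  # the lock is NOT held while the caller's work runs
        finally:
            # Assumption: the prior state is restored on exit.
            await self._swap_state(previous, timeout)


async def demo() -> Tuple[SketchState, bool, SketchState]:
    agent = SketchAgent()
    async with agent.state_context(SketchState.RUNNING, timeout=1.0):
        inside = agent.state
        lock_free = not agent._state_lock.locked()
        await asyncio.sleep(0.01)  # stands in for a slow network call
    return inside, lock_free, agent.state
```

Because the lock is free during the body, a slow call inside the context cannot cause another coroutine's state transition to time out.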

run

async def run(request: Optional[str] = None,
              timeout: Optional[float] = None) -> str

Thread-safe run method with proper concurrency control and callback support.

step

async def step(run_id: Optional[uuid.UUID] = None) -> str

Executes a single step. Subclasses should override this method; it uses step-level locking and supports callbacks.

is_stuck

async def is_stuck() -> bool

Thread-safe stuck detection

handle_stuck_state

async def handle_stuck_state()

Thread-safe stuck state handling

add_documents

def add_documents(documents) -> None

Store documents on the agent so that the CLI load-docs command works without the RAG mixin.

This default implementation keeps the documents in-memory under self._loaded_documents. Agents that support retrieval should override this method to index documents into their vector store.
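As a rough illustration of the override described above, the sketch below pairs an in-memory default with a subclass that also indexes the documents. Both classes are hypothetical, the toy keyword index stands in for a real vector store, and replacing (rather than extending) the stored list is an assumption.

```python
from typing import Any, Dict, List


class SketchBaseAgent:
    """Hypothetical stand-in for the default behaviour; not the spoon_ai class."""

    def __init__(self) -> None:
        self._loaded_documents: List[Any] = []

    def add_documents(self, documents) -> None:
        # Default: keep the documents in memory only.
        self._loaded_documents = list(documents)


class SketchRetrievalAgent(SketchBaseAgent):
    """Overrides add_documents to index the documents as well."""

    def __init__(self) -> None:
        super().__init__()
        # Toy keyword index: word -> ids of documents containing it.
        self.index: Dict[str, List[int]] = {}

    def add_documents(self, documents) -> None:
        super().add_documents(documents)
        self.index.clear()
        for doc_id, text in enumerate(self._loaded_documents):
            # set() avoids recording the same document twice for one word.
            for word in set(str(text).lower().split()):
                self.index.setdefault(word, []).append(doc_id)


agent = SketchRetrievalAgent()
agent.add_documents(["alpha beta", "beta gamma"])
```

Calling `super().add_documents(...)` first keeps the in-memory copy available, so tooling that reads `self._loaded_documents` continues to work.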

save_chat_history

def save_chat_history()

Thread-safe chat history saving

stream

async def stream(timeout: Optional[float] = None)

Thread-safe streaming with proper cleanup and timeout
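One way to combine a per-item timeout with cleanup in a streaming generator is sketched below. It assumes output arrives on an `asyncio.Queue` and that a sentinel marks the end of the stream; `stream_from_queue` and `_DONE` are hypothetical names, and the actual method may manage its output differently.

```python
import asyncio
from typing import Any, AsyncIterator, List, Optional

_DONE = object()  # hypothetical end-of-stream sentinel


async def stream_from_queue(queue: asyncio.Queue,
                            timeout: Optional[float] = None) -> AsyncIterator[Any]:
    try:
        while True:
            # Each wait is bounded; asyncio.TimeoutError propagates to the consumer.
            item = await asyncio.wait_for(queue.get(), timeout=timeout)
            if item is _DONE:
                return
            yield item
    finally:
        # Cleanup: drop leftovers so a later stream starts from an empty queue.
        while not queue.empty():
            queue.get_nowait()


async def demo() -> List[Any]:
    queue: asyncio.Queue = asyncio.Queue()
    for item in ("chunk-1", "chunk-2", _DONE, "late"):
        queue.put_nowait(item)
    received = [item async for item in stream_from_queue(queue, timeout=1.0)]
    return received + [queue.empty()]
```

The `finally` block also runs when the stream ends through a timeout or an error, which is what keeps the cleanup reliable.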

process_mcp_message

async def process_mcp_message(content: Any,
                              sender: str,
                              message: Dict[str, Any],
                              agent_id: str,
                              timeout: Optional[float] = None)

Thread-safe MCP message processing with timeout protection

shutdown

async def shutdown(timeout: float = 30.0)

Graceful shutdown with cleanup of active operations
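A graceful shutdown of this kind can be sketched as "wait up to `timeout` for active operations, then cancel the rest". The sketch assumes operations are tracked as `asyncio.Task` objects; `SketchAgent`, `track`, and the returned count of cancelled tasks are hypothetical and added for illustration only.

```python
import asyncio
from typing import Any, Set, Tuple


class SketchAgent:
    """Hypothetical stand-in; not the spoon_ai BaseAgent."""

    def __init__(self) -> None:
        self._active: Set[asyncio.Task] = set()

    def track(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._active.add(task)
        # Finished tasks remove themselves from the active set.
        task.add_done_callback(self._active.discard)
        return task

    async def shutdown(self, timeout: float = 30.0) -> int:
        tasks = list(self._active)
        if not tasks:
            return 0
        # Give running operations up to `timeout` seconds to finish on their own.
        _, pending = await asyncio.wait(tasks, timeout=timeout)
        for task in pending:
            task.cancel()
        # Let the cancellations complete without raising CancelledError here.
        await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)


async def demo() -> Tuple[int, Any, bool]:
    agent = SketchAgent()
    fast = agent.track(asyncio.sleep(0.01, result="done"))
    slow = agent.track(asyncio.sleep(10))
    cancelled = await agent.shutdown(timeout=0.2)
    return cancelled, fast.result(), slow.cancelled()
```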

get_diagnostics

def get_diagnostics() -> Dict[str, Any]

Get diagnostic information about the agent's state