Skip to main content

Table of Contents

Module spoon_ai

Module spoon_ai.graph

Graph-based execution system for SpoonOS agents.

This module provides a LangGraph-inspired framework with advanced features:

  • State management with TypedDict and reducers
  • LLM Manager integration
  • Error handling and recovery
  • Human-in-the-loop patterns
  • Multi-agent coordination
  • Comprehensive testing support
  • Checkpointing and persistence

GraphExecutionError Objects

class GraphExecutionError(Exception)

Raised when graph execution encounters an error.

NodeExecutionError Objects

class NodeExecutionError(Exception)

Raised when a node fails to execute.

StateValidationError Objects

class StateValidationError(Exception)

Raised when state validation fails.

CheckpointError Objects

class CheckpointError(Exception)

Raised when checkpoint operations fail.

GraphConfigurationError Objects

class GraphConfigurationError(Exception)

Raised when graph configuration is invalid.

EdgeRoutingError Objects

class EdgeRoutingError(Exception)

Raised when edge routing fails.

InterruptError Objects

class InterruptError(Exception)

Raised when graph execution is interrupted for human input.

Command Objects

@dataclass
class Command()

Command object for controlling graph flow and state updates.

StateSnapshot Objects

@dataclass
class StateSnapshot()

Snapshot of graph state at a specific point in time.

InMemoryCheckpointer Objects

class InMemoryCheckpointer()

Simple in-memory checkpointer for development and testing.

This checkpointer stores state snapshots in memory and provides basic checkpoint management functionality. For production use, consider using persistent checkpointers like Redis or PostgreSQL.

__init__

def __init__(max_checkpoints_per_thread: int = 100)

Initialize the in-memory checkpointer.

Arguments:

  • max_checkpoints_per_thread - Maximum number of checkpoints to keep per thread

save_checkpoint

def save_checkpoint(thread_id: str, snapshot: StateSnapshot) -> None

Save a checkpoint for a thread.

Arguments:

  • thread_id - Unique identifier for the thread
  • snapshot - State snapshot to save

Raises:

  • CheckpointError - If checkpoint saving fails

get_checkpoint

def get_checkpoint(thread_id: str,
checkpoint_id: str = None) -> Optional[StateSnapshot]

Get a specific checkpoint or the latest one.

Arguments:

  • thread_id - Unique identifier for the thread
  • checkpoint_id - Optional specific checkpoint ID

Returns:

StateSnapshot or None if not found

Raises:

  • CheckpointError - If checkpoint retrieval fails

list_checkpoints

def list_checkpoints(thread_id: str) -> List[StateSnapshot]

List all checkpoints for a thread.

Arguments:

  • thread_id - Unique identifier for the thread

Returns:

List of state snapshots

Raises:

  • CheckpointError - If checkpoint listing fails

clear_thread

def clear_thread(thread_id: str) -> None

Clear all checkpoints for a thread.

Arguments:

  • thread_id - Unique identifier for the thread

get_stats

def get_stats() -> Dict[str, Any]

Get checkpointer statistics.

Returns:

Dictionary with statistics

add_messages

def add_messages(existing: List[Any], new: List[Any]) -> List[Any]

Reducer function for adding messages to a list.

interrupt

def interrupt(data: Dict[str, Any]) -> Any

Interrupt execution and wait for human input.

StateGraph Objects

class StateGraph()

Enhanced StateGraph with LangGraph-inspired features and SpoonOS integration.

Features:

  • TypedDict state management with reducers
  • LLM Manager integration
  • Error handling and recovery
  • Human-in-the-loop patterns
  • Checkpointing and persistence
  • Multi-agent coordination support

__init__

def __init__(state_schema: type, checkpointer: InMemoryCheckpointer = None)

Initialize the enhanced state graph.

Arguments:

  • state_schema - TypedDict class defining the state structure
  • checkpointer - Optional checkpointer for state persistence

add_node

def add_node(name: str, action: Callable) -> "StateGraph"

Add a node to the graph.

Arguments:

  • name - Unique identifier for the node
  • action - Function or coroutine that processes the state Should accept state dict and return dict of updates or Command

Returns:

Self for method chaining

Raises:

  • GraphConfigurationError - If node name already exists or is invalid

add_llm_node

def add_llm_node(
name: str,
system_prompt: str,
provider: Optional[str] = None,
model_params: Optional[Dict[str, Any]] = None) -> "StateGraph"

Add an LLM-powered node to the graph.

Arguments:

  • name - Unique identifier for the node
  • system_prompt - System prompt for the LLM
  • provider - Specific LLM provider to use
  • model_params - Parameters for the LLM call

Returns:

Self for method chaining

add_edge

def add_edge(start_node: str, end_node: str) -> "StateGraph"

Add a direct, unconditional edge between two nodes.

Arguments:

  • start_node - Name of the source node (or "START")
  • end_node - Name of the destination node (or "END")

Returns:

Self for method chaining

Raises:

  • GraphConfigurationError - If nodes don't exist or edge is invalid

add_conditional_edges

def add_conditional_edges(start_node: str,
condition: Callable[[Dict[str, Any]], str],
path_map: Dict[str, str]) -> "StateGraph"

Add conditional edges that route to different nodes based on state.

Arguments:

  • start_node - Name of the source node
  • condition - Function that takes state and returns a key from path_map
  • path_map - Mapping from condition results to destination node names

Returns:

Self for method chaining

Raises:

  • GraphConfigurationError - If configuration is invalid

set_entry_point

def set_entry_point(node_name: str) -> "StateGraph"

Set the starting node for graph execution.

Arguments:

  • node_name - Name of the node to start execution from

Returns:

Self for method chaining

Raises:

  • GraphConfigurationError - If entry point node doesn't exist

compile

def compile() -> "CompiledGraph"

Compile the graph into an executable form.

Returns:

CompiledGraph instance ready for execution

Raises:

  • GraphConfigurationError - If graph configuration is invalid

CompiledGraph Objects

class CompiledGraph()

Executable version of a StateGraph with advanced features.

__init__

def __init__(graph: StateGraph)

Initialize with a compiled StateGraph.

invoke

async def invoke(initial_state: Optional[Dict[str, Any]] = None,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]

Execute the graph from the entry point.

stream

async def stream(initial_state: Optional[Dict[str, Any]] = None,
config: Optional[Dict[str, Any]] = None,
stream_mode: str = "values")

Stream graph execution with different modes.

get_execution_history

def get_execution_history() -> List[Dict[str, Any]]

Get the execution history for debugging and analysis.

get_state

def get_state(config: Dict[str, Any]) -> Optional[StateSnapshot]

Get the current state snapshot for a thread.

Module spoon_ai.llm.vlm_provider.gemini

GeminiConfig Objects

class GeminiConfig(LLMConfig)

Gemini Configuration

validate_api_key

@model_validator(mode='after')
def validate_api_key()

Validate that API key is provided

GeminiProvider Objects

@LLMFactory.register("gemini")
class GeminiProvider(LLMBase)

Gemini Provider Implementation

__init__

def __init__(config_path: str = "config/config.toml",
config_name: str = "chitchat")

Initialize Gemini Provider

Arguments:

  • config_path - Configuration file path
  • config_name - Configuration name

Raises:

  • ValueError - If GEMINI_API_KEY environment variable is not set

chat

async def chat(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
response_modalities: Optional[List[str]] = None,
**kwargs) -> LLMResponse

Send chat request to Gemini and get response

Arguments:

  • messages - List of messages
  • system_msgs - List of system messages
  • response_modalities - List of response modalities (optional, e.g. ['Text', 'Image'])
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

completion

async def completion(prompt: str, **kwargs) -> LLMResponse

Send text completion request to Gemini and get response

Arguments:

  • prompt - Prompt text
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

chat_with_tools

async def chat_with_tools(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
tools: Optional[List[Dict]] = None,
tool_choice: Literal["none", "auto",
"required"] = "auto",
**kwargs) -> LLMResponse

Send chat request to Gemini that may contain tool calls and get response

Note: Gemini currently doesn't support tool calls, this method will use regular chat method

Arguments:

  • messages - List of messages
  • system_msgs - List of system messages
  • tools - List of tools (not supported by Gemini)
  • tool_choice - Tool choice mode (not supported by Gemini)
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

generate_content

async def generate_content(model: Optional[str] = None,
contents: Union[str, List, types.Content,
types.Part] = None,
config: Optional[
types.GenerateContentConfig] = None,
**kwargs) -> LLMResponse

Directly call Gemini's generate_content interface

Arguments:

  • model - Model name (optional, will override model in configuration)
  • contents - Request content, can be string, list, or types.Content/types.Part object
  • config - Generation configuration
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

Module spoon_ai.llm.vlm_provider.base

LLMConfig Objects

class LLMConfig(BaseModel)

Base class for LLM configuration

LLMResponse Objects

class LLMResponse(BaseModel)

Base class for LLM response

text

Original text response

LLMBase Objects

class LLMBase(ABC)

Base abstract class for LLM, defining interfaces that all LLM providers must implement

__init__

def __init__(config_path: str = "config/config.toml",
config_name: str = "llm")

Initialize LLM interface

Arguments:

  • config_path - Configuration file path
  • config_name - Configuration name

chat

@abstractmethod
async def chat(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
**kwargs) -> LLMResponse

Send chat request to LLM and get response

Arguments:

  • messages - List of messages
  • system_msgs - List of system messages
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

completion

@abstractmethod
async def completion(prompt: str, **kwargs) -> LLMResponse

Send text completion request to LLM and get response

Arguments:

  • prompt - Prompt text
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

chat_with_tools

@abstractmethod
async def chat_with_tools(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
tools: Optional[List[Dict]] = None,
tool_choice: Literal["none", "auto",
"required"] = "auto",
**kwargs) -> LLMResponse

Send chat request that may contain tool calls to LLM and get response

Arguments:

  • messages - List of messages
  • system_msgs - List of system messages
  • tools - List of tools
  • tool_choice - Tool selection mode
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

generate_image

async def generate_image(prompt: str, **kwargs) -> Union[str, List[str]]

Generate image (optional implementation)

Arguments:

  • prompt - Prompt text
  • **kwargs - Other parameters

Returns:

Union[str, List[str]]: Image URL or list of URLs

reset_output_handler

def reset_output_handler()

Reset output handler

Module spoon_ai.llm.factory

LLMFactory Objects

class LLMFactory()

LLM factory class, used to create different LLM instances

register

@classmethod
def register(cls, provider_name: str)

Register LLM provider

Arguments:

  • provider_name - Provider name

Returns:

Decorator function

create

@classmethod
def create(cls,
provider: Optional[str] = None,
config_path: str = "config/config.toml",
config_name: str = "llm") -> LLMBase

Create LLM instance

Arguments:

  • provider - Provider name, if None, read from configuration file
  • config_path - Configuration file path
  • config_name - Configuration name

Returns:

  • LLMBase - LLM instance

Raises:

  • ValueError - If provider does not exist

Module spoon_ai.llm.monitoring

Comprehensive monitoring, debugging, and metrics collection for LLM operations.

RequestMetrics Objects

@dataclass
class RequestMetrics()

Metrics for a single LLM request.

ProviderStats Objects

@dataclass
class ProviderStats()

Aggregated statistics for a provider.

get

def get(key: str, default=None)

Get attribute value with default fallback for dictionary-like access.

Arguments:

  • key - Attribute name
  • default - Default value if attribute doesn't exist

Returns:

Attribute value or default

success_rate

@property
def success_rate() -> float

Calculate success rate as a percentage.

avg_response_time

@property
def avg_response_time() -> float

Get average response time.

DebugLogger Objects

class DebugLogger()

Comprehensive logging and debugging system for LLM operations.

__init__

def __init__(max_history: int = 1000, enable_detailed_logging: bool = True)

Initialize debug logger.

Arguments:

  • max_history - Maximum number of requests to keep in history
  • enable_detailed_logging - Whether to enable detailed request/response logging

log_request

def log_request(provider: str, method: str, params: Dict[str, Any]) -> str

Log request with unique ID.

Arguments:

  • provider - Provider name
  • method - Method being called (chat, completion, etc.)
  • params - Request parameters

Returns:

  • str - Unique request ID

log_response

def log_response(request_id: str, response: LLMResponse,
duration: float) -> None

Log response with timing information.

Arguments:

  • request_id - Request ID from log_request
  • response - LLM response object
  • duration - Request duration in seconds

log_error

def log_error(request_id: str, error: Exception, context: Dict[str,
Any]) -> None

Log error with context.

Arguments:

  • request_id - Request ID from log_request
  • error - Exception that occurred
  • context - Additional error context

log_fallback

def log_fallback(from_provider: str, to_provider: str, reason: str) -> None

Log provider fallback event.

Arguments:

  • from_provider - Provider that failed
  • to_provider - Provider being used as fallback
  • reason - Reason for fallback

get_request_history

def get_request_history(provider: Optional[str] = None,
limit: Optional[int] = None) -> List[RequestMetrics]

Get request history.

Arguments:

  • provider - Filter by provider (optional)
  • limit - Maximum number of requests to return (optional)

Returns:

  • List[RequestMetrics] - List of request metrics

get_active_requests

def get_active_requests() -> List[RequestMetrics]

Get currently active requests.

Returns:

  • List[RequestMetrics] - List of active request metrics

clear_history

def clear_history() -> None

Clear request history.

MetricsCollector Objects

class MetricsCollector()

Collects and aggregates performance metrics for LLM providers.

__init__

def __init__(window_size: int = 3600)

Initialize metrics collector.

Arguments:

  • window_size - Time window in seconds for rolling metrics

record_request

def record_request(provider: str,
method: str,
duration: float,
success: bool,
tokens: int = 0,
model: str = '',
error: Optional[str] = None) -> None

Record request metrics.

Arguments:

  • provider - Provider name
  • method - Method called
  • duration - Request duration in seconds
  • success - Whether request was successful
  • tokens - Number of tokens used
  • model - Model name
  • error - Error message if failed

get_provider_stats

def get_provider_stats(provider: str) -> Optional[ProviderStats]

Get statistics for a specific provider.

Arguments:

  • provider - Provider name

Returns:

  • Optional[ProviderStats] - Provider statistics or None if not found

get_all_stats

def get_all_stats() -> Dict[str, ProviderStats]

Get statistics for all providers.

Returns:

Dict[str, ProviderStats]: Dictionary of provider statistics

get_rolling_metrics

def get_rolling_metrics(provider: Optional[str] = None,
method: Optional[str] = None) -> List[Dict[str, Any]]

Get rolling metrics with optional filtering.

Arguments:

  • provider - Filter by provider (optional)
  • method - Filter by method (optional)

Returns:

List[Dict[str, Any]]: List of metrics

get_summary

def get_summary() -> Dict[str, Any]

Get overall summary statistics.

Returns:

Dict[str, Any]: Summary statistics

reset_stats

def reset_stats(provider: Optional[str] = None) -> None

Reset statistics.

Arguments:

  • provider - Reset specific provider only (optional)

get_debug_logger

def get_debug_logger() -> DebugLogger

Get global debug logger instance.

Returns:

  • DebugLogger - Global debug logger

get_metrics_collector

def get_metrics_collector() -> MetricsCollector

Get global metrics collector instance.

Returns:

  • MetricsCollector - Global metrics collector

Module spoon_ai.llm.response_normalizer

Response normalizer for ensuring consistent response formats across providers.

ResponseNormalizer Objects

class ResponseNormalizer()

Normalizes responses from different providers to ensure consistency.

normalize_response

def normalize_response(response: LLMResponse) -> LLMResponse

Normalize a response from any provider.

Arguments:

  • response - Raw LLM response

Returns:

  • LLMResponse - Normalized response

Raises:

  • ValidationError - If response cannot be normalized

validate_response

def validate_response(response: LLMResponse) -> bool

Validate that a response meets minimum requirements.

Arguments:

  • response - Response to validate

Returns:

  • bool - True if response is valid

Raises:

  • ValidationError - If response is invalid

add_provider_mapping

def add_provider_mapping(provider_name: str, normalizer_func) -> None

Add a custom normalizer for a new provider.

Arguments:

  • provider_name - Name of the provider
  • normalizer_func - Function that takes and returns LLMResponse

get_supported_providers

def get_supported_providers() -> List[str]

Get list of providers with custom normalizers.

Returns:

  • List[str] - List of provider names

get_response_normalizer

def get_response_normalizer() -> ResponseNormalizer

Get global response normalizer instance.

Returns:

  • ResponseNormalizer - Global normalizer instance

Module spoon_ai.llm.cache

LLM Response Caching - Cache LLM responses to avoid redundant API calls.

LLMResponseCache Objects

class LLMResponseCache()

Cache for LLM responses to avoid redundant API calls.

__init__

def __init__(default_ttl: int = 3600, max_size: int = 1000)

Initialize the cache.

Arguments:

  • default_ttl - Default time-to-live in seconds (default: 1 hour)
  • max_size - Maximum number of cached entries (default: 1000)

get

def get(messages: List[Message],
provider: Optional[str] = None,
**kwargs) -> Optional[LLMResponse]

Get cached response if available.

Arguments:

  • messages - List of conversation messages
  • provider - Provider name (optional)
  • **kwargs - Additional parameters

Returns:

  • Optional[LLMResponse] - Cached response if found and not expired, None otherwise

set

def set(messages: List[Message],
response: LLMResponse,
provider: Optional[str] = None,
ttl: Optional[int] = None,
**kwargs) -> None

Store response in cache.

Arguments:

  • messages - List of conversation messages
  • response - LLM response to cache
  • provider - Provider name (optional)
  • ttl - Time-to-live in seconds (optional, uses default if not provided)
  • **kwargs - Additional parameters

clear

def clear() -> None

Clear all cached entries.

get_stats

def get_stats() -> Dict[str, Any]

Get cache statistics.

Returns:

Dict[str, Any]: Cache statistics including size, max_size, etc.

CachedLLMManager Objects

class CachedLLMManager()

Wrapper around LLMManager that adds response caching.

__init__

def __init__(llm_manager: LLMManager,
cache: Optional[LLMResponseCache] = None)

Initialize cached LLM manager.

Arguments:

  • llm_manager - The underlying LLMManager instance
  • cache - Optional cache instance (creates new one if not provided)

chat

async def chat(messages: List[Message],
provider: Optional[str] = None,
use_cache: bool = True,
cache_ttl: Optional[int] = None,
**kwargs) -> LLMResponse

Send chat request with caching support.

Arguments:

  • messages - List of conversation messages
  • provider - Specific provider to use (optional)
  • use_cache - Whether to use cache (default: True)
  • cache_ttl - Custom TTL for this request (optional)
  • **kwargs - Additional parameters

Returns:

  • LLMResponse - LLM response (from cache or API)

chat_stream

async def chat_stream(messages: List[Message],
provider: Optional[str] = None,
callbacks: Optional[List] = None,
**kwargs)

Send streaming chat request (caching not supported for streaming).

Arguments:

  • messages - List of conversation messages
  • provider - Specific provider to use (optional)
  • callbacks - Optional callback handlers
  • **kwargs - Additional parameters

Yields:

  • LLMResponseChunk - Streaming response chunks

clear_cache

def clear_cache() -> None

Clear the response cache.

get_cache_stats

def get_cache_stats() -> Dict[str, Any]

Get cache statistics.

Returns:

Dict[str, Any]: Cache statistics

Module spoon_ai.llm.interface

LLM Provider Interface - Abstract base class defining the unified interface for all LLM providers.

ProviderCapability Objects

class ProviderCapability(Enum)

Enumeration of capabilities that LLM providers can support.

ProviderMetadata Objects

@dataclass
class ProviderMetadata()

Metadata describing a provider's capabilities and limits.

LLMResponse Objects

@dataclass
class LLMResponse()

Enhanced LLM response with comprehensive metadata and debugging information.

LLMProviderInterface Objects

class LLMProviderInterface(ABC)

Abstract base class defining the unified interface for all LLM providers.

initialize

@abstractmethod
async def initialize(config: Dict[str, Any]) -> None

Initialize the provider with configuration.

Arguments:

  • config - Provider-specific configuration dictionary

Raises:

  • ConfigurationError - If configuration is invalid

chat

@abstractmethod
async def chat(messages: List[Message], **kwargs) -> LLMResponse

Send chat request to the provider.

Arguments:

  • messages - List of conversation messages
  • **kwargs - Additional provider-specific parameters

Returns:

  • LLMResponse - Standardized response object

Raises:

  • ProviderError - If the request fails

chat_stream

@abstractmethod
async def chat_stream(messages: List[Message],
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs) -> AsyncIterator[LLMResponseChunk]

Send streaming chat request to the provider with callback support.

Arguments:

  • messages - List of conversation messages
  • callbacks - Optional list of callback handlers for real-time events
  • **kwargs - Additional provider-specific parameters

Yields:

  • LLMResponseChunk - Structured streaming response chunks

Raises:

  • ProviderError - If the request fails

completion

@abstractmethod
async def completion(prompt: str, **kwargs) -> LLMResponse

Send completion request to the provider.

Arguments:

  • prompt - Text prompt for completion
  • **kwargs - Additional provider-specific parameters

Returns:

  • LLMResponse - Standardized response object

Raises:

  • ProviderError - If the request fails

chat_with_tools

@abstractmethod
async def chat_with_tools(messages: List[Message], tools: List[Dict],
**kwargs) -> LLMResponse

Send chat request with tool support.

Arguments:

  • messages - List of conversation messages
  • tools - List of available tools
  • **kwargs - Additional provider-specific parameters

Returns:

  • LLMResponse - Standardized response object with potential tool calls

Raises:

  • ProviderError - If the request fails

get_metadata

@abstractmethod
def get_metadata() -> ProviderMetadata

Get provider metadata and capabilities.

Returns:

  • ProviderMetadata - Provider information and capabilities

health_check

@abstractmethod
async def health_check() -> bool

Check if provider is healthy and available.

Returns:

  • bool - True if provider is healthy, False otherwise

cleanup

@abstractmethod
async def cleanup() -> None

Cleanup resources and connections.

This method should be called when the provider is no longer needed.

Module spoon_ai.llm.providers.deepseek_provider

DeepSeek Provider implementation for the unified LLM interface. DeepSeek provides access to their models through an OpenAI-compatible API.

DeepSeekProvider Objects

@register_provider("deepseek", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.TOOLS,
ProviderCapability.STREAMING
])
class DeepSeekProvider(OpenAICompatibleProvider)

DeepSeek provider implementation using OpenAI-compatible API.

get_metadata

def get_metadata() -> ProviderMetadata

Get DeepSeek provider metadata.

Module spoon_ai.llm.providers.gemini_provider

Gemini Provider implementation for the unified LLM interface.

GeminiProvider Objects

@register_provider("gemini", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.STREAMING,
ProviderCapability.TOOLS,
ProviderCapability.IMAGE_GENERATION,
ProviderCapability.VISION
])
class GeminiProvider(LLMProviderInterface)

Gemini provider implementation.

initialize

async def initialize(config: Dict[str, Any]) -> None

Initialize the Gemini provider with configuration.

chat

async def chat(messages: List[Message], **kwargs) -> LLMResponse

Send chat request to Gemini.

chat_stream

async def chat_stream(messages: List[Message],
callbacks: Optional[List] = None,
**kwargs) -> AsyncIterator[LLMResponseChunk]

Send streaming chat request to Gemini with callback support.

Yields:

  • LLMResponseChunk - Structured streaming response chunks

completion

async def completion(prompt: str, **kwargs) -> LLMResponse

Send completion request to Gemini.

chat_with_tools

async def chat_with_tools(messages: List[Message], tools: List[Dict],
**kwargs) -> LLMResponse

Send chat request with tools to Gemini using native function calling.

get_metadata

def get_metadata() -> ProviderMetadata

Get Gemini provider metadata.

health_check

async def health_check() -> bool

Check if Gemini provider is healthy.

cleanup

async def cleanup() -> None

Cleanup Gemini provider resources.

Module spoon_ai.llm.providers.openai_compatible_provider

OpenAI Compatible Provider base class for providers that use OpenAI-compatible APIs. This includes OpenAI, OpenRouter, DeepSeek, and other providers with similar interfaces.

OpenAICompatibleProvider Objects

class OpenAICompatibleProvider(LLMProviderInterface)

Base class for OpenAI-compatible providers.

get_provider_name

def get_provider_name() -> str

Get the provider name. Should be overridden by subclasses.

get_default_base_url

def get_default_base_url() -> str

Get the default base URL. Should be overridden by subclasses.

get_default_model

def get_default_model() -> str

Get the default model. Should be overridden by subclasses.

get_additional_headers

def get_additional_headers(config: Dict[str, Any]) -> Dict[str, str]

Get additional headers for the provider. Can be overridden by subclasses.

initialize

async def initialize(config: Dict[str, Any]) -> None

Initialize the provider with configuration.

chat

async def chat(messages: List[Message], **kwargs) -> LLMResponse

Send chat request to the provider.

chat_stream

async def chat_stream(messages: List[Message],
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs) -> AsyncIterator[LLMResponseChunk]

Send streaming chat request with full callback support.

Yields:

  • LLMResponseChunk - Structured streaming response chunks

completion

async def completion(prompt: str, **kwargs) -> LLMResponse

Send completion request to the provider.

chat_with_tools

async def chat_with_tools(messages: List[Message], tools: List[Dict],
**kwargs) -> LLMResponse

Send chat request with tools to the provider.

get_metadata

def get_metadata() -> ProviderMetadata

Get provider metadata. Should be overridden by subclasses.

health_check

async def health_check() -> bool

Check if provider is healthy.

cleanup

async def cleanup() -> None

Cleanup provider resources.

Module spoon_ai.llm.providers.ollama_provider

Ollama Provider implementation for the unified LLM interface.

Ollama runs locally and exposes an HTTP API (default: http://localhost:11434). This provider supports chat, completion, and streaming.

Notes:

  • Ollama does not require an API key; the configuration layer may still provide a placeholder api_key value for consistency.
  • Tool calling is not implemented here.

OllamaProvider Objects

@register_provider(
"ollama",
[
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.STREAMING,
],
)
class OllamaProvider(LLMProviderInterface)

Local Ollama provider via HTTP.

Module spoon_ai.llm.providers.openai_provider

OpenAI Provider implementation for the unified LLM interface.

OpenAIProvider Objects

@register_provider("openai", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.TOOLS,
ProviderCapability.STREAMING
])
class OpenAIProvider(OpenAICompatibleProvider)

OpenAI provider implementation.

get_metadata

def get_metadata() -> ProviderMetadata

Get OpenAI provider metadata.

Module spoon_ai.llm.providers.anthropic_provider

Anthropic Provider implementation for the unified LLM interface.

AnthropicProvider Objects

@register_provider("anthropic", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.TOOLS,
ProviderCapability.STREAMING
])
class AnthropicProvider(LLMProviderInterface)

Anthropic provider implementation.

initialize

async def initialize(config: Dict[str, Any]) -> None

Initialize the Anthropic provider with configuration.

get_cache_metrics

def get_cache_metrics() -> Dict[str, int]

Get current cache performance metrics.

chat

async def chat(messages: List[Message], **kwargs) -> LLMResponse

Send chat request to Anthropic.

chat_stream

async def chat_stream(messages: List[Message],
callbacks: Optional[List] = None,
**kwargs) -> AsyncIterator[LLMResponseChunk]

Send streaming chat request to Anthropic with callback support.

Yields:

  • LLMResponseChunk - Structured streaming response chunks

completion

async def completion(prompt: str, **kwargs) -> LLMResponse

Send completion request to Anthropic.

chat_with_tools

async def chat_with_tools(messages: List[Message], tools: List[Dict],
**kwargs) -> LLMResponse

Send chat request with tools to Anthropic.

get_metadata

def get_metadata() -> ProviderMetadata

Get Anthropic provider metadata.

health_check

async def health_check() -> bool

Check if Anthropic provider is healthy.

cleanup

async def cleanup() -> None

Cleanup Anthropic provider resources.

Module spoon_ai.llm.providers.openrouter_provider

OpenRouter Provider implementation for the unified LLM interface. OpenRouter provides access to multiple LLM models through a unified API compatible with OpenAI.

OpenRouterProvider Objects

@register_provider("openrouter", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.TOOLS,
ProviderCapability.STREAMING
])
class OpenRouterProvider(OpenAICompatibleProvider)

OpenRouter provider implementation using OpenAI-compatible API.

get_additional_headers

def get_additional_headers(config: Dict[str, Any]) -> Dict[str, str]

Get OpenRouter-specific headers.

get_metadata

def get_metadata() -> ProviderMetadata

Get OpenRouter provider metadata.

Module spoon_ai.llm.providers

LLM Provider implementations.

Module spoon_ai.llm.manager

LLM Manager - Central orchestrator for managing providers, fallback, and load balancing.

ProviderState Objects

@dataclass
class ProviderState()

Track provider initialization and health state.

can_retry_initialization

def can_retry_initialization() -> bool

Check if provider initialization can be retried.

record_initialization_failure

def record_initialization_failure(error: Exception) -> None

Record initialization failure with exponential backoff.

record_initialization_success

def record_initialization_success() -> None

Record successful initialization.

FallbackStrategy Objects

class FallbackStrategy()

Handles fallback logic between providers.

execute_with_fallback

async def execute_with_fallback(providers: List[str], operation, *args,
**kwargs) -> LLMResponse

Execute operation with fallback chain.

Arguments:

  • providers - List of provider names in fallback order
  • operation - Async operation to execute *args, **kwargs: Arguments for the operation

Returns:

  • LLMResponse - Response from successful provider

Raises:

  • ProviderError - If all providers fail

LoadBalancer Objects

class LoadBalancer()

Handles load balancing between multiple provider instances.

select_provider

def select_provider(providers: List[str],
strategy: str = "round_robin") -> str

Select a provider based on load balancing strategy.

Arguments:

  • providers - List of available providers
  • strategy - Load balancing strategy ('round_robin', 'weighted', 'random')

Returns:

  • str - Selected provider name

update_provider_health

def update_provider_health(provider: str, is_healthy: bool) -> None

Update provider health status.

set_provider_weight

def set_provider_weight(provider: str, weight: float) -> None

Set provider weight for weighted load balancing.

LLMManager Objects

class LLMManager()

Central orchestrator for LLM providers with fallback and load balancing.

__init__

def __init__(config_manager: Optional[ConfigurationManager] = None,
debug_logger: Optional[DebugLogger] = None,
metrics_collector: Optional[MetricsCollector] = None,
response_normalizer: Optional[ResponseNormalizer] = None,
registry: Optional[LLMProviderRegistry] = None)

Initialize LLM Manager with enhanced provider state tracking.

cleanup

async def cleanup() -> None

Enhanced cleanup with proper resource management.

get_provider_status

def get_provider_status() -> Dict[str, Dict[str, Any]]

Get detailed status of all providers.

reset_provider

async def reset_provider(provider_name: str) -> bool

Reset a provider's state and force reinitialization.

Arguments:

  • provider_name - Name of provider to reset

Returns:

  • bool - True if reset successful

chat

async def chat(messages: List[Message],
provider: Optional[str] = None,
**kwargs) -> LLMResponse

Send chat request with automatic provider selection and fallback.

Arguments:

  • messages - List of conversation messages
  • provider - Specific provider to use (optional)
  • **kwargs - Additional parameters

Returns:

  • LLMResponse - Normalized response

chat_stream

async def chat_stream(messages: List[Message],
provider: Optional[str] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs) -> AsyncGenerator[LLMResponseChunk, None]

Send streaming chat request with callback support.

Arguments:

  • messages - List of conversation messages
  • provider - Specific provider to use (optional)
  • callbacks - Optional callback handlers for monitoring
  • **kwargs - Additional parameters

Yields:

  • LLMResponseChunk - Structured streaming response chunks

completion

async def completion(prompt: str,
provider: Optional[str] = None,
**kwargs) -> LLMResponse

Send completion request.

Arguments:

  • prompt - Text prompt
  • provider - Specific provider to use (optional)
  • **kwargs - Additional parameters

Returns:

  • LLMResponse - Normalized response

chat_with_tools

async def chat_with_tools(messages: List[Message],
tools: List[Dict],
provider: Optional[str] = None,
**kwargs) -> LLMResponse

Send tool-enabled chat request.

Arguments:

  • messages - List of conversation messages
  • tools - List of available tools
  • provider - Specific provider to use (optional)
  • **kwargs - Additional parameters

Returns:

  • LLMResponse - Normalized response

set_fallback_chain

def set_fallback_chain(providers: List[str]) -> None

Set fallback provider chain.

Arguments:

  • providers - List of provider names in fallback order

enable_load_balancing

def enable_load_balancing(strategy: str = "round_robin") -> None

Enable load balancing with specified strategy.

Arguments:

  • strategy - Load balancing strategy ('round_robin', 'weighted', 'random')

disable_load_balancing

def disable_load_balancing() -> None

Disable load balancing.

health_check_all

async def health_check_all() -> Dict[str, bool]

Check health of all registered providers.

Returns:

Dict[str, bool]: Provider health status

get_stats

def get_stats() -> Dict[str, Any]

Get comprehensive statistics.

Returns:

Dict[str, Any]: Manager and provider statistics

get_llm_manager

def get_llm_manager() -> LLMManager

Get global LLM manager instance.

Returns:

  • LLMManager - Global manager instance

set_llm_manager

def set_llm_manager(manager: LLMManager) -> None

Set global LLM manager instance.

Arguments:

  • manager - Manager instance to set as global

Module spoon_ai.llm.registry

LLM Provider Registry for dynamic provider registration and discovery.

LLMProviderRegistry Objects

class LLMProviderRegistry()

Registry for managing LLM provider classes and instances.

register

def register(name: str, provider_class: Type[LLMProviderInterface]) -> None

Register a provider class.

Arguments:

  • name - Unique provider name
  • provider_class - Provider class implementing LLMProviderInterface

Raises:

  • ConfigurationError - If provider name already exists or class is invalid

get_provider

def get_provider(
name: str,
config: Optional[Dict[str, Any]] = None) -> LLMProviderInterface

Get or create provider instance.

Arguments:

  • name - Provider name
  • config - Provider configuration (optional if already configured)

Returns:

  • LLMProviderInterface - Provider instance

Raises:

  • ConfigurationError - If provider not found or configuration invalid

list_providers

def list_providers() -> List[str]

List all registered provider names.

Returns:

  • List[str] - List of provider names

get_capabilities

def get_capabilities(name: str) -> List[ProviderCapability]

Get provider capabilities.

Arguments:

  • name - Provider name

Returns:

  • List[ProviderCapability] - List of supported capabilities

Raises:

  • ConfigurationError - If provider not found

is_registered

def is_registered(name: str) -> bool

Check if a provider is registered.

Arguments:

  • name - Provider name

Returns:

  • bool - True if provider is registered

unregister

def unregister(name: str) -> None

Unregister a provider.

Arguments:

  • name - Provider name

clear

def clear() -> None

Clear all registered providers and instances.

register_provider

def register_provider(name: str,
capabilities: Optional[List[ProviderCapability]] = None)

Decorator for automatic provider registration.

Arguments:

  • name - Provider name
  • capabilities - List of supported capabilities (optional)

Returns:

Decorator function

get_global_registry

def get_global_registry() -> LLMProviderRegistry

Get the global provider registry instance.

Returns:

  • LLMProviderRegistry - Global registry instance

Module spoon_ai.llm.config

Configuration management for LLM providers using environment variables.

ProviderConfig Objects

@dataclass
class ProviderConfig()

Configuration for a specific LLM provider.

__post_init__

def __post_init__()

Validate configuration after initialization.

model_dump

def model_dump() -> Dict[str, Any]

Convert the configuration to a dictionary.

Returns:

Dict[str, Any]: Configuration as dictionary

ConfigurationManager Objects

class ConfigurationManager()

Manages environment-driven configuration for LLM providers.

__init__

def __init__() -> None

Initialize configuration manager and load environment variables.

load_provider_config

def load_provider_config(provider_name: str) -> ProviderConfig

Load and validate provider configuration.

Arguments:

  • provider_name - Name of the provider

Returns:

  • ProviderConfig - Validated provider configuration

Raises:

  • ConfigurationError - If configuration is invalid or missing

validate_config

def validate_config(config: ProviderConfig) -> bool

Validate provider configuration.

Arguments:

  • config - Provider configuration to validate

Returns:

  • bool - True if configuration is valid

Raises:

  • ConfigurationError - If configuration is invalid

get_default_provider

def get_default_provider() -> str

Get default provider from configuration with intelligent selection.

Returns:

  • str - Default provider name

get_fallback_chain

def get_fallback_chain() -> List[str]

Get fallback chain from configuration.

Returns:

  • List[str] - List of provider names in fallback order

list_configured_providers

def list_configured_providers() -> List[str]

List all configured providers.

Returns:

  • List[str] - List of provider names that have configuration

get_available_providers_by_priority

def get_available_providers_by_priority() -> List[str]

Get available providers ordered by priority and quality.

Returns:

  • List[str] - List of available provider names in priority order

get_provider_info

def get_provider_info() -> Dict[str, Dict[str, Any]]

Get information about all providers and their availability.

Returns:

Dict[str, Dict[str, Any]]: Provider information including availability

reload_config

def reload_config() -> None

Reload configuration from file.

Module spoon_ai.llm

Unified LLM infrastructure package.

This package provides a unified interface for working with different LLM providers, including comprehensive configuration management, monitoring, and error handling.

Module spoon_ai.llm.errors

Standardized error hierarchy for LLM operations.

LLMError Objects

class LLMError(Exception)

Base exception for all LLM-related errors.

ProviderError Objects

class ProviderError(LLMError)

Provider-specific error with detailed context.

ConfigurationError Objects

class ConfigurationError(LLMError)

Configuration validation or loading error.

RateLimitError Objects

class RateLimitError(ProviderError)

Rate limit exceeded error.

AuthenticationError Objects

class AuthenticationError(ProviderError)

Authentication failed error.

ModelNotFoundError Objects

class ModelNotFoundError(ProviderError)

Model not found or not available error.

TokenLimitError Objects

class TokenLimitError(ProviderError)

Token limit exceeded error.

NetworkError Objects

class NetworkError(ProviderError)

Network connectivity or timeout error.

ProviderUnavailableError Objects

class ProviderUnavailableError(ProviderError)

Provider service unavailable error.

ValidationError Objects

class ValidationError(LLMError)

Input validation error.

Module spoon_ai.llm.base

LLMBase Objects

class LLMBase(ABC)

Base abstract class for LLM, defining interfaces that all LLM providers must implement

__init__

def __init__(config_path: str = "config/config.toml",
config_name: str = "llm")

Initialize LLM interface

Arguments:

  • config_path - Configuration file path
  • config_name - Configuration name

chat

@abstractmethod
async def chat(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
**kwargs) -> LLMResponse

Send chat request to LLM and get response

Arguments:

  • messages - List of messages
  • system_msgs - List of system messages
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

completion

@abstractmethod
async def completion(prompt: str, **kwargs) -> LLMResponse

Send text completion request to LLM and get response

Arguments:

  • prompt - Prompt text
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

chat_with_tools

@abstractmethod
async def chat_with_tools(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
tools: Optional[List[Dict]] = None,
tool_choice: Literal["none", "auto",
"required"] = "auto",
**kwargs) -> LLMResponse

Send chat request that may contain tool calls to LLM and get response

Arguments:

  • messages - List of messages
  • system_msgs - List of system messages
  • tools - List of tools
  • tool_choice - Tool selection mode
  • **kwargs - Other parameters

Returns:

  • LLMResponse - LLM response

generate_image

async def generate_image(prompt: str, **kwargs) -> Union[str, List[str]]

Generate image (optional implementation)

Arguments:

  • prompt - Prompt text
  • **kwargs - Other parameters

Returns:

Union[str, List[str]]: Image URL or list of URLs

reset_output_handler

def reset_output_handler()

Reset output handler

Module spoon_ai.utils.utils

Module spoon_ai.utils.config_manager

ConfigManager Objects

class ConfigManager()

Environment-based configuration helper for core usage.

__init__

def __init__() -> None

Initialize manager with environment-backed cache.

refresh

def refresh() -> None

Reload configuration snapshot from environment variables.

get

def get(key: str, default: Any = None) -> Any

Get configuration item from environment snapshot.

set

def set(key: str, value: Any) -> None

Set configuration item by exporting to environment variables.

list_config

def list_config() -> Dict[str, Any]

List configuration snapshot without persisting secrets.

get_api_key

def get_api_key(provider: str) -> Optional[str]

Get API key for specified provider with environment priority.

set_api_key

def set_api_key(provider: str, api_key: str) -> None

Set API key by exporting to environment variables.

get_model_name

def get_model_name() -> Optional[str]

Get model name override from environment.

get_base_url

def get_base_url() -> Optional[str]

Get base URL override from environment.

get_llm_provider

def get_llm_provider() -> Optional[str]

Determine LLM provider from environment variables.

Module spoon_ai.utils.config

Module spoon_ai.utils

Module spoon_ai.utils.streaming

StreamOutcome Objects

@dataclass
class StreamOutcome()

Accumulator for streaming output state.

Module spoon_ai.runnables.events

StreamEventBuilder Objects

class StreamEventBuilder()

chain_start

@staticmethod
def chain_start(run_id: UUID, name: str, inputs: Any,
**kwargs: Any) -> StreamEvent

Build chain start event.

chain_stream

@staticmethod
def chain_stream(run_id: UUID, name: str, chunk: Any,
**kwargs: Any) -> StreamEvent

Build chain stream event.

chain_end

@staticmethod
def chain_end(run_id: UUID, name: str, output: Any,
**kwargs: Any) -> StreamEvent

Build chain end event.

chain_error

@staticmethod
def chain_error(run_id: UUID, name: str, error: Exception,
**kwargs: Any) -> StreamEvent

Build chain error event.

llm_stream

@staticmethod
def llm_stream(run_id: UUID,
name: str,
token: str,
chunk: Optional[Any] = None,
**kwargs: Any) -> StreamEvent

Build LLM stream event.

Module spoon_ai.runnables

Runnable interface and utilities for composable AI components.

This module provides the foundational Runnable interface that all Spoon AI components implement, enabling streaming, composition, and standardized execution.

Module spoon_ai.runnables.base

log_patches_from_events

async def log_patches_from_events(
event_iter: AsyncIterator[Dict[str, Any]],
*,
diff: bool = True) -> AsyncIterator[RunLogPatch]

Convert a stream of events into run log patches.

Runnable Objects

class Runnable(ABC, Generic[Input, Output])

astream_log

async def astream_log(input: Input,
config: Optional[RunnableConfig] = None,
*,
diff: bool = True) -> AsyncIterator[RunLogPatch]

Asynchronously stream structured log patches derived from execution events.

astream_events

async def astream_events(
input: Input,
config: Optional[RunnableConfig] = None
) -> AsyncIterator[Dict[str, Any]]

Asynchronously stream structured execution events.

Module spoon_ai.payments.server

create_paywalled_router

def create_paywalled_router(
service: Optional[X402PaymentService] = None,
agent_factory: AgentFactory = _default_agent_factory,
payment_message: str = "Payment required to invoke this agent."
) -> APIRouter

Build a FastAPI router that protects agent invocations behind an x402 paywall.

Arguments:

  • service - Optional pre-configured payment service.
  • agent_factory - Coroutine that returns an initialized agent given its name.
  • payment_message - Message displayed when payment is required.

Returns:

  • APIRouter - Router with /invoke/{agent_name} endpoint ready to mount.

Module spoon_ai.payments.cli

Module spoon_ai.payments.facilitator_client

X402FacilitatorClient Objects

class X402FacilitatorClient()

Thin wrapper over the upstream facilitator client with async header hooks.

Module spoon_ai.payments.exceptions

X402PaymentError Objects

class X402PaymentError(Exception)

Base exception for x402 payment operations.

X402ConfigurationError Objects

class X402ConfigurationError(X402PaymentError)

Raised when integration configuration is invalid or incomplete.

X402VerificationError Objects

class X402VerificationError(X402PaymentError)

Raised when a payment header fails verification against the facilitator.

X402SettlementError Objects

class X402SettlementError(X402PaymentError)

Raised when settlement fails or returns an error response.

Module spoon_ai.payments.x402_service

X402PaymentService Objects

class X402PaymentService()

High level service that aligns the x402 SDK with SpoonOS conventions.

discover_resources

async def discover_resources(
*,
resource_type: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None) -> ListDiscoveryResourcesResponse

Query the facilitator discovery endpoint for registered paywalled resources.

render_paywall_html

def render_paywall_html(error: str,
request: Optional[X402PaymentRequest] = None,
headers: Optional[Dict[str, Any]] = None) -> str

Render the embedded paywall HTML with payment requirements.

build_payment_header

def build_payment_header(requirements: PaymentRequirements,
*,
max_value: Optional[int] = None) -> str

Create a signed X-PAYMENT header for outbound requests.

decode_payment_response

def decode_payment_response(header_value: str) -> X402PaymentReceipt

Decode an X-PAYMENT-RESPONSE header into a structured receipt.

Module spoon_ai.payments.config

X402ConfigurationError Objects

class X402ConfigurationError(Exception)

Raised when required x402 configuration is missing or invalid.

X402PaywallBranding Objects

class X402PaywallBranding(BaseModel)

Optional branding customisations for the embedded paywall template.

X402ClientConfig Objects

class X402ClientConfig(BaseModel)

Holds client-side signing configuration used for outbound payments.

X402Settings Objects

class X402Settings(BaseModel)

Resolved configuration view for x402 payments inside SpoonOS.

amount_in_atomic_units

@property
def amount_in_atomic_units() -> str

Return the configured maximum amount encoded as atomic units (string).

build_asset_extra

def build_asset_extra() -> Dict[str, Any]

Construct the extra payload for the payment requirements.

load

@classmethod
def load(cls,
config_manager: Optional[ConfigManager] = None) -> "X402Settings"

Load settings from config.json with .env fallbacks.

Module spoon_ai.payments.app

Module spoon_ai.payments

Payment utilities for integrating the SpoonOS core with the x402 payments protocol.

This package wraps the upstream x402 Python SDK with configuration and service abstractions that align to SpoonOS conventions (config.json priority, .env overrides, and async-friendly helper utilities).

Module spoon_ai.payments.models

X402PaymentRequest Objects

class X402PaymentRequest(BaseModel)

Describes a payment requirement that should be issued for a resource.

X402VerifyResult Objects

class X402VerifyResult(BaseModel)

Captures the facilitator verification response.

X402SettleResult Objects

class X402SettleResult(BaseModel)

Captures settlement details.

X402PaymentOutcome Objects

class X402PaymentOutcome(BaseModel)

Aggregates verification and settlement outcomes.

X402PaymentReceipt Objects

class X402PaymentReceipt(BaseModel)

Decoded representation of the X-PAYMENT-RESPONSE header.

Module spoon_ai.chat

ShortTermMemoryConfig Objects

class ShortTermMemoryConfig(BaseModel)

Configuration for short-term memory management.

enabled

Enable automatic short-term memory management.

max_tokens

Maximum token count before triggering trimming/summarization.

strategy

Strategy to use when exceeding max_tokens: 'summarize' or 'trim'.

messages_to_keep

Number of recent messages to keep when summarizing.

trim_strategy

Trimming strategy when using 'trim' mode.

keep_system_messages

Always keep system messages during trimming.

auto_checkpoint

Automatically save checkpoints before trimming/summarization.

checkpoint_thread_id

Thread ID for checkpoint management.

summary_model

Model to use for summarization (defaults to ChatBot's model).

ChatBot Objects

class ChatBot()

__init__

def __init__(use_llm_manager: bool = True,
model_name: str = None,
llm_provider: str = None,
api_key: str = None,
base_url: str = None,
enable_short_term_memory: bool = True,
short_term_memory_config: Optional[Union[Dict[
str, Any], ShortTermMemoryConfig]] = None,
token_counter: Optional[MessageTokenCounter] = None,
enable_long_term_memory: bool = False,
mem0_config: Optional[Dict[str, Any]] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs)

Initialize ChatBot with hierarchical configuration priority system.

Configuration Priority System:

  1. Full manual override (highest priority) - all params provided
  2. Partial override with config fallback - llm_provider provided, credentials pulled from environment (or config files if explicitly enabled)
  3. Full environment-based loading - only use_llm_manager=True, reads from environment variables

Arguments:

  • use_llm_manager - Enable LLM manager architecture (default: True)
  • model_name - Model name override
  • llm_provider - Provider name override
  • api_key - API key override
  • base_url - Base URL override
  • enable_short_term_memory - Enable short-term memory management (default: True)
  • short_term_memory_config - Configuration dict or ShortTermMemoryConfig instance
  • token_counter - Optional custom token counter instance
  • enable_long_term_memory - Enable Mem0-backed long-term memory retrieval/storage
  • mem0_config - Configuration dict for Mem0 (api_key, user_id/agent_id, collection, etc.)
  • callbacks - Optional list of callback handlers for monitoring
  • **kwargs - Additional parameters

update_mem0_config

def update_mem0_config(config: Optional[Dict[str, Any]] = None,
enable: Optional[bool] = None) -> None

Update Mem0 configuration and re-initialize the client if needed.

ask

async def ask(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
output_queue: Optional[asyncio.Queue] = None) -> str

Ask method using the LLM manager architecture.

Automatically applies short-term memory strategy if enabled.

ask_tool

async def ask_tool(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
tools: Optional[List[dict]] = None,
tool_choice: Optional[str] = None,
output_queue: Optional[asyncio.Queue] = None,
**kwargs) -> LLMResponse

Ask tool method using the LLM manager architecture.

Automatically applies short-term memory strategy if enabled.

trim_messages

async def trim_messages(messages: List[Message],
max_tokens: int,
strategy: TrimStrategy = TrimStrategy.FROM_END,
keep_system: bool = True,
model: Optional[str] = None) -> List[Message]

Trim messages to stay within the token budget.

Arguments:

  • messages - List of messages to trim
  • max_tokens - Maximum token count to retain
  • strategy - Trimming strategy (from_start or from_end)
  • keep_system - Whether to always keep the leading system message
  • model - Model name for token counting

Returns:

  • List[Message] - Trimmed messages list

remove_message

def remove_message(message_id: str, **kwargs: Any) -> "RemoveMessage"

Construct a removal instruction for the message with the given ID.

remove_all_messages

def remove_all_messages() -> "RemoveMessage"

Construct a removal instruction that clears the entire history.

summarize_messages

async def summarize_messages(
messages: List[Message],
max_tokens_before_summary: int,
messages_to_keep: int = 5,
summary_model: Optional[str] = None,
existing_summary: str = ""
) -> Tuple[List[Message], List[RemoveMessage], Optional[str]]

Summarize earlier messages and emit removal directives.

Returns a tuple (messages_for_llm, removals, summary_text) where messages_for_llm are the messages that should be sent to the language model for the next turn, removals contains RemoveMessage directives that should be applied to the stored history, and summary_text is the newly generated summary (if any).

Arguments:

  • messages - List of messages to process
  • max_tokens_before_summary - Token threshold for triggering summary
  • messages_to_keep - Number of recent messages to keep uncompressed
  • summary_model - Model to use for summarization
  • existing_summary - Previously stored summary text

latest_summary

@property
def latest_summary() -> Optional[str]

Return the most recent summary generated by short-term memory.

latest_removals

@property
def latest_removals() -> List[RemoveMessage]

Return the most recent removal directives emitted by summarization.

save_checkpoint

def save_checkpoint(thread_id: str,
messages: List[Message],
metadata: Optional[dict] = None) -> str

Save current message state to checkpoint.

Arguments:

  • thread_id - Thread identifier
  • messages - Messages to save
  • metadata - Optional metadata to store

Returns:

  • str - Checkpoint ID

restore_checkpoint

def restore_checkpoint(
thread_id: str,
checkpoint_id: Optional[str] = None) -> Optional[List[Message]]

Restore messages from checkpoint.

Arguments:

  • thread_id - Thread identifier
  • checkpoint_id - Optional specific checkpoint ID

Returns:

  • Optional[List[Message]] - Restored messages, or None if checkpoint not found

list_checkpoints

def list_checkpoints(thread_id: str) -> List[dict]

List all checkpoints for a thread.

Arguments:

  • thread_id - Thread identifier

Returns:

  • List[dict] - List of checkpoint metadata

clear_checkpoints

def clear_checkpoints(thread_id: str) -> None

Clear all checkpoints for a thread.

Arguments:

  • thread_id - Thread identifier

astream

async def astream(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs: Any) -> AsyncIterator[LLMResponseChunk]

Stream LLM responses chunk by chunk.

astream_events

async def astream_events(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs) -> AsyncIterator[dict]

Stream structured events during LLM execution.

This method yields detailed events tracking the execution flow, useful for monitoring and debugging.

Arguments:

  • messages - List of messages or dicts
  • system_msg - Optional system message
  • callbacks - Optional callback handlers
  • **kwargs - Additional provider parameters

Yields:

Event dictionaries with structure: {

  • "event" - event_type,
  • "run_id" - str,
  • "timestamp" - ISO datetime string,
  • "data" - {event-specific data} }

astream_log

async def astream_log(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
*,
diff: bool = True,
**kwargs: Any) -> AsyncIterator[RunLogPatch]

Stream run log patches describing ChatBot execution.

Module spoon_ai.identity.did_models

DID Data Models for SpoonOS Agents Following W3C DID Core specification and ERC-8004 standard

VerificationMethodType Objects

class VerificationMethodType(str, Enum)

Supported verification method types

ServiceType Objects

class ServiceType(str, Enum)

Agent service endpoint types

VerificationMethod Objects

class VerificationMethod(BaseModel)

Cryptographic verification method for DID authentication

ServiceEndpoint Objects

class ServiceEndpoint(BaseModel)

Service endpoint for agent interaction

ReputationScore Objects

class ReputationScore(BaseModel)

Aggregated reputation score

Attestation Objects

class Attestation(BaseModel)

Verifiable attestation about an agent

AgentCard Objects

class AgentCard(BaseModel)

Agent Card following Google's A2A protocol Provides human-readable agent information

AgentDID Objects

class AgentDID(BaseModel)

Complete W3C DID Document for SpoonOS Agent

to_did_document

def to_did_document() -> Dict[str, Any]

Export as standard W3C DID Document

to_agent_card

def to_agent_card() -> Dict[str, Any]

Export agent card separately

DIDResolutionResult Objects

class DIDResolutionResult(BaseModel)

Result of DID resolution

Module spoon_ai.identity.attestation

Attestation and Trust Score Management Handles verifiable credentials and reputation calculations

AttestationManager Objects

class AttestationManager()

Manages verifiable attestations for agents

create_attestation

def create_attestation(issuer_did: str,
subject_did: str,
claim: Dict,
evidence: Optional[str] = None) -> Attestation

Create a verifiable attestation

Arguments:

  • issuer_did - DID of the attestation issuer
  • subject_did - DID of the agent being attested
  • claim - Attestation claim data
  • evidence - Optional supporting evidence

Returns:

Signed Attestation object

verify_attestation

def verify_attestation(attestation: Attestation) -> bool

Verify attestation signature

submit_reputation_on_chain

def submit_reputation_on_chain(subject_did: str, score: int,
evidence: str) -> str

Submit reputation score to on-chain registry

Arguments:

  • subject_did - DID of agent being rated
  • score - Score between -100 and 100
  • evidence - Evidence for the score

Returns:

Transaction hash

submit_validation_on_chain

def submit_validation_on_chain(subject_did: str, is_valid: bool,
reason: str) -> str

Submit validation for an agent

Arguments:

  • subject_did - DID of agent being validated
  • is_valid - Whether agent is valid
  • reason - Reason for validation decision

Returns:

Transaction hash

TrustScoreCalculator Objects

class TrustScoreCalculator()

Calculates trust scores for agents

calculate_trust_score

def calculate_trust_score(did: str) -> Dict

Calculate comprehensive trust score

Returns:

Dict with trust score components:

  • reputation_score: -100 to 100
  • validation_status: bool
  • trust_level: "high" | "medium" | "low" | "untrusted"
  • confidence: 0 to 1

get_reputation_breakdown

def get_reputation_breakdown(did: str, limit: int = 10) -> List[Dict]

Get detailed reputation submissions

get_validation_breakdown

def get_validation_breakdown(did: str, limit: int = 10) -> List[Dict]

Get detailed validation submissions

Module spoon_ai.identity.storage_client

Storage clients for DID documents and agent cards Supports NeoFS (primary) and IPFS (backup replication)

DIDStorageClient Objects

class DIDStorageClient()

Unified storage client for DID documents NeoFS primary with IPFS replication

publish_did_document

def publish_did_document(agent_id: str, did_document: Dict,
agent_card: Dict) -> Tuple[str, str]

Publish DID document and agent card to storage Returns (didDocURI, agentCardURI)

fetch_did_document

def fetch_did_document(uri: str) -> Dict

Fetch DID document from URI (NeoFS or IPFS)

publish_credential

def publish_credential(agent_id: str, credential: Dict) -> str

Publish verifiable credential

close

def close()

Close HTTP clients

Module spoon_ai.identity.did_resolver

DID Resolver for SpoonOS Agents Implements unified DID resolution with NeoFS-first policy

DIDResolver Objects

class DIDResolver()

Unified DID resolver for SpoonOS agents Resolution flow: On-chain anchor → NeoFS (primary) → IPFS (fallback)

resolve

def resolve(did: str) -> DIDResolutionResult

Resolve DID to complete DID document

Arguments:

  • did - DID string (did🥄agent:<identifier>)

Returns:

DIDResolutionResult with document and metadata

resolve_metadata_only

def resolve_metadata_only(did: str) -> Dict

Resolve only on-chain metadata (fast path)

verify_did

def verify_did(did: str) -> bool

Verify DID exists and is resolvable

Module spoon_ai.identity.erc8004_abi

Shared ERC-8004 ABI fragments (minimal, artifact-free).

These ABIs cover the common calls used by the Python SDK and demos.

Module spoon_ai.identity

SpoonOS Agent DID Identity Module Implements ERC-8004 compliant decentralized identity for agents

Module spoon_ai.identity.erc8004_client

ERC-8004 Smart Contract Client Handles on-chain interactions with agent registries

ERC8004Client Objects

class ERC8004Client()

Client for interacting with ERC-8004 agent registries

calculate_did_hash

def calculate_did_hash(did: str) -> bytes

Calculate keccak256 hash of DID string

create_eip712_signature

def create_eip712_signature(did_hash: bytes, agent_card_uri: str,
did_doc_uri: str) -> str

Create EIP-712 signature for agent registration

register_agent

def register_agent(did: str, agent_card_uri: str, did_doc_uri: str) -> str

Register agent on-chain

resolve_agent

def resolve_agent(did: str) -> Dict

Resolve agent metadata from on-chain registry

update_capabilities

def update_capabilities(did: str, capabilities: List[str]) -> str

Update agent capabilities on-chain

build_feedback_auth

def build_feedback_auth(agent_id: Union[int, bytes],
client_address: str,
index_limit: int,
expiry: int,
signer_address: Optional[str] = None,
identity_registry: Optional[str] = None,
chain_id: Optional[int] = None) -> bytes

Build and sign feedbackAuth payload required by chaoschain ERC8004 reputation registry. Returns abi.encode(struct) ++ signature (65 bytes).

give_feedback

def give_feedback(did: str,
score: int,
tag1: bytes = b"",
tag2: bytes = b"",
fileuri: str = "",
filehash: bytes = b"\x00" * 32,
index_limit: int = 10,
expiry: Optional[int] = None,
client_address: Optional[str] = None) -> str

Submit feedback using ERC8004 giveFeedback with feedbackAuth.

get_reputation_summary

def get_reputation_summary(did: str,
client_addresses: Optional[List[str]] = None,
tag1: bytes = b"\x00" * 32,
tag2: bytes = b"\x00" * 32) -> Tuple[int, int]

Return (count, averageScore 0-100).

get_reputation

def get_reputation(did: str) -> Tuple[int, int]

Backward compatible: (averageScore, count).

validation_request

def validation_request(
did: str,
validator: str,
request_uri: str,
request_hash: Optional[bytes] = None) -> Tuple[str, bytes]

Create validation request; returns tx hash and requestHash used.

get_validation_status

def get_validation_status(request_hash: bytes) -> Dict

Get per-request validation status.

submit_validation

def submit_validation(did: str, is_valid: bool, reason: str = "") -> str

Map boolean into 0/100 scale response.

register_agent

def register_agent(token_uri: str,
metadata: Optional[List[Tuple[str, bytes]]] = None) -> int

Register agent on IdentityRegistry; returns agentId.

Module spoon_ai.bridge.eth_neofs_indexer

Ethereum to NeoFS/IPFS Event Indexer Listens to ERC-8004 registry events and ensures off-chain storage is synchronized

EthereumNeoFSIndexer Objects

class EthereumNeoFSIndexer()

Event indexer that syncs Ethereum registry events to NeoFS/IPFS Ensures content hash verification and storage consistency

register_event_handler

def register_event_handler(event_name: str, handler: Callable)

Register custom event handler

start_indexing

def start_indexing(block_limit: Optional[int] = None)

Start indexing events

Arguments:

  • block_limit - Optional block limit for testing (stops after N blocks)

stop_indexing

def stop_indexing()

Stop the indexer

get_indexer_status

def get_indexer_status() -> Dict

Get current indexer status

Module spoon_ai.bridge

Cross-chain bridge module for DID synchronization Ethereum ← → NeoFS/IPFS event indexing

Module spoon_ai.tools.turnkey_tools

Turnkey Tools - Secure Blockchain Operations

This module provides Turnkey SDK tools for secure blockchain operations including:

  • Transaction signing and broadcasting
  • Message and EIP-712 signing
  • Multi-account management
  • Activity audit and monitoring
  • Wallet and account operations

TurnkeyBaseTool Objects

class TurnkeyBaseTool(BaseTool)

Base class for Turnkey tools with shared client initialization

client

@property
def client()

Lazy initialization of Turnkey client

SignEVMTransactionTool Objects

class SignEVMTransactionTool(TurnkeyBaseTool)

Sign EVM transaction using Turnkey

execute

async def execute(sign_with: str, unsigned_tx: str, **kwargs) -> str

Sign EVM transaction

SignMessageTool Objects

class SignMessageTool(TurnkeyBaseTool)

Sign arbitrary message using Turnkey

execute

async def execute(sign_with: str,
message: str,
use_keccak256: bool = True,
**kwargs) -> str

Sign message

SignTypedDataTool Objects

class SignTypedDataTool(TurnkeyBaseTool)

Sign EIP-712 structured data using Turnkey

execute

async def execute(sign_with: str, typed_data: dict, **kwargs) -> str

Sign EIP-712 typed data

BroadcastTransactionTool Objects

class BroadcastTransactionTool(TurnkeyBaseTool)

Broadcast signed transaction to blockchain

execute

async def execute(signed_tx: str, rpc_url: str = None, **kwargs) -> str

Broadcast transaction

ListWalletsTool Objects

class ListWalletsTool(TurnkeyBaseTool)

List all wallets in the organization

execute

async def execute(**kwargs) -> str

List wallets

ListWalletAccountsTool Objects

class ListWalletAccountsTool(TurnkeyBaseTool)

List accounts for a specific wallet

execute

async def execute(wallet_id: str,
limit: str = None,
before: str = None,
after: str = None,
**kwargs) -> str

List wallet accounts

GetActivityTool Objects

class GetActivityTool(TurnkeyBaseTool)

Get activity details by ID

execute

async def execute(activity_id: str, **kwargs) -> str

Get activity details

ListActivitiesTool Objects

class ListActivitiesTool(TurnkeyBaseTool)

List recent activities in the organization

execute

async def execute(limit: str = "10",
before: str = None,
after: str = None,
filter_by_status: list = None,
filter_by_type: list = None,
**kwargs) -> str

List activities

WhoAmITool Objects

class WhoAmITool(TurnkeyBaseTool)

Get organization information

execute

async def execute(**kwargs) -> str

Get organization info

BuildUnsignedEIP1559TxTool Objects

class BuildUnsignedEIP1559TxTool(BaseTool)

Build unsigned EIP-1559 transaction (supports NeoX)

execute

async def execute(from_addr: str,
to_addr: str = None,
value_wei: str = "0",
data_hex: str = "0x",
priority_gwei: str = "1",
max_fee_gwei: str = None,
gas_limit: str = None,
rpc_url: str = None,
**kwargs) -> str

Build unsigned transaction (auto-detects NeoX)

ListAllAccountsTool Objects

class ListAllAccountsTool(TurnkeyBaseTool)

List all accounts across all wallets in the organization

execute

async def execute(limit: str = "50", **kwargs) -> str

List all accounts across all wallets

BatchSignTransactionsTool Objects

class BatchSignTransactionsTool(TurnkeyBaseTool)

Batch sign transactions for multiple accounts

execute

async def execute(to_address: str,
value_wei: str,
data_hex: str = "0x",
max_accounts: str = "3",
enable_broadcast: bool = False,
rpc_url: str = None,
**kwargs) -> str

Batch sign transactions for multiple accounts

CreateWalletTool Objects

class CreateWalletTool(TurnkeyBaseTool)

Create a new wallet

execute

async def execute(wallet_name: str,
accounts_json: str = None,
mnemonic_length: str = "24",
**kwargs) -> str

Create a new wallet

GetWalletTool Objects

class GetWalletTool(TurnkeyBaseTool)

Get wallet information by wallet ID

execute

async def execute(wallet_id: str, **kwargs) -> str

Get wallet information

CreateWalletAccountsTool Objects

class CreateWalletAccountsTool(TurnkeyBaseTool)

Add accounts to an existing wallet

execute

async def execute(wallet_id: str, accounts_json: str, **kwargs) -> str

Add accounts to existing wallet

CompleteTransactionWorkflowTool Objects

class CompleteTransactionWorkflowTool(TurnkeyBaseTool)

Complete transaction workflow: build, sign, and optionally broadcast

execute

async def execute(sign_with: str,
to_address: str,
value_wei: str,
data_hex: str = "0x",
enable_broadcast: bool = False,
rpc_url: str = None,
**kwargs) -> str

Complete transaction workflow

get_turnkey_tools

def get_turnkey_tools() -> List[BaseTool]

Get all Turnkey tools

Module spoon_ai.tools.tool_manager

ToolManager Objects

class ToolManager()

reindex

def reindex() -> None

Rebuild the internal name->tool mapping. Useful if tools have been renamed dynamically.

Module spoon_ai.tools.neofs_tools

NeoFS Tools for spoon_ai framework

Simple wrappers around NeoFS client methods. Tools do NOT auto-create bearer tokens - Agent manages tokens. All parameters map directly to client method parameters.

get_shared_neofs_client

def get_shared_neofs_client() -> NeoFSClient

Get shared NeoFSClient instance for all NeoFS tools.

Returns the same client instance across all tool calls to ensure bearer token authentication works correctly.

CreateBearerTokenTool Objects

class CreateBearerTokenTool(BaseTool)

Create a bearer token for NeoFS operations

CreateContainerTool Objects

class CreateContainerTool(BaseTool)

Create a NeoFS container

UploadObjectTool Objects

class UploadObjectTool(BaseTool)

Upload object to container

DownloadObjectByIdTool Objects

class DownloadObjectByIdTool(BaseTool)

Download object by ID

GetObjectHeaderByIdTool Objects

class GetObjectHeaderByIdTool(BaseTool)

Get object header by ID

DownloadObjectByAttributeTool Objects

class DownloadObjectByAttributeTool(BaseTool)

Download object by attribute

GetObjectHeaderByAttributeTool Objects

class GetObjectHeaderByAttributeTool(BaseTool)

Get object header by attribute

DeleteObjectTool Objects

class DeleteObjectTool(BaseTool)

Delete an object

SearchObjectsTool Objects

class SearchObjectsTool(BaseTool)

Search objects in container

SetContainerEaclTool Objects

class SetContainerEaclTool(BaseTool)

Set eACL for container

GetContainerEaclTool Objects

class GetContainerEaclTool(BaseTool)

Get eACL for container

ListContainersTool Objects

class ListContainersTool(BaseTool)

List all containers

GetContainerInfoTool Objects

class GetContainerInfoTool(BaseTool)

Get container info

DeleteContainerTool Objects

class DeleteContainerTool(BaseTool)

Delete container

GetNetworkInfoTool Objects

class GetNetworkInfoTool(BaseTool)

Get network info

GetBalanceTool Objects

class GetBalanceTool(BaseTool)

Get balance for an address

Module spoon_ai.tools.rag_tools

Module spoon_ai.tools.x402_payment

X402PaymentHeaderTool Objects

class X402PaymentHeaderTool(BaseTool)

Create a signed X-PAYMENT header for a given resource.

X402PaywalledRequestTool Objects

class X402PaywalledRequestTool(BaseTool)

Fetch a paywalled resource, handling the x402 402 negotiation automatically.

Module spoon_ai.tools

Module spoon_ai.tools.mcp_tool

MCPTool Objects

class MCPTool(BaseTool, MCPClientMixin)

call_mcp_tool

async def call_mcp_tool(tool_name: str, **kwargs)

Override the mixin method to add tool-specific error handling.

list_available_tools

async def list_available_tools() -> list

List available tools from the MCP server.

Module spoon_ai.tools.base

ToolFailure Objects

class ToolFailure(Exception)

Exception to indicate a tool execution failure.

Module spoon_ai.graph.agent

GraphAgent implementation for the graph package.

Memory Objects

class Memory()

Memory implementation with persistent storage

clear

def clear()

Clear all messages and reset memory

add_message

def add_message(msg)

Add a message to memory

get_messages

def get_messages(limit: Optional[int] = None) -> List[Dict[str, Any]]

Get messages from memory

get_recent_messages

def get_recent_messages(hours: int = 24) -> List[Dict[str, Any]]

Get messages from the last N hours

search_messages

def search_messages(query: str, limit: int = 10) -> List[Dict[str, Any]]

Search messages containing the query

get_statistics

def get_statistics() -> Dict[str, Any]

Get memory statistics

set_metadata

def set_metadata(key: str, value: Any)

Set metadata

get_metadata

def get_metadata(key: str, default: Any = None) -> Any

Get metadata

MockMemory Objects

class MockMemory(Memory)

Alias for backward compatibility - now uses persistent memory

GraphAgent Objects

class GraphAgent()

search_memory

def search_memory(query: str, limit: int = 10) -> List[Dict[str, Any]]

Search memory for messages containing the query

get_recent_memory

def get_recent_memory(hours: int = 24) -> List[Dict[str, Any]]

Get recent messages from memory

get_memory_statistics

def get_memory_statistics() -> Dict[str, Any]

Get memory statistics

set_memory_metadata

def set_memory_metadata(key: str, value: Any)

Set memory metadata

get_memory_metadata

def get_memory_metadata(key: str, default: Any = None) -> Any

Get memory metadata

save_session

def save_session()

Manually save current session

load_session

def load_session(session_id: str)

Load a specific session

Module spoon_ai.graph.types

Typed structures for the graph package.

Module spoon_ai.graph.checkpointer

In-memory checkpointer for the graph package.

InMemoryCheckpointer Objects

class InMemoryCheckpointer()

iter_checkpoint_history

def iter_checkpoint_history(
config: Dict[str, Any]) -> Iterable[CheckpointTuple]

Return checkpoint tuples for the specified thread, newest last.

Module spoon_ai.graph.builder

Declarative builders and helpers for SpoonAI graphs.

Intent Objects

@dataclass
class Intent()

Result of intent analysis.

IntentAnalyzer Objects

class IntentAnalyzer()

LLM-powered intent analyzer.

Core stays generic; concrete prompts/parsers are supplied by callers.

AdaptiveStateBuilder Objects

class AdaptiveStateBuilder()

Construct initial graph state using query intent and optional parameters.

ParameterInferenceEngine Objects

class ParameterInferenceEngine()

LLM delegator for parameter extraction.

Core keeps this generic; applications provide formatting/parsing via options.

NodeSpec Objects

@dataclass
class NodeSpec()

Declarative node specification.

EdgeSpec Objects

@dataclass
class EdgeSpec()

Declarative edge specification.

end

target name or callable router

ParallelGroupSpec Objects

@dataclass
class ParallelGroupSpec()

Parallel group specification.

GraphTemplate Objects

@dataclass
class GraphTemplate()

Complete declarative template for a graph.

DeclarativeGraphBuilder Objects

class DeclarativeGraphBuilder()

Build StateGraph instances from declarative templates.

NodePlugin Objects

class NodePlugin()

Pluggable node provider.

NodePluginSystem Objects

class NodePluginSystem()

Registry and discovery for node plugins.

HighLevelGraphAPI Objects

class HighLevelGraphAPI()

Convenience facade for building graphs per query.

Module spoon_ai.graph.mcp_integration

Utility classes for intelligent MCP tool discovery and configuration.

Core graph components no longer hard-code external tools; instead, user code registers tool specifications and optional transport/configuration details via these helpers.

MCPToolSpec Objects

@dataclass
class MCPToolSpec()

Specification describing a desired MCP tool.

MCPConfigManager Objects

class MCPConfigManager()

Centralised configuration loader for MCP tools.

MCPToolDiscoveryEngine Objects

class MCPToolDiscoveryEngine()

Discover MCP tools based on registered intent mappings.

MCPIntegrationManager Objects

class MCPIntegrationManager()

High level coordinator for MCP tool usage within graphs.

Module spoon_ai.graph.exceptions

Graph engine exception definitions (public within graph package).

Module spoon_ai.graph.reducers

Reducers and validators for the graph package.

Module spoon_ai.graph.decorators

Decorators and executor for the graph package.

Module spoon_ai.graph.config

Configuration primitives for the SpoonAI graph engine.

RouterConfig Objects

@dataclass
class RouterConfig()

Controls how the graph chooses the next node after each execution step.

ParallelRetryPolicy Objects

@dataclass
class ParallelRetryPolicy()

Retry policy for individual nodes inside a parallel group.

ParallelGroupConfig Objects

@dataclass
class ParallelGroupConfig()

Controls how a parallel group executes and aggregates results.

quorum

floats in (0, 1] treated as ratio, ints as absolute

error_strategy

fail_fast, collect_errors, ignore_errors

GraphConfig Objects

@dataclass
class GraphConfig()

Top-level configuration applied to an entire graph instance.

Module spoon_ai.graph.engine

Graph engine: StateGraph, CompiledGraph, and interrupt API implementation.

BaseNode Objects

class BaseNode(ABC, Generic[State])

Base class for all graph nodes

__call__

@abstractmethod
async def __call__(state: State,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]

Execute the node logic

RunnableNode Objects

class RunnableNode(BaseNode[State])

Runnable node that wraps a function

__call__

async def __call__(state: State,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]

Execute the wrapped function

ToolNode Objects

class ToolNode(BaseNode[State])

Tool node for executing tools

__call__

async def __call__(state: State,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]

Execute tools based on state

ConditionNode Objects

class ConditionNode(BaseNode[State])

Conditional node for routing decisions

__call__

async def __call__(state: State,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]

Execute condition and return routing decision

interrupt

def interrupt(data: Dict[str, Any]) -> Any

Interrupt execution and wait for human input.

RouteRule Objects

class RouteRule()

Advanced routing rule for automatic path selection

matches

def matches(state: Dict[str, Any], query: str = "") -> bool

Check if this rule matches the current state/query

RunningSummary Objects

@dataclass
class RunningSummary()

Rolling conversation summary used by the summarisation node.

SummarizationNode Objects

class SummarizationNode(BaseNode[Dict[str, Any]])

Node that summarises conversation history before model invocation.

StateGraph Objects

class StateGraph(Generic[State])

add_node

def add_node(
node_name: str, node: Union[BaseNode[State],
Callable[[State], Any]]) -> "StateGraph"

Add a node to the graph

add_edge

def add_edge(
start_node: str,
end_node: str,
condition: Optional[Callable[[State], bool]] = None) -> "StateGraph"

Add an edge. When condition is provided, edge becomes conditional.

add_conditional_edges

def add_conditional_edges(start_node: str, condition: Callable[[State], str],
path_map: Dict[str, str]) -> "StateGraph"

Add conditional edges

set_entry_point

def set_entry_point(node_name: str) -> "StateGraph"

Set the entry point

add_tool_node

def add_tool_node(tools: List[Any], name: str = "tools") -> "StateGraph"

Add a tool node

add_conditional_node

def add_conditional_node(condition_func: Callable[[State], str],
name: str = "condition") -> "StateGraph"

Add a conditional node

add_parallel_group

def add_parallel_group(
group_name: str,
nodes: List[str],
config: Optional[Union[Dict[str, Any], ParallelGroupConfig]] = None
) -> "StateGraph"

Add a parallel execution group

add_routing_rule

def add_routing_rule(source_node: str,
condition: Union[str, Callable[[State, str], bool]],
target_node: str,
priority: int = 0) -> "StateGraph"

Add an intelligent routing rule

get_state

def get_state(
config: Optional[Dict[str, Any]] = None) -> Optional[StateSnapshot]

Fetch the latest (or specified) checkpoint snapshot for a thread.

get_state_history

def get_state_history(
config: Optional[Dict[str, Any]] = None) -> Iterable[StateSnapshot]

Return all checkpoints for the given thread, ordered by creation time.

add_pattern_routing

def add_pattern_routing(source_node: str,
pattern: str,
target_node: str,
priority: int = 0) -> "StateGraph"

Add pattern-based routing rule

set_intelligent_router

def set_intelligent_router(
router_func: Callable[[Dict[str, Any], str], str]) -> "StateGraph"

Set the intelligent router function

set_llm_router

def set_llm_router(router_func: Optional[Callable[[Dict[str, Any], str],
str]] = None,
config: Optional[Dict[str, Any]] = None) -> "StateGraph"

Set the LLM-powered router function

Arguments:

  • router_func - Custom LLM router function. If None, uses default LLM router.
  • config - Configuration for LLM router (model, temperature, max_tokens, etc.)

enable_llm_routing

def enable_llm_routing(
config: Optional[Dict[str, Any]] = None) -> "StateGraph"

Enable LLM-powered natural language routing

This automatically sets up LLM routing for the graph entry point.

compile

def compile(checkpointer: Optional[Any] = None) -> "CompiledGraph"

Compile the graph

get_graph

def get_graph() -> Dict[str, Any]

Get graph structure for visualization/debugging

CompiledGraph Objects

class CompiledGraph(Generic[State])

Compiled graph for execution

get_execution_metrics

def get_execution_metrics() -> Dict[str, Any]

Get aggregated execution metrics

Module spoon_ai.neofs.utils

SignatureError Objects

class SignatureError(Exception)

Raised when signature payload construction fails.

sign_bearer_token

def sign_bearer_token(bearer_token: str,
private_key_wif: str,
*,
wallet_connect: bool = True) -> tuple[str, str]

Returns (signature_hex, compressed_pubkey_hex)

  • wallet_connect=True: msg = WC format (with prefix/len/salt/postfix), hash=SHA-256 X-Bearer-Signature = <DER signature hex> + <16B salt hex> X-Bearer-Signature-Key = <compressed public key hex> URL needs to append ?walletConnect=true

Module spoon_ai.neofs.client

NeoFSClient Objects

class NeoFSClient()

set_container_eacl

def set_container_eacl(container_id: str,
eacl: Eacl,
*,
bearer_token: Optional[str] = None,
wallet_connect: bool = True) -> SuccessResponse

Set container eACL.

Arguments:

  • container_id - Container ID
  • eacl - eACL object
  • bearer_token - Optional Bearer Token (recommended for eACL operations)
  • wallet_connect - Whether to use wallet_connect mode (default True)

download_object_by_id

def download_object_by_id(container_id: str,
object_id: str,
*,
bearer_token: Optional[str] = None,
download: bool | None = None,
range_header: str | None = None) -> httpx.Response

Download object by ID. Bearer token is optional for public containers.

get_object_header_by_id

def get_object_header_by_id(container_id: str,
object_id: str,
*,
bearer_token: Optional[str] = None,
range_header: str | None = None) -> httpx.Response

Get object header by ID. Bearer token is optional for public containers.

download_object_by_attribute

def download_object_by_attribute(
container_id: str,
attr_key: str,
attr_val: str,
*,
bearer_token: Optional[str] = None,
download: bool | None = None,
range_header: str | None = None) -> httpx.Response

Download object by attribute. Bearer token is optional for public containers.

get_object_header_by_attribute

def get_object_header_by_attribute(
container_id: str,
attr_key: str,
attr_val: str,
*,
bearer_token: Optional[str] = None,
range_header: str | None = None) -> httpx.Response

Get object header by attribute. Bearer token is optional for public containers.

delete_object

def delete_object(container_id: str,
object_id: str,
*,
bearer_token: Optional[str] = None) -> SuccessResponse

Delete object. Bearer token is optional for public containers, required for eACL containers with DENY DELETE rule.

search_objects

def search_objects(container_id: str,
search_request: SearchRequest,
*,
bearer_token: Optional[str] = None,
cursor: str = "",
limit: int = 100) -> ObjectListV2

Search objects. Bearer token is optional for public containers.

NeoFSException Objects

class NeoFSException(Exception)

Base exception for the NeoFS client.

NeoFSAPIException Objects

class NeoFSAPIException(NeoFSException)

Raised when the API returns an error.

Module spoon_ai.neofs

NeoFS integration for Spoon Core.

Module spoon_ai.neofs.models

Pydantic models describing NeoFS REST API payloads.

NetworkInfo Objects

class NetworkInfo(BaseModel)

Describes network configuration fees reported by the gateway.

Module spoon_ai.agents.toolcall

ToolCallAgent Objects

class ToolCallAgent(ReActAgent)

tool_choices

type: ignore

mcp_tools_cache_ttl

5 minutes TTL

run

async def run(request: Optional[str] = None) -> str

Override run method to handle finish_reason termination specially.

step

async def step() -> str

Override the step method to handle finish_reason termination properly.

Module spoon_ai.agents.react

Module spoon_ai.agents.mcp_client_mixin

MCPClientMixin Objects

class MCPClientMixin()

get_session

@asynccontextmanager
async def get_session()

Get a session with robust resource management and cleanup.

Features:

  • Automatic session reuse per task
  • Resource limits to prevent exhaustion
  • Proper cleanup on cancellation/failure
  • Periodic cleanup of stale sessions

list_mcp_tools

async def list_mcp_tools()

Get the list of available tools from the MCP server

call_mcp_tool

async def call_mcp_tool(tool_name: str, **kwargs)

Call a tool on the MCP server

send_mcp_message

async def send_mcp_message(recipient: str,
message: Union[str, Dict[str, Any]],
topic: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None) -> bool

Send a message to the MCP system

Arguments:

  • recipient - Recipient ID
  • message - Message content (string or dictionary)
  • topic - Message topic
  • metadata - Additional metadata

Returns:

  • bool - Whether the message was sent successfully

cleanup

async def cleanup()

Enhanced cleanup method with comprehensive resource cleanup.

get_session_stats

def get_session_stats() -> Dict[str, Any]

Get session statistics for monitoring.

Module spoon_ai.agents.spoon_react_mcp

SpoonReactMCP Objects

class SpoonReactMCP(SpoonReactAI)

list_mcp_tools

async def list_mcp_tools()

Return MCP tools from available_tools manager

Module spoon_ai.agents.monitor

Module spoon_ai.agents.rag

RetrievalMixin Objects

class RetrievalMixin()

Mixin class for retrieval-augmented generation functionality

initialize_retrieval_client

def initialize_retrieval_client(backend: str = 'chroma', **kwargs)

Initialize the retrieval client if it doesn't exist

add_documents

def add_documents(documents, backend: str = 'chroma', **kwargs)

Add documents to the retrieval system

retrieve_relevant_documents

def retrieve_relevant_documents(query, k=5, backend: str = 'chroma', **kwargs)

Retrieve relevant documents for a query

get_context_from_query

def get_context_from_query(query)

Get context string from relevant documents for a query

Module spoon_ai.agents.custom_agent

CustomAgent Objects

class CustomAgent(ToolCallAgent)

Custom Agent class allowing users to create their own agents and add custom tools

Usage: Create custom agent and add tools: agent = CustomAgent(name="my_agent", description="My custom agent") agent.add_tool(MyCustomTool()) result = await agent.run("Use my custom tool")

add_tool

def add_tool(tool: BaseTool) -> None

Add a tool to the agent with validation.

Arguments:

  • tool - Tool instance to add

Raises:

  • ValueError - If tool is invalid or already exists

add_tools

def add_tools(tools: List[BaseTool]) -> None

Add multiple tools to the agent with atomic operation.

Arguments:

  • tools - List of tool instances to add

Raises:

  • ValueError - If any tool is invalid

remove_tool

def remove_tool(tool_name: str) -> bool

Remove a tool from the agent.

Arguments:

  • tool_name - Name of the tool to remove

Returns:

  • bool - True if tool was removed, False if not found

list_tools

def list_tools() -> List[str]

List all available tools in the agent.

Returns:

List of tool names, empty list if no tools

get_tool_info

def get_tool_info() -> Dict[str, Dict[str, Any]]

Get detailed information about all tools.

Returns:

Dictionary with tool names as keys and tool info as values

validate_tools

def validate_tools() -> Dict[str, Any]

Validate all current tools and return validation report.

Returns:

Dictionary with validation results

run

async def run(request: Optional[str] = None) -> str

Run the agent with enhanced tool validation.

Arguments:

  • request - User request

Returns:

Processing result

clear

def clear()

Enhanced clear method with proper tool state management.

Module spoon_ai.agents.graph_agent

Graph-based agent implementation for SpoonOS.

This module provides the GraphAgent class that executes StateGraph workflows, integrating the graph execution system with the existing agent architecture.

GraphAgent Objects

class GraphAgent(BaseAgent)

An agent that executes StateGraph workflows.

This agent provides a bridge between the existing SpoonOS agent architecture and the new graph-based execution system. It allows complex, stateful workflows to be defined as graphs and executed with proper state management.

Key Features:

  • Executes StateGraph workflows
  • Maintains compatibility with existing agent interfaces
  • Provides detailed execution logging and error handling
  • Supports both sync and async node functions

__init__

def __init__(**kwargs)

Initialize the GraphAgent.

Arguments:

  • graph - StateGraph instance to execute
  • **kwargs - Additional arguments passed to BaseAgent

Raises:

  • ValueError - If no graph is provided

validate_graph

@validator('graph')
def validate_graph(cls, v)

Validate that the provided graph is a StateGraph instance.

run

async def run(request: Optional[str] = None) -> str

Execute the graph workflow.

This method overrides the base run method to invoke the compiled graph instead of the traditional step-based execution loop.

Arguments:

  • request - Optional input request to include in initial state

Returns:

String representation of the execution result

Raises:

  • RuntimeError - If agent is not in IDLE state
  • GraphExecutionError - If graph execution fails

step

async def step() -> str

Step method for compatibility with BaseAgent.

Since GraphAgent uses graph execution instead of step-based execution, this method is not used in normal operation but is required by the BaseAgent interface.

Returns:

Status message indicating graph-based execution

get_execution_history

def get_execution_history() -> list

Get the execution history from the last graph run.

Returns:

List of execution steps with metadata

get_execution_metadata

def get_execution_metadata() -> Dict[str, Any]

Get metadata from the last execution.

Returns:

Dictionary containing execution metadata

clear_state

def clear_state()

Clear preserved state and execution history.

update_initial_state

def update_initial_state(updates: Dict[str, Any])

Update the initial state for future executions.

Arguments:

  • updates - Dictionary of state updates to merge

set_preserve_state

def set_preserve_state(preserve: bool)

Enable or disable state preservation between runs.

Arguments:

  • preserve - Whether to preserve state between runs

Module spoon_ai.agents

Module spoon_ai.agents.spoon_react

create_configured_chatbot

def create_configured_chatbot()

Create a ChatBot instance with intelligent provider selection.

SpoonReactAI Objects

class SpoonReactAI(ToolCallAgent)

__init__

def __init__(**kwargs)

Initialize SpoonReactAI with both ToolCallAgent and MCPClientMixin initialization

initialize

async def initialize(__context: Any = None)

Initialize async components and subscribe to topics

run

async def run(request: Optional[str] = None) -> str

Ensure prompts reflect current tools before running.

Module spoon_ai.agents.base

ThreadSafeOutputQueue Objects

class ThreadSafeOutputQueue()

Thread-safe output queue with fair access and timeout protection

get

async def get(timeout: Optional[float] = 30.0) -> Any

Get item with timeout and fair access

BaseAgent Objects

class BaseAgent(BaseModel, ABC)

Thread-safe base class for all agents with proper concurrency handling.

add_message

async def add_message(role: Literal["user", "assistant", "tool"],
content: str,
tool_call_id: Optional[str] = None,
tool_calls: Optional[List[ToolCall]] = None,
tool_name: Optional[str] = None,
timeout: Optional[float] = None) -> None

Thread-safe message addition with timeout protection

state_context

@asynccontextmanager
async def state_context(new_state: AgentState,
timeout: Optional[float] = None)

Thread-safe state context manager with deadlock prevention. Acquires the state lock only to perform quick transitions, not for the duration of the work inside the context, avoiding long-held locks and false timeouts during network calls.

run

async def run(request: Optional[str] = None,
timeout: Optional[float] = None) -> str

Thread-safe run method with proper concurrency control and callback support.

step

async def step(run_id: Optional[uuid.UUID] = None) -> str

Override this method in subclasses - now with step-level locking and callback support.

is_stuck

async def is_stuck() -> bool

Thread-safe stuck detection

handle_stuck_state

async def handle_stuck_state()

Thread-safe stuck state handling

add_documents

def add_documents(documents) -> None

Store documents on the agent so CLI load-docs works without RAG mixin.

This default implementation keeps the documents in-memory under self._loaded_documents. Agents that support retrieval should override this method to index documents into their vector store.

save_chat_history

def save_chat_history()

Thread-safe chat history saving

stream

async def stream(timeout: Optional[float] = None)

Thread-safe streaming with proper cleanup and timeout

process_mcp_message

async def process_mcp_message(content: Any,
sender: str,
message: Dict[str, Any],
agent_id: str,
timeout: Optional[float] = None)

Thread-safe MCP message processing with timeout protection

shutdown

async def shutdown(timeout: float = 30.0)

Graceful shutdown with cleanup of active operations

get_diagnostics

def get_diagnostics() -> Dict[str, Any]

Get diagnostic information about the agent's state

Module spoon_ai.rag.embeddings

HashEmbeddingClient Objects

class HashEmbeddingClient(EmbeddingClient)

Deterministic offline embedding via hashing.

Produces fixed-length vectors in [0,1] normalized range. Not semantically meaningful but stable for tests and offline demos.

get_embedding_client

def get_embedding_client(
provider: Optional[str],
*,
openai_api_key: Optional[str] = None,
openai_model: str = "text-embedding-3-small") -> EmbeddingClient

Create an embedding client.

Provider selection rules:

  • provider is None/"auto": pick the first configured embeddings provider using a dedicated priority order (OpenAI > OpenRouter > Gemini).
  • provider is "openai" / "openrouter" / "gemini" / "ollama": force that provider (uses core env config when applicable).
  • provider is "openai_compatible": use OpenAI-compatible embeddings via RAG_EMBEDDINGS_* env vars.
  • otherwise: deterministic hash embeddings (offline).

Module spoon_ai.rag.loader

Module spoon_ai.rag.retriever

Module spoon_ai.rag.qa

Module spoon_ai.rag.vectorstores.faiss_store

FaissVectorStore Objects

class FaissVectorStore(VectorStore)

FAISS-backed local vector store (cosine via inner product + L2 norm).

Module spoon_ai.rag.vectorstores.chroma_store

Module spoon_ai.rag.vectorstores.pinecone_store

Module spoon_ai.rag.vectorstores.qdrant_store

Module spoon_ai.rag.vectorstores.registry

get_vector_store

def get_vector_store(backend: Optional[str] = None) -> VectorStore

Return a vector store by backend name.

Backends:

  • faiss: local/offline (mapped to in-memory cosine store)
  • pinecone: cloud Pinecone (requires PINECONE_API_KEY)
  • qdrant: local/cloud Qdrant (requires qdrant-client, default http://localhost:6333)
  • chroma: local Chroma (requires chromadb)

Module spoon_ai.rag.vectorstores

Module spoon_ai.rag.vectorstores.base

VectorStore Objects

class VectorStore(ABC)

query

@abstractmethod
def query(
*,
collection: str,
query_embeddings: List[List[float]],
top_k: int = 5,
filter: Optional[Dict] = None) -> List[List[Tuple[str, float, Dict]]]

Return per-query list of (id, score, metadata). Higher score is better.

Module spoon_ai.rag.config

RagConfig Objects

@dataclass
class RagConfig()

backend

faiss|pinecone|qdrant|chroma

Module spoon_ai.rag

Module spoon_ai.rag.index

Module spoon_ai.prompts.toolcall

Module spoon_ai.prompts

Module spoon_ai.prompts.spoon_react

Module spoon_ai.turnkey.client

Turnkey Objects

class Turnkey()

Turnkey API client class for managing blockchain private keys and wallet operations.

__init__

def __init__(base_url=None,
api_public_key=None,
api_private_key=None,
org_id=None)

Initialize Turnkey client.

Arguments:

  • base_url str - Turnkey API base URL (defaults from .env or default value).
  • api_public_key str - Turnkey API public key.
  • api_private_key str - Turnkey API private key.
  • org_id str - Turnkey organization ID.

Raises:

  • ValueError - If required configuration parameters are missing.

whoami

def whoami()

Call whoami API to get organization information.

Returns:

  • dict - JSON response containing organization information.

import_private_key

def import_private_key(user_id,
private_key_name,
encrypted_bundle,
curve="CURVE_SECP256K1",
address_formats=["ADDRESS_FORMAT_ETHEREUM"])

Import private key to Turnkey.

Arguments:

  • user_id str - User ID.
  • private_key_name str - Private key name.
  • encrypted_bundle str - Encrypted private key bundle.
  • curve str - Elliptic curve type, defaults to CURVE_SECP256K1.
  • address_formats list - Address format list, defaults to ["ADDRESS_FORMAT_ETHEREUM"].

Returns:

  • dict - JSON response containing imported private key information.

sign_evm_transaction

def sign_evm_transaction(sign_with, unsigned_tx)

Sign EVM transaction using Turnkey.

Arguments:

  • sign_with str - Signing identity (wallet account address / private key address / private key ID).
  • unsigned_tx str - Raw unsigned transaction (hex string).

Returns:

sign_typed_data

def sign_typed_data(sign_with, typed_data)

Sign EIP-712 structured data.

Arguments:

  • sign_with str - Signing identity (wallet account address / private key address / private key ID).
  • typed_data dict|str - EIP-712 structure (domain/types/message) or its JSON string.

Returns:

  • dict - Activity response, result contains r/s/v.

Notes:

  • encoding uses PAYLOAD_ENCODING_EIP712
  • hashFunction uses HASH_FUNCTION_NOT_APPLICABLE (server completes EIP-712 spec hashing)

sign_message

def sign_message(sign_with, message, use_keccak256=True)

Sign arbitrary message (defaults to KECCAK256 following Ethereum convention).

Arguments:

  • sign_with str - Signing identity (wallet account address / private key address / private key ID).
  • message str|bytes - Text to be signed; bytes will be decoded as UTF-8.
  • use_keccak256 bool - Whether to use KECCAK256 as hash function (default True).

Returns:

  • dict - Activity response, result contains r/s/v.

get_activity

def get_activity(activity_id)

Query Activity details.

Arguments:

  • activity_id str - Activity ID.

Returns:

list_activities

def list_activities(limit=None,
before=None,
after=None,
filter_by_status=None,
filter_by_type=None)

List activities within organization (paginated).

Arguments:

  • limit str|None - Number per page.
  • before str|None - Pagination cursor (before).
  • after str|None - Pagination cursor (after).
  • filter_by_status list|None - Filter by activity status (e.g., ['ACTIVITY_STATUS_COMPLETED']).
  • filter_by_type list|None - Filter by activity type (e.g., ['ACTIVITY_TYPE_SIGN_TRANSACTION_V2']).

Returns:

get_policy_evaluations

def get_policy_evaluations(activity_id)

Query policy evaluation results for an Activity (if available).

Arguments:

  • activity_id str - Activity ID.

Returns:

get_private_key

def get_private_key(private_key_id)

Query information for specified private key.

Arguments:

  • private_key_id str - Private key ID.

Returns:

  • dict - JSON response containing private key information.

create_wallet

def create_wallet(wallet_name, accounts, mnemonic_length=24)

Create new wallet.

Arguments:

  • wallet_name str - Wallet name.
  • accounts list - Account configuration list, each account contains curve, pathFormat, path, addressFormat.
  • mnemonic_length int - Mnemonic length (default 24).

Returns:

  • dict - JSON response containing new wallet information.

create_wallet_accounts

def create_wallet_accounts(wallet_id, accounts)

Add accounts to existing wallet.

Arguments:

  • wallet_id str - Wallet ID.
  • accounts list - New account configuration list, each account contains curve, pathFormat, path, addressFormat.

Returns:

  • dict - JSON response containing new account information.

get_wallet

def get_wallet(wallet_id)

Query information for specified wallet.

Arguments:

  • wallet_id str - Wallet ID.

Returns:

  • dict - JSON response containing wallet information.

get_wallet_account

def get_wallet_account(wallet_id, address=None, path=None)

Query information for specified wallet account.

Arguments:

  • wallet_id str - Wallet ID.
  • address str, optional - Account address.
  • path str, optional - Account path (e.g., m/44'/60'/0'/0/0).

Returns:

  • dict - JSON response containing account information.

Raises:

  • ValueError - If neither address nor path is provided.

list_wallets

def list_wallets()

List all wallets in the organization.

Returns:

  • dict - JSON response containing wallet list.

list_wallet_accounts

def list_wallet_accounts(wallet_id, limit=None, before=None, after=None)

List account list for specified wallet.

Arguments:

  • wallet_id str - Wallet ID.
  • limit str, optional - Number of accounts returned per page.
  • before str, optional - Pagination cursor, returns accounts before this ID.
  • after str, optional - Pagination cursor, returns accounts after this ID.

Returns:

  • dict - JSON response containing account list.

init_import_wallet

def init_import_wallet(user_id)

Initialize wallet import process, generate import_bundle.

Arguments:

  • user_id str - User ID.

Returns:

  • dict - JSON response containing import_bundle.

encrypt_wallet

def encrypt_wallet(mnemonic,
user_id,
import_bundle,
encryption_key_name="demo-encryption-key")

Encrypt mnemonic using Turnkey CLI, generate encrypted_bundle.

Arguments:

  • mnemonic str - Mnemonic phrase (12/15/18/21/24 words).
  • user_id str - User ID.
  • import_bundle str - import_bundle obtained from init_import_wallet.
  • encryption_key_name str - Encryption key name, defaults to demo-encryption-key.

Returns:

  • str - Encrypted encrypted_bundle.

Raises:

  • RuntimeError - If CLI command fails or turnkey CLI is not installed.

encrypt_private_key

def encrypt_private_key(private_key,
user_id,
import_bundle,
key_format="hexadecimal",
encryption_key_name="demo-encryption-key")

Encrypt private key using Turnkey CLI, generate encrypted_bundle, equivalent to: turnkey encrypt --import-bundle-input "./import_bundle.txt" --plaintext-input /dev/fd/3 --key-format "hexadecimal" --encrypted-bundle-output "./encrypted_bundle.txt"

Arguments:

  • private_key str - Private key string (hexadecimal or Solana format).
  • user_id str - User ID.
  • import_bundle str - import_bundle obtained from init_import_private_key.
  • key_format str - Private key format, defaults to "hexadecimal" (supports "hexadecimal", "solana").
  • encryption_key_name str - Encryption key name, defaults to "demo-encryption-key".

Returns:

  • str - Encrypted encrypted_bundle (Base64 encoded string).

Raises:

  • ValueError - If private_key, user_id, import_bundle is empty or key_format is invalid.
  • RuntimeError - If CLI command fails or turnkey CLI is not installed.

init_import_private_key

def init_import_private_key(user_id)

Initialize private key import process, generate import_bundle.

Arguments:

  • user_id str - User ID.

Returns:

  • dict - JSON response containing import_bundle.

import_wallet

def import_wallet(user_id, wallet_name, encrypted_bundle, accounts=None)

Import wallet to Turnkey.

Arguments:

  • user_id str - User ID.
  • wallet_name str - Wallet name.
  • encrypted_bundle str - Encrypted mnemonic bundle.
  • accounts list, optional - Account configuration list, each account contains curve, pathFormat, path, addressFormat.

Returns:

  • dict - JSON response containing imported wallet information.

Module spoon_ai.turnkey

Turnkey client integration for SpoonAI.

Provides Turnkey for secure signing via Turnkey API.

Module spoon_ai.callbacks.streaming_stdout

StreamingStdOutCallbackHandler Objects

class StreamingStdOutCallbackHandler(BaseCallbackHandler)

Callback handler that streams tokens to standard output.

on_llm_new_token

def on_llm_new_token(token: str, **kwargs: Any) -> None

Print token to stdout immediately.

Arguments:

  • token - The new token to print
  • **kwargs - Additional context (ignored)

on_llm_end

def on_llm_end(response: Any, **kwargs: Any) -> None

Print newline after LLM completes.

Arguments:

  • response - The complete LLM response (ignored)
  • **kwargs - Additional context (ignored)

Module spoon_ai.callbacks.statistics

StreamingStatisticsCallback Objects

class StreamingStatisticsCallback(BaseCallbackHandler, LLMManagerMixin)

Collect simple throughput statistics during streaming runs.

By default, the callback prints summary metrics when the LLM finishes. Consumers can provide a custom print_fn to redirect output, or disable printing entirely and read the public attributes after execution.

Module spoon_ai.callbacks.stream_event

StreamEventCallbackHandler Objects

class StreamEventCallbackHandler(BaseCallbackHandler)

Translate callback invocations into standardized stream events.

Module spoon_ai.callbacks.manager

CallbackManager Objects

class CallbackManager()

Lightweight dispatcher for callback handlers.

Module spoon_ai.callbacks

Callback system for streaming and event handling in Spoon AI.

This module provides a comprehensive callback system similar to LangChain's callbacks, enabling real-time monitoring and event handling for LLM calls, agent execution, tool invocation, and graph workflows.

Module spoon_ai.callbacks.base

RetrieverManagerMixin Objects

class RetrieverManagerMixin()

Mixin providing retriever callback hooks.

on_retriever_start

def on_retriever_start(run_id: UUID, query: Any, **kwargs: Any) -> Any

Run when a retriever begins execution.

on_retriever_end

def on_retriever_end(run_id: UUID, documents: Any, **kwargs: Any) -> Any

Run when a retriever finishes successfully.

on_retriever_error

def on_retriever_error(error: BaseException, *, run_id: UUID,
**kwargs: Any) -> Any

Run when a retriever raises an error.

LLMManagerMixin Objects

class LLMManagerMixin()

Mixin providing large language model callback hooks.

on_llm_start

def on_llm_start(run_id: UUID, messages: List[Message], **kwargs: Any) -> Any

Run when an LLM or chat model begins execution.

on_llm_new_token

def on_llm_new_token(token: str,
*,
chunk: Optional[LLMResponseChunk] = None,
run_id: Optional[UUID] = None,
**kwargs: Any) -> Any

Run for each streamed token emitted by an LLM.

on_llm_end

def on_llm_end(response: LLMResponse, *, run_id: UUID, **kwargs: Any) -> Any

Run when an LLM finishes successfully.

on_llm_error

def on_llm_error(error: BaseException, *, run_id: UUID, **kwargs: Any) -> Any

Run when an LLM raises an error.

ChainManagerMixin Objects

class ChainManagerMixin()

Mixin providing chain-level callback hooks.

on_chain_start

def on_chain_start(run_id: UUID, inputs: Any, **kwargs: Any) -> Any

Run when a chain (Runnable) starts executing.

on_chain_end

def on_chain_end(run_id: UUID, outputs: Any, **kwargs: Any) -> Any

Run when a chain finishes successfully.

on_chain_error

def on_chain_error(error: BaseException, *, run_id: UUID,
**kwargs: Any) -> Any

Run when a chain raises an error.

ToolManagerMixin Objects

class ToolManagerMixin()

Mixin providing tool callback hooks.

on_tool_start

def on_tool_start(tool_name: str, tool_input: Any, *, run_id: UUID,
**kwargs: Any) -> Any

Run when a tool invocation begins.

on_tool_end

def on_tool_end(tool_name: str, tool_output: Any, *, run_id: UUID,
**kwargs: Any) -> Any

Run when a tool invocation succeeds.

on_tool_error

def on_tool_error(error: BaseException,
*,
run_id: UUID,
tool_name: Optional[str] = None,
**kwargs: Any) -> Any

Run when a tool invocation raises an error.

PromptManagerMixin Objects

class PromptManagerMixin()

Mixin providing prompt template callback hooks.

on_prompt_start

def on_prompt_start(run_id: UUID, inputs: Any, **kwargs: Any) -> Any

Run when a prompt template begins formatting.

on_prompt_end

def on_prompt_end(run_id: UUID, output: Any, **kwargs: Any) -> Any

Run when a prompt template finishes formatting.

on_prompt_error

def on_prompt_error(error: BaseException, *, run_id: UUID,
**kwargs: Any) -> Any

Run when prompt formatting raises an error.

BaseCallbackHandler Objects

class BaseCallbackHandler(LLMManagerMixin, ChainManagerMixin, ToolManagerMixin,
RetrieverManagerMixin, PromptManagerMixin, ABC)

Base class for SpoonAI callback handlers.

raise_error

Whether to re-raise exceptions originating from callbacks.

run_inline

Whether the callback prefers to run on the caller's event loop.

ignore_llm

@property
def ignore_llm() -> bool

Return True to skip LLM callbacks.

ignore_chain

@property
def ignore_chain() -> bool

Return True to skip chain callbacks.

ignore_tool

@property
def ignore_tool() -> bool

Return True to skip tool callbacks.

ignore_retriever

@property
def ignore_retriever() -> bool

Return True to skip retriever callbacks.

ignore_prompt

@property
def ignore_prompt() -> bool

Return True to skip prompt callbacks.

AsyncCallbackHandler Objects

class AsyncCallbackHandler(BaseCallbackHandler)

Async version of the callback handler base class.

Module spoon_ai.schema

Function Objects

class Function(BaseModel)

get_arguments_dict

def get_arguments_dict() -> dict

Parse arguments string to dictionary.

Returns:

  • dict - Parsed arguments as dictionary

create

@classmethod
def create(cls, name: str, arguments: Union[str, dict]) -> "Function"

Create Function with arguments as string or dict.

Arguments:

  • name - Function name
  • arguments - Function arguments as string or dict

Returns:

  • Function - Function instance with arguments as JSON string

AgentState Objects

class AgentState(str, Enum)

The state of the agent.

ToolChoice Objects

class ToolChoice(str, Enum)

Tool choice options

Role Objects

class Role(str, Enum)

Message role options

ROLE_TYPE

type: ignore

Message Objects

class Message(BaseModel)

Represents a chat message in the conversation

role

type: ignore

SystemMessage Objects

class SystemMessage(Message)

role

type: ignore

TOOL_CHOICE_TYPE

type: ignore

LLMConfig Objects

class LLMConfig(BaseModel)

Configuration for LLM providers

LLMResponse Objects

class LLMResponse(BaseModel)

Unified LLM response model

text

Original text response

LLMResponseChunk Objects

class LLMResponseChunk(BaseModel)

Enhanced LLM streaming response chunk.

Module spoon_ai.memory.utils

Memory helpers shared across Mem0 demos and utilities.

extract_memories

def extract_memories(result: Any) -> List[str]

Normalize Mem0 search/get responses into a list of memory strings. Supports common shapes: {"memories": [...]}, {"results": [...]}, {"data": [...]}, list, or scalar.

extract_first_memory_id

def extract_first_memory_id(result: Any) -> Optional[str]

Pull the first memory id from Mem0 responses. Supports common id fields: id, _id, memory_id, uuid.

Module spoon_ai.memory.short_term_manager

Short-term memory management for conversation history.

TrimStrategy Objects

class TrimStrategy(str, Enum)

Strategy for trimming messages.

FROM_START

Remove oldest messages first

FROM_END

Remove newest messages first

MessageTokenCounter Objects

class MessageTokenCounter()

Approximate token counter aligned with LangChain semantics.

ShortTermMemoryManager Objects

class ShortTermMemoryManager()

Manager for short-term conversation memory with advanced operations.

trim_messages

async def trim_messages(messages: List[Message],
max_tokens: int,
strategy: TrimStrategy = TrimStrategy.FROM_END,
keep_system: bool = True,
model: Optional[str] = None) -> List[Message]

Trim messages using a LangChain-style heuristic.

summarize_messages

async def summarize_messages(
messages: List[Message],
max_tokens_before_summary: int,
messages_to_keep: int = 5,
summary_model: Optional[str] = None,
llm_manager=None,
llm_provider: Optional[str] = None,
existing_summary: str = ""
) -> Tuple[List[Message], List[RemoveMessage], Optional[str]]

Summarize earlier messages and emit removal directives.

Module spoon_ai.memory.remove_message

Helpers for emitting message-removal directives.

RemoveMessage Objects

class RemoveMessage(BaseModel)

Lightweight message that signals another message should be removed.

Module spoon_ai.memory

Short-term memory management for conversation history.

This module provides memory management utilities for maintaining and optimizing conversation history in chat applications.

Module spoon_ai.memory.mem0_client

SpoonMem0 Objects

class SpoonMem0()

Lightweight wrapper around Mem0's MemoryClient with safe defaults.

add_text

def add_text(data: str,
user_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None) -> None

Convenience helper for adding a single text memory.

get_all_memory

def get_all_memory(user_id: Optional[str] = None,
limit: Optional[int] = None) -> List[str]

Retrieve all memories for a user (subject to backend limits).