Table of Contents
- spoon_ai
- spoon_ai.graph
- spoon_ai.llm.vlm_provider.gemini
- spoon_ai.llm.vlm_provider.base
- spoon_ai.llm.factory
- spoon_ai.llm.monitoring
- spoon_ai.llm.response_normalizer
- spoon_ai.llm.cache
- spoon_ai.llm.interface
- spoon_ai.llm.providers.deepseek_provider
- spoon_ai.llm.providers.gemini_provider
- spoon_ai.llm.providers.openai_compatible_provider
- spoon_ai.llm.providers.ollama_provider
- spoon_ai.llm.providers.openai_provider
- spoon_ai.llm.providers.anthropic_provider
- spoon_ai.llm.providers.openrouter_provider
- spoon_ai.llm.providers
- spoon_ai.llm.manager
- spoon_ai.llm.registry
- spoon_ai.llm.config
- spoon_ai.llm
- spoon_ai.llm.errors
- spoon_ai.llm.base
- spoon_ai.utils.utils
- spoon_ai.utils.config_manager
- spoon_ai.utils.config
- spoon_ai.utils
- spoon_ai.utils.streaming
- spoon_ai.runnables.events
- spoon_ai.runnables
- spoon_ai.runnables.base
- spoon_ai.payments.server
- spoon_ai.payments.cli
- spoon_ai.payments.facilitator_client
- spoon_ai.payments.exceptions
- spoon_ai.payments.x402_service
- spoon_ai.payments.config
- spoon_ai.payments.app
- spoon_ai.payments
- spoon_ai.payments.models
- spoon_ai.chat
- spoon_ai.identity.did_models
- spoon_ai.identity.attestation
- spoon_ai.identity.storage_client
- spoon_ai.identity.did_resolver
- spoon_ai.identity.erc8004_abi
- spoon_ai.identity
- spoon_ai.identity.erc8004_client
- spoon_ai.bridge.eth_neofs_indexer
- spoon_ai.bridge
- spoon_ai.tools.turnkey_tools
- TurnkeyBaseTool
- SignEVMTransactionTool
- SignMessageTool
- SignTypedDataTool
- BroadcastTransactionTool
- ListWalletsTool
- ListWalletAccountsTool
- GetActivityTool
- ListActivitiesTool
- WhoAmITool
- BuildUnsignedEIP1559TxTool
- ListAllAccountsTool
- BatchSignTransactionsTool
- CreateWalletTool
- GetWalletTool
- CreateWalletAccountsTool
- CompleteTransactionWorkflowTool
- get_turnkey_tools
- spoon_ai.tools.tool_manager
- spoon_ai.tools.neofs_tools
- get_shared_neofs_client
- CreateBearerTokenTool
- CreateContainerTool
- UploadObjectTool
- DownloadObjectByIdTool
- GetObjectHeaderByIdTool
- DownloadObjectByAttributeTool
- GetObjectHeaderByAttributeTool
- DeleteObjectTool
- SearchObjectsTool
- SetContainerEaclTool
- GetContainerEaclTool
- ListContainersTool
- GetContainerInfoTool
- DeleteContainerTool
- GetNetworkInfoTool
- GetBalanceTool
- spoon_ai.tools.rag_tools
- spoon_ai.tools.x402_payment
- spoon_ai.tools
- spoon_ai.tools.mcp_tool
- spoon_ai.tools.base
- spoon_ai.graph.agent
- spoon_ai.graph.types
- spoon_ai.graph.checkpointer
- spoon_ai.graph.builder
- spoon_ai.graph.mcp_integration
- spoon_ai.graph.exceptions
- spoon_ai.graph.reducers
- spoon_ai.graph.decorators
- spoon_ai.graph.config
- spoon_ai.graph.engine
- spoon_ai.neofs.utils
- spoon_ai.neofs.client
- spoon_ai.neofs
- spoon_ai.neofs.models
- spoon_ai.agents.toolcall
- spoon_ai.agents.react
- spoon_ai.agents.mcp_client_mixin
- spoon_ai.agents.spoon_react_mcp
- spoon_ai.agents.monitor
- spoon_ai.agents.rag
- spoon_ai.agents.custom_agent
- spoon_ai.agents.graph_agent
- spoon_ai.agents
- spoon_ai.agents.spoon_react
- spoon_ai.agents.base
- spoon_ai.rag.embeddings
- spoon_ai.rag.loader
- spoon_ai.rag.retriever
- spoon_ai.rag.qa
- spoon_ai.rag.vectorstores.faiss_store
- spoon_ai.rag.vectorstores.chroma_store
- spoon_ai.rag.vectorstores.pinecone_store
- spoon_ai.rag.vectorstores.qdrant_store
- spoon_ai.rag.vectorstores.registry
- spoon_ai.rag.vectorstores
- spoon_ai.rag.vectorstores.base
- spoon_ai.rag.config
- spoon_ai.rag
- spoon_ai.rag.index
- spoon_ai.prompts.toolcall
- spoon_ai.prompts
- spoon_ai.prompts.spoon_react
- spoon_ai.turnkey.client
- Turnkey
- __init__
- whoami
- import_private_key
- sign_evm_transaction
- sign_typed_data
- sign_message
- get_activity
- list_activities
- get_policy_evaluations
- get_private_key
- create_wallet
- create_wallet_accounts
- get_wallet
- get_wallet_account
- list_wallets
- list_wallet_accounts
- init_import_wallet
- encrypt_wallet
- encrypt_private_key
- init_import_private_key
- import_wallet
- Turnkey
- spoon_ai.turnkey
- spoon_ai.callbacks.streaming_stdout
- spoon_ai.callbacks.statistics
- spoon_ai.callbacks.stream_event
- spoon_ai.callbacks.manager
- spoon_ai.callbacks
- spoon_ai.callbacks.base
- spoon_ai.schema
- spoon_ai.memory.utils
- spoon_ai.memory.short_term_manager
- spoon_ai.memory.remove_message
- spoon_ai.memory
- spoon_ai.memory.mem0_client
Module spoon_ai
Module spoon_ai.graph
Graph-based execution system for SpoonOS agents.
This module provides a LangGraph-inspired framework with advanced features:
- State management with TypedDict and reducers
- LLM Manager integration
- Error handling and recovery
- Human-in-the-loop patterns
- Multi-agent coordination
- Comprehensive testing support
- Checkpointing and persistence
GraphExecutionError Objects
class GraphExecutionError(Exception)
Raised when graph execution encounters an error.
NodeExecutionError Objects
class NodeExecutionError(Exception)
Raised when a node fails to execute.
StateValidationError Objects
class StateValidationError(Exception)
Raised when state validation fails.
CheckpointError Objects
class CheckpointError(Exception)
Raised when checkpoint operations fail.
GraphConfigurationError Objects
class GraphConfigurationError(Exception)
Raised when graph configuration is invalid.
EdgeRoutingError Objects
class EdgeRoutingError(Exception)
Raised when edge routing fails.
InterruptError Objects
class InterruptError(Exception)
Raised when graph execution is interrupted for human input.
Command Objects
@dataclass
class Command()
Command object for controlling graph flow and state updates.
StateSnapshot Objects
@dataclass
class StateSnapshot()
Snapshot of graph state at a specific point in time.
InMemoryCheckpointer Objects
class InMemoryCheckpointer()
Simple in-memory checkpointer for development and testing.
This checkpointer stores state snapshots in memory and provides basic checkpoint management functionality. For production use, consider using persistent checkpointers like Redis or PostgreSQL.
__init__
def __init__(max_checkpoints_per_thread: int = 100)
Initialize the in-memory checkpointer.
Arguments:
max_checkpoints_per_thread - Maximum number of checkpoints to keep per thread
save_checkpoint
def save_checkpoint(thread_id: str, snapshot: StateSnapshot) -> None
Save a checkpoint for a thread.
Arguments:
thread_id - Unique identifier for the thread
snapshot - State snapshot to save
Raises:
CheckpointError - If checkpoint saving fails
get_checkpoint
def get_checkpoint(thread_id: str,
checkpoint_id: str = None) -> Optional[StateSnapshot]
Get a specific checkpoint or the latest one.
Arguments:
thread_id - Unique identifier for the thread
checkpoint_id - Optional specific checkpoint ID
Returns:
StateSnapshot or None if not found
Raises:
CheckpointError - If checkpoint retrieval fails
list_checkpoints
def list_checkpoints(thread_id: str) -> List[StateSnapshot]
List all checkpoints for a thread.
Arguments:
thread_id - Unique identifier for the thread
Returns:
List of state snapshots
Raises:
CheckpointError - If checkpoint listing fails
clear_thread
def clear_thread(thread_id: str) -> None
Clear all checkpoints for a thread.
Arguments:
thread_id - Unique identifier for the thread
get_stats
def get_stats() -> Dict[str, Any]
Get checkpointer statistics.
Returns:
Dictionary with statistics
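A minimal usage sketch of the checkpointer API documented above; the import path is an assumption based on this module's name, and only documented methods are used (StateSnapshot construction is omitted since its fields are not documented here).

```python
# Sketch: inspecting and clearing checkpoints with the documented API.
from spoon_ai.graph import InMemoryCheckpointer  # import path assumed

checkpointer = InMemoryCheckpointer(max_checkpoints_per_thread=50)

# List checkpoints recorded for a conversation thread (empty here).
for snapshot in checkpointer.list_checkpoints("thread-1"):
    print(snapshot)

print(checkpointer.get_stats())        # aggregate checkpointer statistics
checkpointer.clear_thread("thread-1")  # drop all checkpoints for the thread
```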
add_messages
def add_messages(existing: List[Any], new: List[Any]) -> List[Any]
Reducer function for adding messages to a list.
interrupt
def interrupt(data: Dict[str, Any]) -> Any
Interrupt execution and wait for human input.
StateGraph Objects
class StateGraph()
Enhanced StateGraph with LangGraph-inspired features and SpoonOS integration.
Features:
- TypedDict state management with reducers
- LLM Manager integration
- Error handling and recovery
- Human-in-the-loop patterns
- Checkpointing and persistence
- Multi-agent coordination support
__init__
def __init__(state_schema: type, checkpointer: InMemoryCheckpointer = None)
Initialize the enhanced state graph.
Arguments:
state_schema - TypedDict class defining the state structure
checkpointer - Optional checkpointer for state persistence
add_node
def add_node(name: str, action: Callable) -> "StateGraph"
Add a node to the graph.
Arguments:
name - Unique identifier for the node
action - Function or coroutine that processes the state; it should accept a state dict and return a dict of updates or a Command
Returns:
Self for method chaining
Raises:
GraphConfigurationError - If node name already exists or is invalid
add_llm_node
def add_llm_node(
name: str,
system_prompt: str,
provider: Optional[str] = None,
model_params: Optional[Dict[str, Any]] = None) -> "StateGraph"
Add an LLM-powered node to the graph.
Arguments:
name - Unique identifier for the node
system_prompt - System prompt for the LLM
provider - Specific LLM provider to use
model_params - Parameters for the LLM call
Returns:
Self for method chaining
add_edge
def add_edge(start_node: str, end_node: str) -> "StateGraph"
Add a direct, unconditional edge between two nodes.
Arguments:
start_node - Name of the source node (or "START")
end_node - Name of the destination node (or "END")
Returns:
Self for method chaining
Raises:
GraphConfigurationError - If nodes don't exist or edge is invalid
add_conditional_edges
def add_conditional_edges(start_node: str,
condition: Callable[[Dict[str, Any]], str],
path_map: Dict[str, str]) -> "StateGraph"
Add conditional edges that route to different nodes based on state.
Arguments:
start_node - Name of the source node
condition - Function that takes state and returns a key from path_map
path_map - Mapping from condition results to destination node names
Returns:
Self for method chaining
Raises:
GraphConfigurationError - If configuration is invalid
set_entry_point
def set_entry_point(node_name: str) -> "StateGraph"
Set the starting node for graph execution.
Arguments:
node_name - Name of the node to start execution from
Returns:
Self for method chaining
Raises:
GraphConfigurationError - If entry point node doesn't exist
compile
def compile() -> "CompiledGraph"
Compile the graph into an executable form.
Returns:
CompiledGraph instance ready for execution
Raises:
GraphConfigurationError - If graph configuration is invalid
CompiledGraph Objects
class CompiledGraph()
Executable version of a StateGraph with advanced features.
__init__
def __init__(graph: StateGraph)
Initialize with a compiled StateGraph.
invoke
async def invoke(initial_state: Optional[Dict[str, Any]] = None,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]
Execute the graph from the entry point.
stream
async def stream(initial_state: Optional[Dict[str, Any]] = None,
config: Optional[Dict[str, Any]] = None,
stream_mode: str = "values")
Stream graph execution with different modes.
get_execution_history
def get_execution_history() -> List[Dict[str, Any]]
Get the execution history for debugging and analysis.
get_state
def get_state(config: Dict[str, Any]) -> Optional[StateSnapshot]
Get the current state snapshot for a thread.
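A short end-to-end sketch of the StateGraph/CompiledGraph API documented above, using only the documented methods. The import path and the shape of the example state are assumptions.

```python
# Sketch: build, compile, and run a minimal one-node graph.
import asyncio
from typing import TypedDict

from spoon_ai.graph import StateGraph  # import path assumed

class MyState(TypedDict):
    question: str
    answer: str

def answer_node(state: dict) -> dict:
    # Nodes return a dict of state updates (or a Command).
    return {"answer": f"You asked: {state['question']}"}

graph = (
    StateGraph(MyState)
    .add_node("answer", answer_node)
    .add_edge("answer", "END")       # "END" is accepted per add_edge docs
    .set_entry_point("answer")
)
compiled = graph.compile()

result = asyncio.run(compiled.invoke({"question": "What is SpoonOS?"}))
print(result["answer"])
```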
Module spoon_ai.llm.vlm_provider.gemini
GeminiConfig Objects
class GeminiConfig(LLMConfig)
Gemini Configuration
validate_api_key
@model_validator(mode='after')
def validate_api_key()
Validate that API key is provided
GeminiProvider Objects
@LLMFactory.register("gemini")
class GeminiProvider(LLMBase)
Gemini Provider Implementation
__init__
def __init__(config_path: str = "config/config.toml",
config_name: str = "chitchat")
Initialize Gemini Provider
Arguments:
config_path - Configuration file path
config_name - Configuration name
Raises:
ValueError - If GEMINI_API_KEY environment variable is not set
chat
async def chat(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
response_modalities: Optional[List[str]] = None,
**kwargs) -> LLMResponse
Send chat request to Gemini and get response
Arguments:
messages - List of messages
system_msgs - List of system messages
response_modalities - List of response modalities (optional, e.g. ['Text', 'Image'])
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
completion
async def completion(prompt: str, **kwargs) -> LLMResponse
Send text completion request to Gemini and get response
Arguments:
prompt - Prompt text
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
chat_with_tools
async def chat_with_tools(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
tools: Optional[List[Dict]] = None,
tool_choice: Literal["none", "auto",
"required"] = "auto",
**kwargs) -> LLMResponse
Send chat request to Gemini that may contain tool calls and get response
Note: this provider does not currently support Gemini tool calls; the method falls back to the regular chat method.
Arguments:
messages - List of messages
system_msgs - List of system messages
tools - List of tools (not supported by Gemini)
tool_choice - Tool choice mode (not supported by Gemini)
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
generate_content
async def generate_content(model: Optional[str] = None,
contents: Union[str, List, types.Content,
types.Part] = None,
config: Optional[
types.GenerateContentConfig] = None,
**kwargs) -> LLMResponse
Directly call Gemini's generate_content interface
Arguments:
model - Model name (optional, overrides the model in the configuration)
contents - Request content; can be a string, list, or types.Content/types.Part object
config - Generation configuration
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
Module spoon_ai.llm.vlm_provider.base
LLMConfig Objects
class LLMConfig(BaseModel)
Base class for LLM configuration
LLMResponse Objects
class LLMResponse(BaseModel)
Base class for LLM response
text
Original text response
LLMBase Objects
class LLMBase(ABC)
Base abstract class for LLM, defining interfaces that all LLM providers must implement
__init__
def __init__(config_path: str = "config/config.toml",
config_name: str = "llm")
Initialize LLM interface
Arguments:
config_path - Configuration file path
config_name - Configuration name
chat
@abstractmethod
async def chat(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
**kwargs) -> LLMResponse
Send chat request to LLM and get response
Arguments:
messages - List of messages
system_msgs - List of system messages
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
completion
@abstractmethod
async def completion(prompt: str, **kwargs) -> LLMResponse
Send text completion request to LLM and get response
Arguments:
prompt - Prompt text
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
chat_with_tools
@abstractmethod
async def chat_with_tools(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
tools: Optional[List[Dict]] = None,
tool_choice: Literal["none", "auto",
"required"] = "auto",
**kwargs) -> LLMResponse
Send chat request that may contain tool calls to LLM and get response
Arguments:
messages - List of messages
system_msgs - List of system messages
tools - List of tools
tool_choice - Tool selection mode
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
generate_image
async def generate_image(prompt: str, **kwargs) -> Union[str, List[str]]
Generate image (optional implementation)
Arguments:
prompt - Prompt text
**kwargs - Other parameters
Returns:
Union[str, List[str]]: Image URL or list of URLs
reset_output_handler
def reset_output_handler()
Reset output handler
Module spoon_ai.llm.factory
LLMFactory Objects
class LLMFactory()
LLM factory class used to create instances of different LLM providers
register
@classmethod
def register(cls, provider_name: str)
Register LLM provider
Arguments:
provider_name - Provider name
Returns:
Decorator function
create
@classmethod
def create(cls,
provider: Optional[str] = None,
config_path: str = "config/config.toml",
config_name: str = "llm") -> LLMBase
Create LLM instance
Arguments:
provider - Provider name; if None, read from configuration file
config_path - Configuration file path
config_name - Configuration name
Returns:
LLMBase - LLM instance
Raises:
ValueError - If provider does not exist
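A sketch of the documented create/chat flow. The Message import location is assumed from the table of contents, and Message's constructor fields are an assumption; the .text attribute is documented on LLMResponse in spoon_ai.llm.vlm_provider.base.

```python
# Sketch: create a registered provider via the factory and send a chat request.
import asyncio

from spoon_ai.llm.factory import LLMFactory
from spoon_ai.schema import Message  # import path and fields assumed

async def main():
    # With provider=None the provider name is read from the configuration file.
    llm = LLMFactory.create(provider="gemini",
                            config_path="config/config.toml",
                            config_name="chitchat")
    resp = await llm.chat(messages=[Message(role="user", content="Hello")])
    print(resp.text)  # original text response, per the LLMResponse docs

asyncio.run(main())
```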
Module spoon_ai.llm.monitoring
Comprehensive monitoring, debugging, and metrics collection for LLM operations.
RequestMetrics Objects
@dataclass
class RequestMetrics()
Metrics for a single LLM request.
ProviderStats Objects
@dataclass
class ProviderStats()
Aggregated statistics for a provider.
get
def get(key: str, default=None)
Get attribute value with default fallback for dictionary-like access.
Arguments:
key - Attribute name
default - Default value if attribute doesn't exist
Returns:
Attribute value or default
success_rate
@property
def success_rate() -> float
Calculate success rate as a percentage.
avg_response_time
@property
def avg_response_time() -> float
Get average response time.
DebugLogger Objects
class DebugLogger()
Comprehensive logging and debugging system for LLM operations.
__init__
def __init__(max_history: int = 1000, enable_detailed_logging: bool = True)
Initialize debug logger.
Arguments:
max_history - Maximum number of requests to keep in history
enable_detailed_logging - Whether to enable detailed request/response logging
log_request
def log_request(provider: str, method: str, params: Dict[str, Any]) -> str
Log request with unique ID.
Arguments:
provider - Provider name
method - Method being called (chat, completion, etc.)
params - Request parameters
Returns:
str - Unique request ID
log_response
def log_response(request_id: str, response: LLMResponse,
duration: float) -> None
Log response with timing information.
Arguments:
request_id - Request ID from log_request
response - LLM response object
duration - Request duration in seconds
log_error
def log_error(request_id: str, error: Exception, context: Dict[str,
Any]) -> None
Log error with context.
Arguments:
request_id - Request ID from log_request
error - Exception that occurred
context - Additional error context
log_fallback
def log_fallback(from_provider: str, to_provider: str, reason: str) -> None
Log provider fallback event.
Arguments:
from_provider - Provider that failed
to_provider - Provider being used as fallback
reason - Reason for fallback
get_request_history
def get_request_history(provider: Optional[str] = None,
limit: Optional[int] = None) -> List[RequestMetrics]
Get request history.
Arguments:
provider - Filter by provider (optional)
limit - Maximum number of requests to return (optional)
Returns:
List[RequestMetrics] - List of request metrics
get_active_requests
def get_active_requests() -> List[RequestMetrics]
Get currently active requests.
Returns:
List[RequestMetrics] - List of active request metrics
clear_history
def clear_history() -> None
Clear request history.
MetricsCollector Objects
class MetricsCollector()
Collects and aggregates performance metrics for LLM providers.
__init__
def __init__(window_size: int = 3600)
Initialize metrics collector.
Arguments:
window_size - Time window in seconds for rolling metrics
record_request
def record_request(provider: str,
method: str,
duration: float,
success: bool,
tokens: int = 0,
model: str = '',
error: Optional[str] = None) -> None
Record request metrics.
Arguments:
provider - Provider name
method - Method called
duration - Request duration in seconds
success - Whether request was successful
tokens - Number of tokens used
model - Model name
error - Error message if failed
get_provider_stats
def get_provider_stats(provider: str) -> Optional[ProviderStats]
Get statistics for a specific provider.
Arguments:
provider - Provider name
Returns:
Optional[ProviderStats] - Provider statistics or None if not found
get_all_stats
def get_all_stats() -> Dict[str, ProviderStats]
Get statistics for all providers.
Returns:
Dict[str, ProviderStats]: Dictionary of provider statistics
get_rolling_metrics
def get_rolling_metrics(provider: Optional[str] = None,
method: Optional[str] = None) -> List[Dict[str, Any]]
Get rolling metrics with optional filtering.
Arguments:
provider - Filter by provider (optional)
method - Filter by method (optional)
Returns:
List[Dict[str, Any]]: List of metrics
get_summary
def get_summary() -> Dict[str, Any]
Get overall summary statistics.
Returns:
Dict[str, Any]: Summary statistics
reset_stats
def reset_stats(provider: Optional[str] = None) -> None
Reset statistics.
Arguments:
provider - Reset specific provider only (optional)
get_debug_logger
def get_debug_logger() -> DebugLogger
Get global debug logger instance.
Returns:
DebugLogger - Global debug logger
get_metrics_collector
def get_metrics_collector() -> MetricsCollector
Get global metrics collector instance.
Returns:
MetricsCollector - Global metrics collector
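A sketch of the monitoring helpers documented above, using only documented signatures; the provider/model values are illustrative.

```python
# Sketch: record a request metric and read back aggregated provider stats.
from spoon_ai.llm.monitoring import get_debug_logger, get_metrics_collector

collector = get_metrics_collector()
collector.record_request(provider="openai", method="chat",
                         duration=0.42, success=True,
                         tokens=128, model="gpt-4o")

stats = collector.get_provider_stats("openai")
if stats is not None:
    print(stats.success_rate)       # success rate as a percentage
    print(stats.avg_response_time)  # average response time

logger = get_debug_logger()
request_id = logger.log_request("openai", "chat", {"temperature": 0.2})
print(request_id)  # unique ID to pass to log_response/log_error
```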
Module spoon_ai.llm.response_normalizer
Response normalizer for ensuring consistent response formats across providers.
ResponseNormalizer Objects
class ResponseNormalizer()
Normalizes responses from different providers to ensure consistency.
normalize_response
def normalize_response(response: LLMResponse) -> LLMResponse
Normalize a response from any provider.
Arguments:
response - Raw LLM response
Returns:
LLMResponse - Normalized response
Raises:
ValidationError - If response cannot be normalized
validate_response
def validate_response(response: LLMResponse) -> bool
Validate that a response meets minimum requirements.
Arguments:
response - Response to validate
Returns:
bool - True if response is valid
Raises:
ValidationError - If response is invalid
add_provider_mapping
def add_provider_mapping(provider_name: str, normalizer_func) -> None
Add a custom normalizer for a new provider.
Arguments:
provider_name - Name of the provider
normalizer_func - Function that takes and returns LLMResponse
get_supported_providers
def get_supported_providers() -> List[str]
Get list of providers with custom normalizers.
Returns:
List[str] - List of provider names
get_response_normalizer
def get_response_normalizer() -> ResponseNormalizer
Get global response normalizer instance.
Returns:
ResponseNormalizer - Global normalizer instance
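A sketch of registering a custom normalizer with the documented API; the provider name "myprovider" and the pass-through function are hypothetical.

```python
# Sketch: plug a custom per-provider normalizer into the global normalizer.
from spoon_ai.llm.response_normalizer import get_response_normalizer

def normalize_myprovider(response):
    # Takes and returns an LLMResponse, per add_provider_mapping's contract.
    return response

normalizer = get_response_normalizer()
normalizer.add_provider_mapping("myprovider", normalize_myprovider)
print(normalizer.get_supported_providers())  # includes "myprovider"
```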
Module spoon_ai.llm.cache
LLM Response Caching - Cache LLM responses to avoid redundant API calls.
LLMResponseCache Objects
class LLMResponseCache()
Cache for LLM responses to avoid redundant API calls.
__init__
def __init__(default_ttl: int = 3600, max_size: int = 1000)
Initialize the cache.
Arguments:
default_ttl - Default time-to-live in seconds (default: 1 hour)
max_size - Maximum number of cached entries (default: 1000)
get
def get(messages: List[Message],
provider: Optional[str] = None,
**kwargs) -> Optional[LLMResponse]
Get cached response if available.
Arguments:
messages - List of conversation messages
provider - Provider name (optional)
**kwargs - Additional parameters
Returns:
Optional[LLMResponse] - Cached response if found and not expired, None otherwise
set
def set(messages: List[Message],
response: LLMResponse,
provider: Optional[str] = None,
ttl: Optional[int] = None,
**kwargs) -> None
Store response in cache.
Arguments:
messages - List of conversation messages
response - LLM response to cache
provider - Provider name (optional)
ttl - Time-to-live in seconds (optional, uses default if not provided)
**kwargs - Additional parameters
clear
def clear() -> None
Clear all cached entries.
get_stats
def get_stats() -> Dict[str, Any]
Get cache statistics.
Returns:
Dict[str, Any]: Cache statistics including size, max_size, etc.
CachedLLMManager Objects
class CachedLLMManager()
Wrapper around LLMManager that adds response caching.
__init__
def __init__(llm_manager: LLMManager,
cache: Optional[LLMResponseCache] = None)
Initialize cached LLM manager.
Arguments:
llm_manager - The underlying LLMManager instance
cache - Optional cache instance (creates new one if not provided)
chat
async def chat(messages: List[Message],
provider: Optional[str] = None,
use_cache: bool = True,
cache_ttl: Optional[int] = None,
**kwargs) -> LLMResponse
Send chat request with caching support.
Arguments:
messages - List of conversation messages
provider - Specific provider to use (optional)
use_cache - Whether to use cache (default: True)
cache_ttl - Custom TTL for this request (optional)
**kwargs - Additional parameters
Returns:
LLMResponse - LLM response (from cache or API)
chat_stream
async def chat_stream(messages: List[Message],
provider: Optional[str] = None,
callbacks: Optional[List] = None,
**kwargs)
Send streaming chat request (caching not supported for streaming).
Arguments:
messages - List of conversation messages
provider - Specific provider to use (optional)
callbacks - Optional callback handlers
**kwargs - Additional parameters
Yields:
LLMResponseChunk - Streaming response chunks
clear_cache
def clear_cache() -> None
Clear the response cache.
get_cache_stats
def get_cache_stats() -> Dict[str, Any]
Get cache statistics.
Returns:
Dict[str, Any]: Cache statistics
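A sketch of wrapping the global manager with the documented cache. The Message import location and constructor fields are assumptions from the table of contents.

```python
# Sketch: cache identical chat requests to avoid redundant API calls.
import asyncio

from spoon_ai.llm.cache import CachedLLMManager, LLMResponseCache
from spoon_ai.llm.manager import get_llm_manager
from spoon_ai.schema import Message  # import path and fields assumed

async def main():
    cached = CachedLLMManager(
        get_llm_manager(),
        cache=LLMResponseCache(default_ttl=600, max_size=500),
    )
    msgs = [Message(role="user", content="Summarize x402 in one line.")]
    await cached.chat(msgs)              # first call hits the API
    await cached.chat(msgs)              # second call is served from cache
    print(cached.get_cache_stats())      # size, max_size, etc.

asyncio.run(main())
```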
Module spoon_ai.llm.interface
LLM Provider Interface - Abstract base class defining the unified interface for all LLM providers.
ProviderCapability Objects
class ProviderCapability(Enum)
Enumeration of capabilities that LLM providers can support.
ProviderMetadata Objects
@dataclass
class ProviderMetadata()
Metadata describing a provider's capabilities and limits.
LLMResponse Objects
@dataclass
class LLMResponse()
Enhanced LLM response with comprehensive metadata and debugging information.
LLMProviderInterface Objects
class LLMProviderInterface(ABC)
Abstract base class defining the unified interface for all LLM providers.
initialize
@abstractmethod
async def initialize(config: Dict[str, Any]) -> None
Initialize the provider with configuration.
Arguments:
config - Provider-specific configuration dictionary
Raises:
ConfigurationError - If configuration is invalid
chat
@abstractmethod
async def chat(messages: List[Message], **kwargs) -> LLMResponse
Send chat request to the provider.
Arguments:
messages - List of conversation messages
**kwargs - Additional provider-specific parameters
Returns:
LLMResponse - Standardized response object
Raises:
ProviderError - If the request fails
chat_stream
@abstractmethod
async def chat_stream(messages: List[Message],
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs) -> AsyncIterator[LLMResponseChunk]
Send streaming chat request to the provider with callback support.
Arguments:
messages - List of conversation messages
callbacks - Optional list of callback handlers for real-time events
**kwargs - Additional provider-specific parameters
Yields:
LLMResponseChunk - Structured streaming response chunks
Raises:
ProviderError - If the request fails
completion
@abstractmethod
async def completion(prompt: str, **kwargs) -> LLMResponse
Send completion request to the provider.
Arguments:
prompt - Text prompt for completion
**kwargs - Additional provider-specific parameters
Returns:
LLMResponse - Standardized response object
Raises:
ProviderError - If the request fails
chat_with_tools
@abstractmethod
async def chat_with_tools(messages: List[Message], tools: List[Dict],
**kwargs) -> LLMResponse
Send chat request with tool support.
Arguments:
messages - List of conversation messages
tools - List of available tools
**kwargs - Additional provider-specific parameters
Returns:
LLMResponse - Standardized response object with potential tool calls
Raises:
ProviderError - If the request fails
get_metadata
@abstractmethod
def get_metadata() -> ProviderMetadata
Get provider metadata and capabilities.
Returns:
ProviderMetadata - Provider information and capabilities
health_check
@abstractmethod
async def health_check() -> bool
Check if provider is healthy and available.
Returns:
bool - True if provider is healthy, False otherwise
cleanup
@abstractmethod
async def cleanup() -> None
Cleanup resources and connections.
This method should be called when the provider is no longer needed.
Module spoon_ai.llm.providers.deepseek_provider
DeepSeek Provider implementation for the unified LLM interface. DeepSeek provides access to their models through an OpenAI-compatible API.
DeepSeekProvider Objects
@register_provider("deepseek", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.TOOLS,
ProviderCapability.STREAMING
])
class DeepSeekProvider(OpenAICompatibleProvider)
DeepSeek provider implementation using OpenAI-compatible API.
get_metadata
def get_metadata() -> ProviderMetadata
Get DeepSeek provider metadata.
Module spoon_ai.llm.providers.gemini_provider
Gemini Provider implementation for the unified LLM interface.
GeminiProvider Objects
@register_provider("gemini", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.STREAMING,
ProviderCapability.TOOLS,
ProviderCapability.IMAGE_GENERATION,
ProviderCapability.VISION
])
class GeminiProvider(LLMProviderInterface)
Gemini provider implementation.
initialize
async def initialize(config: Dict[str, Any]) -> None
Initialize the Gemini provider with configuration.
chat
async def chat(messages: List[Message], **kwargs) -> LLMResponse
Send chat request to Gemini.
chat_stream
async def chat_stream(messages: List[Message],
callbacks: Optional[List] = None,
**kwargs) -> AsyncIterator[LLMResponseChunk]
Send streaming chat request to Gemini with callback support.
Yields:
LLMResponseChunk - Structured streaming response chunks
completion
async def completion(prompt: str, **kwargs) -> LLMResponse
Send completion request to Gemini.
chat_with_tools
async def chat_with_tools(messages: List[Message], tools: List[Dict],
**kwargs) -> LLMResponse
Send chat request with tools to Gemini using native function calling.
get_metadata
def get_metadata() -> ProviderMetadata
Get Gemini provider metadata.
health_check
async def health_check() -> bool
Check if Gemini provider is healthy.
cleanup
async def cleanup() -> None
Cleanup Gemini provider resources.
Module spoon_ai.llm.providers.openai_compatible_provider
OpenAI Compatible Provider base class for providers that use OpenAI-compatible APIs. This includes OpenAI, OpenRouter, DeepSeek, and other providers with similar interfaces.
OpenAICompatibleProvider Objects
class OpenAICompatibleProvider(LLMProviderInterface)
Base class for OpenAI-compatible providers.
get_provider_name
def get_provider_name() -> str
Get the provider name. Should be overridden by subclasses.
get_default_base_url
def get_default_base_url() -> str
Get the default base URL. Should be overridden by subclasses.
get_default_model
def get_default_model() -> str
Get the default model. Should be overridden by subclasses.
get_additional_headers
def get_additional_headers(config: Dict[str, Any]) -> Dict[str, str]
Get additional headers for the provider. Can be overridden by subclasses.
initialize
async def initialize(config: Dict[str, Any]) -> None
Initialize the provider with configuration.
chat
async def chat(messages: List[Message], **kwargs) -> LLMResponse
Send chat request to the provider.
chat_stream
async def chat_stream(messages: List[Message],
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs) -> AsyncIterator[LLMResponseChunk]
Send streaming chat request with full callback support.
Yields:
LLMResponseChunk - Structured streaming response chunks
completion
async def completion(prompt: str, **kwargs) -> LLMResponse
Send completion request to the provider.
chat_with_tools
async def chat_with_tools(messages: List[Message], tools: List[Dict],
**kwargs) -> LLMResponse
Send chat request with tools to the provider.
get_metadata
def get_metadata() -> ProviderMetadata
Get provider metadata. Should be overridden by subclasses.
health_check
async def health_check() -> bool
Check if provider is healthy.
cleanup
async def cleanup() -> None
Cleanup provider resources.
Module spoon_ai.llm.providers.ollama_provider
Ollama Provider implementation for the unified LLM interface.
Ollama runs locally and exposes an HTTP API (default: http://localhost:11434). This provider supports chat, completion, and streaming.
Notes:
- Ollama does not require an API key; the configuration layer may still provide a placeholder api_key value for consistency.
- Tool calling is not implemented here.
OllamaProvider Objects
@register_provider(
"ollama",
[
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.STREAMING,
],
)
class OllamaProvider(LLMProviderInterface)
Local Ollama provider via HTTP.
Module spoon_ai.llm.providers.openai_provider
OpenAI Provider implementation for the unified LLM interface.
OpenAIProvider Objects
@register_provider("openai", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.TOOLS,
ProviderCapability.STREAMING
])
class OpenAIProvider(OpenAICompatibleProvider)
OpenAI provider implementation.
get_metadata
def get_metadata() -> ProviderMetadata
Get OpenAI provider metadata.
Module spoon_ai.llm.providers.anthropic_provider
Anthropic Provider implementation for the unified LLM interface.
AnthropicProvider Objects
@register_provider("anthropic", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.TOOLS,
ProviderCapability.STREAMING
])
class AnthropicProvider(LLMProviderInterface)
Anthropic provider implementation.
initialize
async def initialize(config: Dict[str, Any]) -> None
Initialize the Anthropic provider with configuration.
get_cache_metrics
def get_cache_metrics() -> Dict[str, int]
Get current cache performance metrics.
chat
async def chat(messages: List[Message], **kwargs) -> LLMResponse
Send chat request to Anthropic.
chat_stream
async def chat_stream(messages: List[Message],
callbacks: Optional[List] = None,
**kwargs) -> AsyncIterator[LLMResponseChunk]
Send streaming chat request to Anthropic with callback support.
Yields:
LLMResponseChunk - Structured streaming response chunks
completion
async def completion(prompt: str, **kwargs) -> LLMResponse
Send completion request to Anthropic.
chat_with_tools
async def chat_with_tools(messages: List[Message], tools: List[Dict],
**kwargs) -> LLMResponse
Send chat request with tools to Anthropic.
get_metadata
def get_metadata() -> ProviderMetadata
Get Anthropic provider metadata.
health_check
async def health_check() -> bool
Check if Anthropic provider is healthy.
cleanup
async def cleanup() -> None
Cleanup Anthropic provider resources.
Module spoon_ai.llm.providers.openrouter_provider
OpenRouter Provider implementation for the unified LLM interface. OpenRouter provides access to multiple LLM models through a unified API compatible with OpenAI.
OpenRouterProvider Objects
@register_provider("openrouter", [
ProviderCapability.CHAT,
ProviderCapability.COMPLETION,
ProviderCapability.TOOLS,
ProviderCapability.STREAMING
])
class OpenRouterProvider(OpenAICompatibleProvider)
OpenRouter provider implementation using OpenAI-compatible API.
get_additional_headers
def get_additional_headers(config: Dict[str, Any]) -> Dict[str, str]
Get OpenRouter-specific headers.
get_metadata
def get_metadata() -> ProviderMetadata
Get OpenRouter provider metadata.
Module spoon_ai.llm.providers
LLM Provider implementations.
Module spoon_ai.llm.manager
LLM Manager - Central orchestrator for managing providers, fallback, and load balancing.
ProviderState Objects
@dataclass
class ProviderState()
Track provider initialization and health state.
can_retry_initialization
def can_retry_initialization() -> bool
Check if provider initialization can be retried.
record_initialization_failure
def record_initialization_failure(error: Exception) -> None
Record initialization failure with exponential backoff.
record_initialization_success
def record_initialization_success() -> None
Record successful initialization.
FallbackStrategy Objects
class FallbackStrategy()
Handles fallback logic between providers.
execute_with_fallback
async def execute_with_fallback(providers: List[str], operation, *args,
**kwargs) -> LLMResponse
Execute operation with fallback chain.
Arguments:
providers - List of provider names in fallback order
operation - Async operation to execute
*args, **kwargs - Arguments for the operation
Returns:
LLMResponse - Response from successful provider
Raises:
ProviderError - If all providers fail
LoadBalancer Objects
class LoadBalancer()
Handles load balancing between multiple provider instances.
select_provider
def select_provider(providers: List[str],
strategy: str = "round_robin") -> str
Select a provider based on load balancing strategy.
Arguments:
providers - List of available providers
strategy - Load balancing strategy ('round_robin', 'weighted', 'random')
Returns:
str - Selected provider name
update_provider_health
def update_provider_health(provider: str, is_healthy: bool) -> None
Update provider health status.
set_provider_weight
def set_provider_weight(provider: str, weight: float) -> None
Set provider weight for weighted load balancing.
LLMManager Objects
class LLMManager()
Central orchestrator for LLM providers with fallback and load balancing.
__init__
def __init__(config_manager: Optional[ConfigurationManager] = None,
debug_logger: Optional[DebugLogger] = None,
metrics_collector: Optional[MetricsCollector] = None,
response_normalizer: Optional[ResponseNormalizer] = None,
registry: Optional[LLMProviderRegistry] = None)
Initialize LLM Manager with enhanced provider state tracking.
cleanup
async def cleanup() -> None
Enhanced cleanup with proper resource management.
get_provider_status
def get_provider_status() -> Dict[str, Dict[str, Any]]
Get detailed status of all providers.
reset_provider
async def reset_provider(provider_name: str) -> bool
Reset a provider's state and force reinitialization.
Arguments:
provider_name - Name of provider to reset
Returns:
bool - True if reset successful
chat
async def chat(messages: List[Message],
provider: Optional[str] = None,
**kwargs) -> LLMResponse
Send chat request with automatic provider selection and fallback.
Arguments:
messages - List of conversation messages
provider - Specific provider to use (optional)
**kwargs - Additional parameters
Returns:
LLMResponse - Normalized response
chat_stream
async def chat_stream(messages: List[Message],
provider: Optional[str] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs) -> AsyncGenerator[LLMResponseChunk, None]
Send streaming chat request with callback support.
Arguments:
messages - List of conversation messages
provider - Specific provider to use (optional)
callbacks - Optional callback handlers for monitoring
**kwargs - Additional parameters
Yields:
LLMResponseChunk - Structured streaming response chunks
completion
async def completion(prompt: str,
provider: Optional[str] = None,
**kwargs) -> LLMResponse
Send completion request.
Arguments:
prompt - Text prompt
provider - Specific provider to use (optional)
**kwargs - Additional parameters
Returns:
LLMResponse - Normalized response
chat_with_tools
async def chat_with_tools(messages: List[Message],
tools: List[Dict],
provider: Optional[str] = None,
**kwargs) -> LLMResponse
Send tool-enabled chat request.
Arguments:
messages - List of conversation messages
tools - List of available tools
provider - Specific provider to use (optional)
**kwargs - Additional parameters
Returns:
LLMResponse - Normalized response
set_fallback_chain
def set_fallback_chain(providers: List[str]) -> None
Set fallback provider chain.
Arguments:
providers - List of provider names in fallback order
enable_load_balancing
def enable_load_balancing(strategy: str = "round_robin") -> None
Enable load balancing with specified strategy.
Arguments:
strategy - Load balancing strategy ('round_robin', 'weighted', 'random')
disable_load_balancing
def disable_load_balancing() -> None
Disable load balancing.
health_check_all
async def health_check_all() -> Dict[str, bool]
Check health of all registered providers.
Returns:
Dict[str, bool]: Provider health status
get_stats
def get_stats() -> Dict[str, Any]
Get comprehensive statistics.
Returns:
Dict[str, Any]: Manager and provider statistics
get_llm_manager
def get_llm_manager() -> LLMManager
Get global LLM manager instance.
Returns:
LLMManager - Global manager instance
set_llm_manager
def set_llm_manager(manager: LLMManager) -> None
Set global LLM manager instance.
Arguments:
manager - Manager instance to set as global
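A sketch of the manager-level fallback and load-balancing flow using only the documented LLMManager methods. The Message import location and fields are assumptions, and the provider names are illustrative.

```python
# Sketch: configure fallback and load balancing, then issue a chat request.
import asyncio

from spoon_ai.llm.manager import get_llm_manager
from spoon_ai.schema import Message  # import path and fields assumed

async def main():
    manager = get_llm_manager()
    manager.set_fallback_chain(["openai", "anthropic", "deepseek"])
    manager.enable_load_balancing(strategy="round_robin")

    resp = await manager.chat([Message(role="user", content="ping")])
    print(resp)

    print(await manager.health_check_all())  # per-provider health status
    await manager.cleanup()                  # release provider resources

asyncio.run(main())
```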
Module spoon_ai.llm.registry
LLM Provider Registry for dynamic provider registration and discovery.
LLMProviderRegistry Objects
class LLMProviderRegistry()
Registry for managing LLM provider classes and instances.
register
def register(name: str, provider_class: Type[LLMProviderInterface]) -> None
Register a provider class.
Arguments:
name - Unique provider name
provider_class - Provider class implementing LLMProviderInterface
Raises:
ConfigurationError - If provider name already exists or class is invalid
get_provider
def get_provider(
name: str,
config: Optional[Dict[str, Any]] = None) -> LLMProviderInterface
Get or create provider instance.
Arguments:
name - Provider name
config - Provider configuration (optional if already configured)
Returns:
LLMProviderInterface - Provider instance
Raises:
ConfigurationError - If provider not found or configuration invalid
list_providers
def list_providers() -> List[str]
List all registered provider names.
Returns:
List[str] - List of provider names
get_capabilities
def get_capabilities(name: str) -> List[ProviderCapability]
Get provider capabilities.
Arguments:
name - Provider name
Returns:
List[ProviderCapability] - List of supported capabilities
Raises:
ConfigurationError - If provider not found
is_registered
def is_registered(name: str) -> bool
Check if a provider is registered.
Arguments:
name - Provider name
Returns:
bool - True if provider is registered
unregister
def unregister(name: str) -> None
Unregister a provider.
Arguments:
name - Provider name
clear
def clear() -> None
Clear all registered providers and instances.
register_provider
def register_provider(name: str,
capabilities: Optional[List[ProviderCapability]] = None)
Decorator for automatic provider registration.
Arguments:
name - Provider name
capabilities - List of supported capabilities (optional)
Returns:
Decorator function
get_global_registry
def get_global_registry() -> LLMProviderRegistry
Get the global provider registry instance.
Returns:
LLMProviderRegistry - Global registry instance
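A sketch of decorator-based registration, mirroring the pattern shown for the built-in providers above. The EchoProvider class is a hypothetical skeleton: a real provider must implement all of LLMProviderInterface's abstract methods, which are elided here for brevity.

```python
# Sketch: register a hypothetical provider and query the global registry.
from spoon_ai.llm.interface import LLMProviderInterface, ProviderCapability
from spoon_ai.llm.registry import get_global_registry, register_provider

@register_provider("echo", [ProviderCapability.CHAT])
class EchoProvider(LLMProviderInterface):
    # Abstract methods (initialize, chat, chat_stream, completion,
    # chat_with_tools, get_metadata, health_check, cleanup) omitted here.
    ...

registry = get_global_registry()
print(registry.is_registered("echo"))     # True
print(registry.get_capabilities("echo"))  # [ProviderCapability.CHAT]
```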
Module spoon_ai.llm.config
Configuration management for LLM providers using environment variables.
ProviderConfig Objects
@dataclass
class ProviderConfig()
Configuration for a specific LLM provider.
__post_init__
def __post_init__()
Validate configuration after initialization.
model_dump
def model_dump() -> Dict[str, Any]
Convert the configuration to a dictionary.
Returns:
Dict[str, Any]: Configuration as dictionary
ConfigurationManager Objects
class ConfigurationManager()
Manages environment-driven configuration for LLM providers.
__init__
def __init__() -> None
Initialize configuration manager and load environment variables.
load_provider_config
def load_provider_config(provider_name: str) -> ProviderConfig
Load and validate provider configuration.
Arguments:
provider_name - Name of the provider
Returns:
ProviderConfig - Validated provider configuration
Raises:
ConfigurationError - If configuration is invalid or missing
validate_config
def validate_config(config: ProviderConfig) -> bool
Validate provider configuration.
Arguments:
config - Provider configuration to validate
Returns:
bool - True if configuration is valid
Raises:
ConfigurationError - If configuration is invalid
get_default_provider
def get_default_provider() -> str
Get default provider from configuration with intelligent selection.
Returns:
str - Default provider name
get_fallback_chain
def get_fallback_chain() -> List[str]
Get fallback chain from configuration.
Returns:
List[str] - List of provider names in fallback order
list_configured_providers
def list_configured_providers() -> List[str]
List all configured providers.
Returns:
List[str] - List of provider names that have configuration
get_available_providers_by_priority
def get_available_providers_by_priority() -> List[str]
Get available providers ordered by priority and quality.
Returns:
List[str] - List of available provider names in priority order
get_provider_info
def get_provider_info() -> Dict[str, Dict[str, Any]]
Get information about all providers and their availability.
Returns:
Dict[str, Dict[str, Any]]: Provider information including availability
reload_config
def reload_config() -> None
Reload configuration from file.
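A sketch of the environment-driven configuration flow using only the methods documented above; the provider name "openai" is illustrative.

```python
# Sketch: inspect environment-driven provider configuration.
from spoon_ai.llm.config import ConfigurationManager

cfg = ConfigurationManager()
print(cfg.get_default_provider())        # intelligently selected default
print(cfg.get_fallback_chain())          # provider names in fallback order
print(cfg.list_configured_providers())   # providers with configuration

provider_cfg = cfg.load_provider_config("openai")
print(provider_cfg.model_dump())         # configuration as a plain dict
```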
Module spoon_ai.llm
Unified LLM infrastructure package.
This package provides a unified interface for working with different LLM providers, including comprehensive configuration management, monitoring, and error handling.
Module spoon_ai.llm.errors
Standardized error hierarchy for LLM operations.
LLMError Objects
class LLMError(Exception)
Base exception for all LLM-related errors.
ProviderError Objects
class ProviderError(LLMError)
Provider-specific error with detailed context.
ConfigurationError Objects
class ConfigurationError(LLMError)
Configuration validation or loading error.
RateLimitError Objects
class RateLimitError(ProviderError)
Rate limit exceeded error.
AuthenticationError Objects
class AuthenticationError(ProviderError)
Authentication failed error.
ModelNotFoundError Objects
class ModelNotFoundError(ProviderError)
Model not found or not available error.
TokenLimitError Objects
class TokenLimitError(ProviderError)
Token limit exceeded error.
NetworkError Objects
class NetworkError(ProviderError)
Network connectivity or timeout error.
ProviderUnavailableError Objects
class ProviderUnavailableError(ProviderError)
Provider service unavailable error.
ValidationError Objects
class ValidationError(LLMError)
Input validation error.
Module spoon_ai.llm.base
LLMBase Objects
class LLMBase(ABC)
Base abstract class for LLM, defining interfaces that all LLM providers must implement
__init__
def __init__(config_path: str = "config/config.toml",
config_name: str = "llm")
Initialize LLM interface
Arguments:
config_path - Configuration file path
config_name - Configuration name
chat
@abstractmethod
async def chat(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
**kwargs) -> LLMResponse
Send chat request to LLM and get response
Arguments:
messages - List of messages
system_msgs - List of system messages
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
completion
@abstractmethod
async def completion(prompt: str, **kwargs) -> LLMResponse
Send text completion request to LLM and get response
Arguments:
prompt - Prompt text
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
chat_with_tools
@abstractmethod
async def chat_with_tools(messages: List[Message],
system_msgs: Optional[List[Message]] = None,
tools: Optional[List[Dict]] = None,
tool_choice: Literal["none", "auto",
"required"] = "auto",
**kwargs) -> LLMResponse
Send chat request that may contain tool calls to LLM and get response
Arguments:
messages - List of messages
system_msgs - List of system messages
tools - List of tools
tool_choice - Tool selection mode
**kwargs - Other parameters
Returns:
LLMResponse - LLM response
generate_image
async def generate_image(prompt: str, **kwargs) -> Union[str, List[str]]
Generate image (optional implementation)
Arguments:
prompt - Prompt text
**kwargs - Other parameters
Returns:
Union[str, List[str]]: Image URL or list of URLs
reset_output_handler
def reset_output_handler()
Reset output handler
Module spoon_ai.utils.utils
Module spoon_ai.utils.config_manager
ConfigManager Objects
class ConfigManager()
Environment-based configuration helper for core usage.
__init__
def __init__() -> None
Initialize manager with environment-backed cache.
refresh
def refresh() -> None
Reload configuration snapshot from environment variables.
get
def get(key: str, default: Any = None) -> Any
Get configuration item from environment snapshot.
set
def set(key: str, value: Any) -> None
Set configuration item by exporting to environment variables.
list_config
def list_config() -> Dict[str, Any]
List configuration snapshot without persisting secrets.
get_api_key
def get_api_key(provider: str) -> Optional[str]
Get API key for specified provider with environment priority.
set_api_key
def set_api_key(provider: str, api_key: str) -> None
Set API key by exporting to environment variables.
get_model_name
def get_model_name() -> Optional[str]
Get model name override from environment.
get_base_url
def get_base_url() -> Optional[str]
Get base URL override from environment.
get_llm_provider
def get_llm_provider() -> Optional[str]
Determine LLM provider from environment variables.
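A sketch of the environment-backed ConfigManager documented above; the API key value is a placeholder.

```python
# Sketch: read and write configuration through the environment snapshot.
from spoon_ai.utils.config_manager import ConfigManager

cm = ConfigManager()
cm.set_api_key("openai", "sk-...")   # exported to environment variables
print(cm.get_api_key("openai"))      # environment takes priority
print(cm.get_llm_provider())         # provider inferred from environment
cm.refresh()                         # reload snapshot from the environment
```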
Module spoon_ai.utils.config
Module spoon_ai.utils
Module spoon_ai.utils.streaming
StreamOutcome Objects
@dataclass
class StreamOutcome()
Accumulator for streaming output state.
Module spoon_ai.runnables.events
StreamEventBuilder Objects
class StreamEventBuilder()
chain_start
@staticmethod
def chain_start(run_id: UUID, name: str, inputs: Any,
**kwargs: Any) -> StreamEvent
Build chain start event.
chain_stream
@staticmethod
def chain_stream(run_id: UUID, name: str, chunk: Any,
**kwargs: Any) -> StreamEvent
Build chain stream event.
chain_end
@staticmethod
def chain_end(run_id: UUID, name: str, output: Any,
**kwargs: Any) -> StreamEvent
Build chain end event.
chain_error
@staticmethod
def chain_error(run_id: UUID, name: str, error: Exception,
**kwargs: Any) -> StreamEvent
Build chain error event.
llm_stream
@staticmethod
def llm_stream(run_id: UUID,
name: str,
token: str,
chunk: Optional[Any] = None,
**kwargs: Any) -> StreamEvent
Build LLM stream event.
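A sketch of building events with the documented StreamEventBuilder helpers; the run name and payloads are illustrative.

```python
# Sketch: construct start/stream/end events for a single run.
from uuid import uuid4

from spoon_ai.runnables.events import StreamEventBuilder

run_id = uuid4()
start = StreamEventBuilder.chain_start(run_id, "my_chain", inputs={"q": "hi"})
token = StreamEventBuilder.llm_stream(run_id, "my_chain", token="Hel")
end = StreamEventBuilder.chain_end(run_id, "my_chain", output={"answer": "Hello"})
```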
Module spoon_ai.runnables
Runnable interface and utilities for composable AI components.
This module provides the foundational Runnable interface that all Spoon AI components implement, enabling streaming, composition, and standardized execution.
Module spoon_ai.runnables.base
log_patches_from_events
async def log_patches_from_events(
event_iter: AsyncIterator[Dict[str, Any]],
*,
diff: bool = True) -> AsyncIterator[RunLogPatch]
Convert a stream of events into run log patches.
Runnable Objects
class Runnable(ABC, Generic[Input, Output])
astream_log
async def astream_log(input: Input,
config: Optional[RunnableConfig] = None,
*,
diff: bool = True) -> AsyncIterator[RunLogPatch]
Asynchronously stream structured log patches derived from execution events.
astream_events
async def astream_events(
input: Input,
config: Optional[RunnableConfig] = None
) -> AsyncIterator[Dict[str, Any]]
Asynchronously stream structured execution events.
Module spoon_ai.payments.server
create_paywalled_router
def create_paywalled_router(
service: Optional[X402PaymentService] = None,
agent_factory: AgentFactory = _default_agent_factory,
payment_message: str = "Payment required to invoke this agent."
) -> APIRouter
Build a FastAPI router that protects agent invocations behind an x402 paywall.
Arguments:
service - Optional pre-configured payment service.
agent_factory - Coroutine that returns an initialized agent given its name.
payment_message - Message displayed when payment is required.
Returns:
APIRouter - Router with an /invoke/{agent_name} endpoint ready to mount.
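A sketch of mounting the paywalled router on a FastAPI app, relying on the factory's documented defaults; the behavioral comment reflects the x402 flow described in this package, not a verified response shape.

```python
# Sketch: expose agents behind the x402 paywall.
from fastapi import FastAPI

from spoon_ai.payments.server import create_paywalled_router

app = FastAPI()
app.include_router(create_paywalled_router(
    payment_message="Payment required to invoke this agent.",
))
# The mounted /invoke/{agent_name} endpoint demands payment until a valid
# X-PAYMENT header is presented, per the x402 protocol.
```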
Module spoon_ai.payments.cli
Module spoon_ai.payments.facilitator_client
X402FacilitatorClient Objects
class X402FacilitatorClient()
Thin wrapper over the upstream facilitator client with async header hooks.
Module spoon_ai.payments.exceptions
X402PaymentError Objects
class X402PaymentError(Exception)
Base exception for x402 payment operations.
X402ConfigurationError Objects
class X402ConfigurationError(X402PaymentError)
Raised when integration configuration is invalid or incomplete.
X402VerificationError Objects
class X402VerificationError(X402PaymentError)
Raised when a payment header fails verification against the facilitator.
X402SettlementError Objects
class X402SettlementError(X402PaymentError)
Raised when settlement fails or returns an error response.
Module spoon_ai.payments.x402_service
X402PaymentService Objects
class X402PaymentService()
High level service that aligns the x402 SDK with SpoonOS conventions.
discover_resources
async def discover_resources(
*,
resource_type: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None) -> ListDiscoveryResourcesResponse
Query the facilitator discovery endpoint for registered paywalled resources.
render_paywall_html
def render_paywall_html(error: str,
request: Optional[X402PaymentRequest] = None,
headers: Optional[Dict[str, Any]] = None) -> str
Render the embedded paywall HTML with payment requirements.
build_payment_header
def build_payment_header(requirements: PaymentRequirements,
*,
max_value: Optional[int] = None) -> str
Create a signed X-PAYMENT header for outbound requests.
decode_payment_response
def decode_payment_response(header_value: str) -> X402PaymentReceipt
Decode an X-PAYMENT-RESPONSE header into a structured receipt.
Module spoon_ai.payments.config
X402ConfigurationError Objects
class X402ConfigurationError(Exception)
Raised when required x402 configuration is missing or invalid.
X402PaywallBranding Objects
class X402PaywallBranding(BaseModel)
Optional branding customisations for the embedded paywall template.
X402ClientConfig Objects
class X402ClientConfig(BaseModel)
Holds client-side signing configuration used for outbound payments.
X402Settings Objects
class X402Settings(BaseModel)
Resolved configuration view for x402 payments inside SpoonOS.
amount_in_atomic_units
@property
def amount_in_atomic_units() -> str
Return the configured maximum amount encoded as atomic units (string).
build_asset_extra
def build_asset_extra() -> Dict[str, Any]
Construct the extra payload for the payment requirements.
load
@classmethod
def load(cls,
config_manager: Optional[ConfigManager] = None) -> "X402Settings"
Load settings from config.json with .env fallbacks.
Module spoon_ai.payments.app
Module spoon_ai.payments
Payment utilities for integrating the SpoonOS core with the x402 payments protocol.
This package wraps the upstream x402 Python SDK with configuration and service
abstractions that align to SpoonOS conventions (config.json priority, .env overrides,
and async-friendly helper utilities).
Module spoon_ai.payments.models
X402PaymentRequest Objects
class X402PaymentRequest(BaseModel)
Describes a payment requirement that should be issued for a resource.
X402VerifyResult Objects
class X402VerifyResult(BaseModel)
Captures the facilitator verification response.
X402SettleResult Objects
class X402SettleResult(BaseModel)
Captures settlement details.
X402PaymentOutcome Objects
class X402PaymentOutcome(BaseModel)
Aggregates verification and settlement outcomes.
X402PaymentReceipt Objects
class X402PaymentReceipt(BaseModel)
Decoded representation of the X-PAYMENT-RESPONSE header.
Module spoon_ai.chat
ShortTermMemoryConfig Objects
class ShortTermMemoryConfig(BaseModel)
Configuration for short-term memory management.
enabled
Enable automatic short-term memory management.
max_tokens
Maximum token count before triggering trimming/summarization.
strategy
Strategy to use when exceeding max_tokens: 'summarize' or 'trim'.
messages_to_keep
Number of recent messages to keep when summarizing.
trim_strategy
Trimming strategy when using 'trim' mode.
keep_system_messages
Always keep system messages during trimming.
auto_checkpoint
Automatically save checkpoints before trimming/summarization.
checkpoint_thread_id
Thread ID for checkpoint management.
summary_model
Model to use for summarization (defaults to ChatBot's model).
ChatBot Objects
class ChatBot()
__init__
def __init__(use_llm_manager: bool = True,
model_name: str = None,
llm_provider: str = None,
api_key: str = None,
base_url: str = None,
enable_short_term_memory: bool = True,
short_term_memory_config: Optional[Union[Dict[
str, Any], ShortTermMemoryConfig]] = None,
token_counter: Optional[MessageTokenCounter] = None,
enable_long_term_memory: bool = False,
mem0_config: Optional[Dict[str, Any]] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs)
Initialize ChatBot with hierarchical configuration priority system.
Configuration Priority System:
- Full manual override (highest priority) - all params provided
- Partial override with config fallback - llm_provider provided, credentials pulled from environment (or config files if explicitly enabled)
- Full environment-based loading - only use_llm_manager=True, reads from environment variables
Arguments:
use_llm_manager - Enable LLM manager architecture (default: True)
model_name - Model name override
llm_provider - Provider name override
api_key - API key override
base_url - Base URL override
enable_short_term_memory - Enable short-term memory management (default: True)
short_term_memory_config - Configuration dict or ShortTermMemoryConfig instance
token_counter - Optional custom token counter instance
enable_long_term_memory - Enable Mem0-backed long-term memory retrieval/storage
mem0_config - Configuration dict for Mem0 (api_key, user_id/agent_id, collection, etc.)
callbacks - Optional list of callback handlers for monitoring
**kwargs - Additional parameters
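A sketch of constructing and querying ChatBot under the documented configuration-priority system; the provider and memory settings are illustrative, and the config keys come from the ShortTermMemoryConfig fields documented above.

```python
# Sketch: partial override (provider named, credentials from environment).
import asyncio

from spoon_ai.chat import ChatBot

async def main():
    bot = ChatBot(
        use_llm_manager=True,
        llm_provider="openai",   # credentials pulled from environment
        enable_short_term_memory=True,
        short_term_memory_config={"max_tokens": 4000, "strategy": "trim"},
    )
    answer = await bot.ask(
        [{"role": "user", "content": "Hello!"}],
        system_msg="You are concise.",
    )
    print(answer)

asyncio.run(main())
```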
update_mem0_config
def update_mem0_config(config: Optional[Dict[str, Any]] = None,
enable: Optional[bool] = None) -> None
Update Mem0 configuration and re-initialize the client if needed.
ask
async def ask(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
output_queue: Optional[asyncio.Queue] = None) -> str
Ask method using the LLM manager architecture.
Automatically applies short-term memory strategy if enabled.
ask_tool
async def ask_tool(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
tools: Optional[List[dict]] = None,
tool_choice: Optional[str] = None,
output_queue: Optional[asyncio.Queue] = None,
**kwargs) -> LLMResponse
Ask tool method using the LLM manager architecture.
Automatically applies short-term memory strategy if enabled.
trim_messages
async def trim_messages(messages: List[Message],
max_tokens: int,
strategy: TrimStrategy = TrimStrategy.FROM_END,
keep_system: bool = True,
model: Optional[str] = None) -> List[Message]
Trim messages to stay within the token budget.
Arguments:
- messages - List of messages to trim
- max_tokens - Maximum token count to retain
- strategy - Trimming strategy (from_start or from_end)
- keep_system - Whether to always keep the leading system message
- model - Model name for token counting
Returns:
- List[Message] - Trimmed messages list
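Example call, assuming bot is a ChatBot and history is a List[Message] (a sketch; the budget is illustrative):
from spoon_ai.memory.short_term_manager import TrimStrategy
trimmed = await bot.trim_messages(
    history,
    max_tokens=2000,
    strategy=TrimStrategy.FROM_END,   # FROM_END removes newest first, FROM_START oldest first
    keep_system=True,
)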
remove_message
def remove_message(message_id: str, **kwargs: Any) -> "RemoveMessage"
Construct a removal instruction for the message with the given ID.
remove_all_messages
def remove_all_messages() -> "RemoveMessage"
Construct a removal instruction that clears the entire history.
summarize_messages
async def summarize_messages(
messages: List[Message],
max_tokens_before_summary: int,
messages_to_keep: int = 5,
summary_model: Optional[str] = None,
existing_summary: str = ""
) -> Tuple[List[Message], List[RemoveMessage], Optional[str]]
Summarize earlier messages and emit removal directives.
Returns a tuple (messages_for_llm, removals, summary_text) where
messages_for_llm are the messages that should be sent to the language
model for the next turn, removals contains RemoveMessage
directives that should be applied to the stored history, and
summary_text is the newly generated summary (if any).
Arguments:
- messages - List of messages to process
- max_tokens_before_summary - Token threshold for triggering summary
- messages_to_keep - Number of recent messages to keep uncompressed
- summary_model - Model to use for summarization
- existing_summary - Previously stored summary text
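Example usage, assuming bot is a ChatBot and history is a List[Message] (a sketch; the threshold is illustrative):
messages_for_llm, removals, summary = await bot.summarize_messages(
    history,
    max_tokens_before_summary=3000,
    messages_to_keep=5,
    existing_summary=bot.latest_summary or "",
)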
latest_summary
@property
def latest_summary() -> Optional[str]
Return the most recent summary generated by short-term memory.
latest_removals
@property
def latest_removals() -> List[RemoveMessage]
Return the most recent removal directives emitted by summarization.
save_checkpoint
def save_checkpoint(thread_id: str,
messages: List[Message],
metadata: Optional[dict] = None) -> str
Save current message state to checkpoint.
Arguments:
- thread_id - Thread identifier
- messages - Messages to save
- metadata - Optional metadata to store
Returns:
- str - Checkpoint ID
restore_checkpoint
def restore_checkpoint(
thread_id: str,
checkpoint_id: Optional[str] = None) -> Optional[List[Message]]
Restore messages from checkpoint.
Arguments:
- thread_id - Thread identifier
- checkpoint_id - Optional specific checkpoint ID
Returns:
- Optional[List[Message]] - Restored messages, or None if checkpoint not found
list_checkpoints
def list_checkpoints(thread_id: str) -> List[dict]
List all checkpoints for a thread.
Arguments:
- thread_id - Thread identifier
Returns:
- List[dict] - List of checkpoint metadata
clear_checkpoints
def clear_checkpoints(thread_id: str) -> None
Clear all checkpoints for a thread.
Arguments:
- thread_id - Thread identifier
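A checkpoint round trip using the four methods above (a sketch; bot and history are assumed to exist):
cp_id = bot.save_checkpoint("thread-1", history, metadata={"stage": "draft"})
restored = bot.restore_checkpoint("thread-1", cp_id)   # None if the checkpoint is missing
for checkpoint in bot.list_checkpoints("thread-1"):
    print(checkpoint)
bot.clear_checkpoints("thread-1")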
astream
async def astream(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs: Any) -> AsyncIterator[LLMResponseChunk]
Stream LLM responses chunk by chunk.
astream_events
async def astream_events(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
**kwargs) -> AsyncIterator[dict]
Stream structured events during LLM execution.
This method yields detailed events tracking the execution flow, useful for monitoring and debugging.
Arguments:
- messages - List of messages or dicts
- system_msg - Optional system message
- callbacks - Optional callback handlers
- **kwargs - Additional provider parameters
Yields:
Event dictionaries with the structure:
{
"event": <event type>,
"run_id": <str>,
"timestamp": <ISO datetime string>,
"data": {<event-specific data>}
}
astream_log
async def astream_log(messages: List[Union[dict, Message]],
system_msg: Optional[str] = None,
callbacks: Optional[List[BaseCallbackHandler]] = None,
*,
diff: bool = True,
**kwargs: Any) -> AsyncIterator[RunLogPatch]
Stream run log patches describing ChatBot execution.
Module spoon_ai.identity.did_models
DID Data Models for SpoonOS Agents. Follows the W3C DID Core specification and the ERC-8004 standard.
VerificationMethodType Objects
class VerificationMethodType(str, Enum)
Supported verification method types
ServiceType Objects
class ServiceType(str, Enum)
Agent service endpoint types
VerificationMethod Objects
class VerificationMethod(BaseModel)
Cryptographic verification method for DID authentication
ServiceEndpoint Objects
class ServiceEndpoint(BaseModel)
Service endpoint for agent interaction
ReputationScore Objects
class ReputationScore(BaseModel)
Aggregated reputation score
Attestation Objects
class Attestation(BaseModel)
Verifiable attestation about an agent
AgentCard Objects
class AgentCard(BaseModel)
Agent Card following Google's A2A protocol. Provides human-readable agent information.
AgentDID Objects
class AgentDID(BaseModel)
Complete W3C DID Document for SpoonOS Agent
to_did_document
def to_did_document() -> Dict[str, Any]
Export as standard W3C DID Document
to_agent_card
def to_agent_card() -> Dict[str, Any]
Export agent card separately
DIDResolutionResult Objects
class DIDResolutionResult(BaseModel)
Result of DID resolution
Module spoon_ai.identity.attestation
Attestation and Trust Score Management. Handles verifiable credentials and reputation calculations.
AttestationManager Objects
class AttestationManager()
Manages verifiable attestations for agents
create_attestation
def create_attestation(issuer_did: str,
subject_did: str,
claim: Dict,
evidence: Optional[str] = None) -> Attestation
Create a verifiable attestation
Arguments:
- issuer_did - DID of the attestation issuer
- subject_did - DID of the agent being attested
- claim - Attestation claim data
- evidence - Optional supporting evidence
Returns:
Signed Attestation object
verify_attestation
def verify_attestation(attestation: Attestation) -> bool
Verify attestation signature
submit_reputation_on_chain
def submit_reputation_on_chain(subject_did: str, score: int,
evidence: str) -> str
Submit reputation score to on-chain registry
Arguments:
- subject_did - DID of agent being rated
- score - Score between -100 and 100
- evidence - Evidence for the score
Returns:
Transaction hash
submit_validation_on_chain
def submit_validation_on_chain(subject_did: str, is_valid: bool,
reason: str) -> str
Submit validation for an agent
Arguments:
- subject_did - DID of agent being validated
- is_valid - Whether agent is valid
- reason - Reason for validation decision
Returns:
Transaction hash
TrustScoreCalculator Objects
class TrustScoreCalculator()
Calculates trust scores for agents
calculate_trust_score
def calculate_trust_score(did: str) -> Dict
Calculate comprehensive trust score
Returns:
Dict with trust score components:
- reputation_score: -100 to 100
- validation_status: bool
- trust_level: "high" | "medium" | "low" | "untrusted"
- confidence: 0 to 1
get_reputation_breakdown
def get_reputation_breakdown(did: str, limit: int = 10) -> List[Dict]
Get detailed reputation submissions
get_validation_breakdown
def get_validation_breakdown(did: str, limit: int = 10) -> List[Dict]
Get detailed validation submissions
Module spoon_ai.identity.storage_client
Storage clients for DID documents and agent cards. Supports NeoFS (primary) and IPFS (backup replication).
DIDStorageClient Objects
class DIDStorageClient()
Unified storage client for DID documents. NeoFS primary with IPFS replication.
publish_did_document
def publish_did_document(agent_id: str, did_document: Dict,
agent_card: Dict) -> Tuple[str, str]
Publish DID document and agent card to storage. Returns (didDocURI, agentCardURI).
fetch_did_document
def fetch_did_document(uri: str) -> Dict
Fetch DID document from URI (NeoFS or IPFS)
publish_credential
def publish_credential(agent_id: str, credential: Dict) -> str
Publish verifiable credential
close
def close()
Close HTTP clients
Module spoon_ai.identity.did_resolver
DID Resolver for SpoonOS Agents. Implements unified DID resolution with a NeoFS-first policy.
DIDResolver Objects
class DIDResolver()
Unified DID resolver for SpoonOS agents. Resolution flow: on-chain anchor → NeoFS (primary) → IPFS (fallback).
resolve
def resolve(did: str) -> DIDResolutionResult
Resolve DID to complete DID document
Arguments:
- did - DID string (did:spoon:agent:<identifier>)
Returns:
DIDResolutionResult with document and metadata
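A resolution sketch; the DIDResolver constructor arguments are not documented here, so default construction and the identifier are assumptions:
resolver = DIDResolver()                                # assumed default construction
result = resolver.resolve("did:spoon:agent:example")    # hypothetical identifier
# result is a DIDResolutionResult carrying the document and resolution metadata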
resolve_metadata_only
def resolve_metadata_only(did: str) -> Dict
Resolve only on-chain metadata (fast path)
verify_did
def verify_did(did: str) -> bool
Verify DID exists and is resolvable
Module spoon_ai.identity.erc8004_abi
Shared ERC-8004 ABI fragments (minimal, artifact-free).
These ABIs cover the common calls used by the Python SDK and demos.
Module spoon_ai.identity
SpoonOS Agent DID Identity Module. Implements ERC-8004 compliant decentralized identity for agents.
Module spoon_ai.identity.erc8004_client
ERC-8004 Smart Contract Client. Handles on-chain interactions with agent registries.
ERC8004Client Objects
class ERC8004Client()
Client for interacting with ERC-8004 agent registries
calculate_did_hash
def calculate_did_hash(did: str) -> bytes
Calculate keccak256 hash of DID string
create_eip712_signature
def create_eip712_signature(did_hash: bytes, agent_card_uri: str,
did_doc_uri: str) -> str
Create EIP-712 signature for agent registration
register_agent
def register_agent(did: str, agent_card_uri: str, did_doc_uri: str) -> str
Register agent on-chain
resolve_agent
def resolve_agent(did: str) -> Dict
Resolve agent metadata from on-chain registry
update_capabilities
def update_capabilities(did: str, capabilities: List[str]) -> str
Update agent capabilities on-chain
build_feedback_auth
def build_feedback_auth(agent_id: Union[int, bytes],
client_address: str,
index_limit: int,
expiry: int,
signer_address: Optional[str] = None,
identity_registry: Optional[str] = None,
chain_id: Optional[int] = None) -> bytes
Build and sign the feedbackAuth payload required by the chaoschain ERC-8004 reputation registry. Returns abi.encode(struct) concatenated with the 65-byte signature.
give_feedback
def give_feedback(did: str,
score: int,
tag1: bytes = b"",
tag2: bytes = b"",
fileuri: str = "",
filehash: bytes = b"\x00" * 32,
index_limit: int = 10,
expiry: Optional[int] = None,
client_address: Optional[str] = None) -> str
Submit feedback using ERC8004 giveFeedback with feedbackAuth.
get_reputation_summary
def get_reputation_summary(did: str,
client_addresses: Optional[List[str]] = None,
tag1: bytes = b"\x00" * 32,
tag2: bytes = b"\x00" * 32) -> Tuple[int, int]
Return (count, averageScore), where averageScore is on a 0-100 scale.
get_reputation
def get_reputation(did: str) -> Tuple[int, int]
Backward compatible: (averageScore, count).
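Note that the two calls return the same pair in opposite orders; a sketch assuming client is an ERC8004Client and did is a registered DID:
count, average = client.get_reputation_summary(did)   # (count, averageScore 0-100)
average, count = client.get_reputation(did)           # legacy ordering: (averageScore, count)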
validation_request
def validation_request(
did: str,
validator: str,
request_uri: str,
request_hash: Optional[bytes] = None) -> Tuple[str, bytes]
Create validation request; returns tx hash and requestHash used.
get_validation_status
def get_validation_status(request_hash: bytes) -> Dict
Get per-request validation status.
submit_validation
def submit_validation(did: str, is_valid: bool, reason: str = "") -> str
Map the boolean onto a 0/100-scale response.
register_agent
def register_agent(token_uri: str,
metadata: Optional[List[Tuple[str, bytes]]] = None) -> int
Register agent on IdentityRegistry; returns agentId.
Module spoon_ai.bridge.eth_neofs_indexer
Ethereum to NeoFS/IPFS Event Indexer. Listens to ERC-8004 registry events and ensures off-chain storage stays synchronized.
EthereumNeoFSIndexer Objects
class EthereumNeoFSIndexer()
Event indexer that syncs Ethereum registry events to NeoFS/IPFS. Ensures content hash verification and storage consistency.
register_event_handler
def register_event_handler(event_name: str, handler: Callable)
Register custom event handler
start_indexing
def start_indexing(block_limit: Optional[int] = None)
Start indexing events
Arguments:
- block_limit - Optional block limit for testing (stops after N blocks)
stop_indexing
def stop_indexing()
Stop the indexer
get_indexer_status
def get_indexer_status() -> Dict
Get current indexer status
Module spoon_ai.bridge
Cross-chain bridge module for DID synchronization: Ethereum ↔ NeoFS/IPFS event indexing.
Module spoon_ai.tools.turnkey_tools
Turnkey Tools - Secure Blockchain Operations
This module provides Turnkey SDK tools for secure blockchain operations including:
- Transaction signing and broadcasting
- Message and EIP-712 signing
- Multi-account management
- Activity audit and monitoring
- Wallet and account operations
TurnkeyBaseTool Objects
class TurnkeyBaseTool(BaseTool)
Base class for Turnkey tools with shared client initialization
client
@property
def client()
Lazy initialization of Turnkey client
SignEVMTransactionTool Objects
class SignEVMTransactionTool(TurnkeyBaseTool)
Sign EVM transaction using Turnkey
execute
async def execute(sign_with: str, unsigned_tx: str, **kwargs) -> str
Sign EVM transaction
SignMessageTool Objects
class SignMessageTool(TurnkeyBaseTool)
Sign arbitrary message using Turnkey
execute
async def execute(sign_with: str,
message: str,
use_keccak256: bool = True,
**kwargs) -> str
Sign message
SignTypedDataTool Objects
class SignTypedDataTool(TurnkeyBaseTool)
Sign EIP-712 structured data using Turnkey
execute
async def execute(sign_with: str, typed_data: dict, **kwargs) -> str
Sign EIP-712 typed data
BroadcastTransactionTool Objects
class BroadcastTransactionTool(TurnkeyBaseTool)
Broadcast signed transaction to blockchain
execute
async def execute(signed_tx: str, rpc_url: str = None, **kwargs) -> str
Broadcast transaction
ListWalletsTool Objects
class ListWalletsTool(TurnkeyBaseTool)
List all wallets in the organization
execute
async def execute(**kwargs) -> str
List wallets
ListWalletAccountsTool Objects
class ListWalletAccountsTool(TurnkeyBaseTool)
List accounts for a specific wallet
execute
async def execute(wallet_id: str,
limit: str = None,
before: str = None,
after: str = None,
**kwargs) -> str
List wallet accounts
GetActivityTool Objects
class GetActivityTool(TurnkeyBaseTool)
Get activity details by ID
execute
async def execute(activity_id: str, **kwargs) -> str
Get activity details
ListActivitiesTool Objects
class ListActivitiesTool(TurnkeyBaseTool)
List recent activities in the organization
execute
async def execute(limit: str = "10",
before: str = None,
after: str = None,
filter_by_status: list = None,
filter_by_type: list = None,
**kwargs) -> str
List activities
WhoAmITool Objects
class WhoAmITool(TurnkeyBaseTool)
Get organization information
execute
async def execute(**kwargs) -> str
Get organization info
BuildUnsignedEIP1559TxTool Objects
class BuildUnsignedEIP1559TxTool(BaseTool)
Build unsigned EIP-1559 transaction (supports NeoX)
execute
async def execute(from_addr: str,
to_addr: str = None,
value_wei: str = "0",
data_hex: str = "0x",
priority_gwei: str = "1",
max_fee_gwei: str = None,
gas_limit: str = None,
rpc_url: str = None,
**kwargs) -> str
Build unsigned transaction (auto-detects NeoX)
ListAllAccountsTool Objects
class ListAllAccountsTool(TurnkeyBaseTool)
List all accounts across all wallets in the organization
execute
async def execute(limit: str = "50", **kwargs) -> str
List all accounts across all wallets
BatchSignTransactionsTool Objects
class BatchSignTransactionsTool(TurnkeyBaseTool)
Batch sign transactions for multiple accounts
execute
async def execute(to_address: str,
value_wei: str,
data_hex: str = "0x",
max_accounts: str = "3",
enable_broadcast: bool = False,
rpc_url: str = None,
**kwargs) -> str
Batch sign transactions for multiple accounts
CreateWalletTool Objects
class CreateWalletTool(TurnkeyBaseTool)
Create a new wallet
execute
async def execute(wallet_name: str,
accounts_json: str = None,
mnemonic_length: str = "24",
**kwargs) -> str
Create a new wallet
GetWalletTool Objects
class GetWalletTool(TurnkeyBaseTool)
Get wallet information by wallet ID
execute
async def execute(wallet_id: str, **kwargs) -> str
Get wallet information
CreateWalletAccountsTool Objects
class CreateWalletAccountsTool(TurnkeyBaseTool)
Add accounts to an existing wallet
execute
async def execute(wallet_id: str, accounts_json: str, **kwargs) -> str
Add accounts to existing wallet
CompleteTransactionWorkflowTool Objects
class CompleteTransactionWorkflowTool(TurnkeyBaseTool)
Complete transaction workflow: build, sign, and optionally broadcast
execute
async def execute(sign_with: str,
to_address: str,
value_wei: str,
data_hex: str = "0x",
enable_broadcast: bool = False,
rpc_url: str = None,
**kwargs) -> str
Complete transaction workflow
get_turnkey_tools
def get_turnkey_tools() -> List[BaseTool]
Get all Turnkey tools
Module spoon_ai.tools.tool_manager
ToolManager Objects
class ToolManager()
reindex
def reindex() -> None
Rebuild the internal name->tool mapping. Useful if tools have been renamed dynamically.
Module spoon_ai.tools.neofs_tools
NeoFS Tools for spoon_ai framework
Simple wrappers around NeoFS client methods. Tools do NOT auto-create bearer tokens; the agent manages tokens. All parameters map directly to client method parameters.
get_shared_neofs_client
def get_shared_neofs_client() -> NeoFSClient
Get shared NeoFSClient instance for all NeoFS tools.
Returns the same client instance across all tool calls to ensure bearer token authentication works correctly.
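Because the same instance is returned each time, token state set up by one tool is visible to the rest:
client_a = get_shared_neofs_client()
client_b = get_shared_neofs_client()
assert client_a is client_b   # shared instance, so bearer-token auth carries across tools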
CreateBearerTokenTool Objects
class CreateBearerTokenTool(BaseTool)
Create a bearer token for NeoFS operations
CreateContainerTool Objects
class CreateContainerTool(BaseTool)
Create a NeoFS container
UploadObjectTool Objects
class UploadObjectTool(BaseTool)
Upload object to container
DownloadObjectByIdTool Objects
class DownloadObjectByIdTool(BaseTool)
Download object by ID
GetObjectHeaderByIdTool Objects
class GetObjectHeaderByIdTool(BaseTool)
Get object header by ID
DownloadObjectByAttributeTool Objects
class DownloadObjectByAttributeTool(BaseTool)
Download object by attribute
GetObjectHeaderByAttributeTool Objects
class GetObjectHeaderByAttributeTool(BaseTool)
Get object header by attribute
DeleteObjectTool Objects
class DeleteObjectTool(BaseTool)
Delete an object
SearchObjectsTool Objects
class SearchObjectsTool(BaseTool)
Search objects in container
SetContainerEaclTool Objects
class SetContainerEaclTool(BaseTool)
Set eACL for container
GetContainerEaclTool Objects
class GetContainerEaclTool(BaseTool)
Get eACL for container
ListContainersTool Objects
class ListContainersTool(BaseTool)
List all containers
GetContainerInfoTool Objects
class GetContainerInfoTool(BaseTool)
Get container info
DeleteContainerTool Objects
class DeleteContainerTool(BaseTool)
Delete container
GetNetworkInfoTool Objects
class GetNetworkInfoTool(BaseTool)
Get network info
GetBalanceTool Objects
class GetBalanceTool(BaseTool)
Get balance for an address
Module spoon_ai.tools.rag_tools
Module spoon_ai.tools.x402_payment
X402PaymentHeaderTool Objects
class X402PaymentHeaderTool(BaseTool)
Create a signed X-PAYMENT header for a given resource.
X402PaywalledRequestTool Objects
class X402PaywalledRequestTool(BaseTool)
Fetch a paywalled resource, handling the x402 402 negotiation automatically.
Module spoon_ai.tools
Module spoon_ai.tools.mcp_tool
MCPTool Objects
class MCPTool(BaseTool, MCPClientMixin)
call_mcp_tool
async def call_mcp_tool(tool_name: str, **kwargs)
Override the mixin method to add tool-specific error handling.
list_available_tools
async def list_available_tools() -> list
List available tools from the MCP server.
Module spoon_ai.tools.base
ToolFailure Objects
class ToolFailure(Exception)
Exception to indicate a tool execution failure.
Module spoon_ai.graph.agent
GraphAgent implementation for the graph package.
Memory Objects
class Memory()
Memory implementation with persistent storage
clear
def clear()
Clear all messages and reset memory
add_message
def add_message(msg)
Add a message to memory
get_messages
def get_messages(limit: Optional[int] = None) -> List[Dict[str, Any]]
Get messages from memory
get_recent_messages
def get_recent_messages(hours: int = 24) -> List[Dict[str, Any]]
Get messages from the last N hours
search_messages
def search_messages(query: str, limit: int = 10) -> List[Dict[str, Any]]
Search messages containing the query
get_statistics
def get_statistics() -> Dict[str, Any]
Get memory statistics
set_metadata
def set_metadata(key: str, value: Any)
Set metadata
get_metadata
def get_metadata(key: str, default: Any = None) -> Any
Get metadata
MockMemory Objects
class MockMemory(Memory)
Alias for backward compatibility - now uses persistent memory
GraphAgent Objects
class GraphAgent()
search_memory
def search_memory(query: str, limit: int = 10) -> List[Dict[str, Any]]
Search memory for messages containing the query
get_recent_memory
def get_recent_memory(hours: int = 24) -> List[Dict[str, Any]]
Get recent messages from memory
get_memory_statistics
def get_memory_statistics() -> Dict[str, Any]
Get memory statistics
set_memory_metadata
def set_memory_metadata(key: str, value: Any)
Set memory metadata
get_memory_metadata
def get_memory_metadata(key: str, default: Any = None) -> Any
Get memory metadata
save_session
def save_session()
Manually save current session
load_session
def load_session(session_id: str)
Load a specific session
Module spoon_ai.graph.types
Typed structures for the graph package.
Module spoon_ai.graph.checkpointer
In-memory checkpointer for the graph package.
InMemoryCheckpointer Objects
class InMemoryCheckpointer()
iter_checkpoint_history
def iter_checkpoint_history(
config: Dict[str, Any]) -> Iterable[CheckpointTuple]
Return checkpoint tuples for the specified thread, newest last.
Module spoon_ai.graph.builder
Declarative builders and helpers for SpoonAI graphs.
Intent Objects
@dataclass
class Intent()
Result of intent analysis.
IntentAnalyzer Objects
class IntentAnalyzer()
LLM-powered intent analyzer.
Core stays generic; concrete prompts/parsers are supplied by callers.
AdaptiveStateBuilder Objects
class AdaptiveStateBuilder()
Construct initial graph state using query intent and optional parameters.
ParameterInferenceEngine Objects
class ParameterInferenceEngine()
LLM delegator for parameter extraction.
Core keeps this generic; applications provide formatting/parsing via options.
NodeSpec Objects
@dataclass
class NodeSpec()
Declarative node specification.
EdgeSpec Objects
@dataclass
class EdgeSpec()
Declarative edge specification.
end
target name or callable router
ParallelGroupSpec Objects
@dataclass
class ParallelGroupSpec()
Parallel group specification.
GraphTemplate Objects
@dataclass
class GraphTemplate()
Complete declarative template for a graph.
DeclarativeGraphBuilder Objects
class DeclarativeGraphBuilder()
Build StateGraph instances from declarative templates.
NodePlugin Objects
class NodePlugin()
Pluggable node provider.
NodePluginSystem Objects
class NodePluginSystem()
Registry and discovery for node plugins.
HighLevelGraphAPI Objects
class HighLevelGraphAPI()
Convenience facade for building graphs per query.
Module spoon_ai.graph.mcp_integration
Utility classes for intelligent MCP tool discovery and configuration.
Core graph components no longer hard-code external tools; instead, user code registers tool specifications and optional transport/configuration details via these helpers.
MCPToolSpec Objects
@dataclass
class MCPToolSpec()
Specification describing a desired MCP tool.
MCPConfigManager Objects
class MCPConfigManager()
Centralised configuration loader for MCP tools.
MCPToolDiscoveryEngine Objects
class MCPToolDiscoveryEngine()
Discover MCP tools based on registered intent mappings.
MCPIntegrationManager Objects
class MCPIntegrationManager()
High level coordinator for MCP tool usage within graphs.
Module spoon_ai.graph.exceptions
Graph engine exception definitions (public within graph package).
Module spoon_ai.graph.reducers
Reducers and validators for the graph package.
Module spoon_ai.graph.decorators
Decorators and executor for the graph package.
Module spoon_ai.graph.config
Configuration primitives for the SpoonAI graph engine.
RouterConfig Objects
@dataclass
class RouterConfig()
Controls how the graph chooses the next node after each execution step.
ParallelRetryPolicy Objects
@dataclass
class ParallelRetryPolicy()
Retry policy for individual nodes inside a parallel group.
ParallelGroupConfig Objects
@dataclass
class ParallelGroupConfig()
Controls how a parallel group executes and aggregates results.
quorum
floats in (0, 1] treated as ratio, ints as absolute
error_strategy
fail_fast, collect_errors, ignore_errors
GraphConfig Objects
@dataclass
class GraphConfig()
Top-level configuration applied to an entire graph instance.
Module spoon_ai.graph.engine
Graph engine: StateGraph, CompiledGraph, and interrupt API implementation.
BaseNode Objects
class BaseNode(ABC, Generic[State])
Base class for all graph nodes
__call__
@abstractmethod
async def __call__(state: State,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]
Execute the node logic
RunnableNode Objects
class RunnableNode(BaseNode[State])
Runnable node that wraps a function
__call__
async def __call__(state: State,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]
Execute the wrapped function
ToolNode Objects
class ToolNode(BaseNode[State])
Tool node for executing tools
__call__
async def __call__(state: State,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]
Execute tools based on state
ConditionNode Objects
class ConditionNode(BaseNode[State])
Conditional node for routing decisions
__call__
async def __call__(state: State,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]
Execute condition and return routing decision
interrupt
def interrupt(data: Dict[str, Any]) -> Any
Interrupt execution and wait for human input.
RouteRule Objects
class RouteRule()
Advanced routing rule for automatic path selection
matches
def matches(state: Dict[str, Any], query: str = "") -> bool
Check if this rule matches the current state/query
RunningSummary Objects
@dataclass
class RunningSummary()
Rolling conversation summary used by the summarisation node.
SummarizationNode Objects
class SummarizationNode(BaseNode[Dict[str, Any]])
Node that summarises conversation history before model invocation.
StateGraph Objects
class StateGraph(Generic[State])
add_node
def add_node(
node_name: str, node: Union[BaseNode[State],
Callable[[State], Any]]) -> "StateGraph"
Add a node to the graph
add_edge
def add_edge(
start_node: str,
end_node: str,
condition: Optional[Callable[[State], bool]] = None) -> "StateGraph"
Add an edge. When condition is provided, edge becomes conditional.
add_conditional_edges
def add_conditional_edges(start_node: str, condition: Callable[[State], str],
path_map: Dict[str, str]) -> "StateGraph"
Add conditional edges
set_entry_point
def set_entry_point(node_name: str) -> "StateGraph"
Set the entry point
add_tool_node
def add_tool_node(tools: List[Any], name: str = "tools") -> "StateGraph"
Add a tool node
add_conditional_node
def add_conditional_node(condition_func: Callable[[State], str],
name: str = "condition") -> "StateGraph"
Add a conditional node
add_parallel_group
def add_parallel_group(
group_name: str,
nodes: List[str],
config: Optional[Union[Dict[str, Any], ParallelGroupConfig]] = None
) -> "StateGraph"
Add a parallel execution group
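Example, assuming graph already contains the listed nodes (node names are hypothetical; quorum and error_strategy follow ParallelGroupConfig above):
graph.add_parallel_group(
    "fetchers",
    ["fetch_prices", "fetch_news"],
    config={"quorum": 0.5, "error_strategy": "collect_errors"},
)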
add_routing_rule
def add_routing_rule(source_node: str,
condition: Union[str, Callable[[State, str], bool]],
target_node: str,
priority: int = 0) -> "StateGraph"
Add an intelligent routing rule
get_state
def get_state(
config: Optional[Dict[str, Any]] = None) -> Optional[StateSnapshot]
Fetch the latest (or specified) checkpoint snapshot for a thread.
get_state_history
def get_state_history(
config: Optional[Dict[str, Any]] = None) -> Iterable[StateSnapshot]
Return all checkpoints for the given thread, ordered by creation time.
add_pattern_routing
def add_pattern_routing(source_node: str,
pattern: str,
target_node: str,
priority: int = 0) -> "StateGraph"
Add pattern-based routing rule
set_intelligent_router
def set_intelligent_router(
router_func: Callable[[Dict[str, Any], str], str]) -> "StateGraph"
Set the intelligent router function
set_llm_router
def set_llm_router(router_func: Optional[Callable[[Dict[str, Any], str],
str]] = None,
config: Optional[Dict[str, Any]] = None) -> "StateGraph"
Set the LLM-powered router function
Arguments:
- router_func - Custom LLM router function. If None, uses the default LLM router.
- config - Configuration for the LLM router (model, temperature, max_tokens, etc.)
enable_llm_routing
def enable_llm_routing(
config: Optional[Dict[str, Any]] = None) -> "StateGraph"
Enable LLM-powered natural language routing
This automatically sets up LLM routing for the graph entry point.
compile
def compile(checkpointer: Optional[Any] = None) -> "CompiledGraph"
Compile the graph
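A minimal build-and-compile sketch; the import path and the assumption that the constructor takes the state schema are inferred, not confirmed by this reference:
from spoon_ai.graph import StateGraph   # assumed import path
async def greet(state, config=None):
    return {"reply": f"hello, {state.get('name', 'world')}"}
graph = StateGraph(dict)                # assumes the state schema is passed here
graph.add_node("greet", greet)
graph.set_entry_point("greet")
app = graph.compile()                   # optionally pass a checkpointer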
get_graph
def get_graph() -> Dict[str, Any]
Get graph structure for visualization/debugging
CompiledGraph Objects
class CompiledGraph(Generic[State])
Compiled graph for execution
get_execution_metrics
def get_execution_metrics() -> Dict[str, Any]
Get aggregated execution metrics
Module spoon_ai.neofs.utils
SignatureError Objects
class SignatureError(Exception)
Raised when signature payload construction fails.
sign_bearer_token
def sign_bearer_token(bearer_token: str,
private_key_wif: str,
*,
wallet_connect: bool = True) -> tuple[str, str]
Returns (signature_hex, compressed_pubkey_hex)
- wallet_connect=True: the message uses the WalletConnect format (prefix/length/salt/postfix) and is hashed with SHA-256
- X-Bearer-Signature = <DER signature hex> + <16-byte salt hex>
- X-Bearer-Signature-Key = <compressed public key hex>
- The request URL must have ?walletConnect=true appended
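Putting the pieces together for a request (a sketch; bearer_token and private_key_wif are assumed inputs):
signature_hex, pubkey_hex = sign_bearer_token(bearer_token, private_key_wif, wallet_connect=True)
headers = {
    "X-Bearer-Signature": signature_hex,     # DER signature hex + 16-byte salt hex
    "X-Bearer-Signature-Key": pubkey_hex,    # compressed public key hex
}
# and append ?walletConnect=true to the request URL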
Module spoon_ai.neofs.client
NeoFSClient Objects
class NeoFSClient()
set_container_eacl
def set_container_eacl(container_id: str,
eacl: Eacl,
*,
bearer_token: Optional[str] = None,
wallet_connect: bool = True) -> SuccessResponse
Set container eACL.
Arguments:
- container_id - Container ID
- eacl - eACL object
- bearer_token - Optional Bearer Token (recommended for eACL operations)
- wallet_connect - Whether to use wallet_connect mode (default True)
download_object_by_id
def download_object_by_id(container_id: str,
object_id: str,
*,
bearer_token: Optional[str] = None,
download: bool | None = None,
range_header: str | None = None) -> httpx.Response
Download object by ID. Bearer token is optional for public containers.
get_object_header_by_id
def get_object_header_by_id(container_id: str,
object_id: str,
*,
bearer_token: Optional[str] = None,
range_header: str | None = None) -> httpx.Response
Get object header by ID. Bearer token is optional for public containers.
download_object_by_attribute
def download_object_by_attribute(
container_id: str,
attr_key: str,
attr_val: str,
*,
bearer_token: Optional[str] = None,
download: bool | None = None,
range_header: str | None = None) -> httpx.Response
Download object by attribute. Bearer token is optional for public containers.
get_object_header_by_attribute
def get_object_header_by_attribute(
container_id: str,
attr_key: str,
attr_val: str,
*,
bearer_token: Optional[str] = None,
range_header: str | None = None) -> httpx.Response
Get object header by attribute. Bearer token is optional for public containers.
delete_object
def delete_object(container_id: str,
object_id: str,
*,
bearer_token: Optional[str] = None) -> SuccessResponse
Delete object. Bearer token is optional for public containers, required for eACL containers with DENY DELETE rule.
search_objects
def search_objects(container_id: str,
search_request: SearchRequest,
*,
bearer_token: Optional[str] = None,
cursor: str = "",
limit: int = 100) -> ObjectListV2
Search objects. Bearer token is optional for public containers.
NeoFSException Objects
class NeoFSException(Exception)
Base exception for the NeoFS client.
NeoFSAPIException Objects
class NeoFSAPIException(NeoFSException)
Raised when the API returns an error.
Module spoon_ai.neofs
NeoFS integration for Spoon Core.
Module spoon_ai.neofs.models
Pydantic models describing NeoFS REST API payloads.
NetworkInfo Objects
class NetworkInfo(BaseModel)
Describes network configuration fees reported by the gateway.
Module spoon_ai.agents.toolcall
ToolCallAgent Objects
class ToolCallAgent(ReActAgent)
mcp_tools_cache_ttl
5 minutes TTL
run
async def run(request: Optional[str] = None) -> str
Override run method to handle finish_reason termination specially.
step
async def step() -> str
Override the step method to handle finish_reason termination properly.
Module spoon_ai.agents.react
Module spoon_ai.agents.mcp_client_mixin
MCPClientMixin Objects
class MCPClientMixin()
get_session
@asynccontextmanager
async def get_session()
Get a session with robust resource management and cleanup.
Features:
- Automatic session reuse per task
- Resource limits to prevent exhaustion
- Proper cleanup on cancellation/failure
- Periodic cleanup of stale sessions
list_mcp_tools
async def list_mcp_tools()
Get the list of available tools from the MCP server
call_mcp_tool
async def call_mcp_tool(tool_name: str, **kwargs)
Call a tool on the MCP server
send_mcp_message
async def send_mcp_message(recipient: str,
message: Union[str, Dict[str, Any]],
topic: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None) -> bool
Send a message to the MCP system
Arguments:
- recipient - Recipient ID
- message - Message content (string or dictionary)
- topic - Message topic
- metadata - Additional metadata
Returns:
- bool - Whether the message was sent successfully
cleanup
async def cleanup()
Enhanced cleanup method with comprehensive resource cleanup.
get_session_stats
def get_session_stats() -> Dict[str, Any]
Get session statistics for monitoring.
Module spoon_ai.agents.spoon_react_mcp
SpoonReactMCP Objects
class SpoonReactMCP(SpoonReactAI)
list_mcp_tools
async def list_mcp_tools()
Return MCP tools from available_tools manager
Module spoon_ai.agents.monitor
Module spoon_ai.agents.rag
RetrievalMixin Objects
class RetrievalMixin()
Mixin class for retrieval-augmented generation functionality
initialize_retrieval_client
def initialize_retrieval_client(backend: str = 'chroma', **kwargs)
Initialize the retrieval client if it doesn't exist
add_documents
def add_documents(documents, backend: str = 'chroma', **kwargs)
Add documents to the retrieval system
retrieve_relevant_documents
def retrieve_relevant_documents(query, k=5, backend: str = 'chroma', **kwargs)
Retrieve relevant documents for a query
get_context_from_query
def get_context_from_query(query)
Get context string from relevant documents for a query
Module spoon_ai.agents.custom_agent
CustomAgent Objects
class CustomAgent(ToolCallAgent)
Custom Agent class allowing users to create their own agents and add custom tools
Usage: create a custom agent and add tools:
agent = CustomAgent(name="my_agent", description="My custom agent")
agent.add_tool(MyCustomTool())
result = await agent.run("Use my custom tool")
add_tool
def add_tool(tool: BaseTool) -> None
Add a tool to the agent with validation.
Arguments:
- tool - Tool instance to add
Raises:
- ValueError - If tool is invalid or already exists
add_tools
def add_tools(tools: List[BaseTool]) -> None
Add multiple tools to the agent with atomic operation.
Arguments:
- tools - List of tool instances to add
Raises:
- ValueError - If any tool is invalid
remove_tool
def remove_tool(tool_name: str) -> bool
Remove a tool from the agent.
Arguments:
- tool_name - Name of the tool to remove
Returns:
- bool - True if tool was removed, False if not found
list_tools
def list_tools() -> List[str]
List all available tools in the agent.
Returns:
List of tool names, empty list if no tools
get_tool_info
def get_tool_info() -> Dict[str, Dict[str, Any]]
Get detailed information about all tools.
Returns:
Dictionary with tool names as keys and tool info as values
validate_tools
def validate_tools() -> Dict[str, Any]
Validate all current tools and return validation report.
Returns:
Dictionary with validation results
run
async def run(request: Optional[str] = None) -> str
Run the agent with enhanced tool validation.
Arguments:
- request - User request
Returns:
Processing result
clear
def clear()
Enhanced clear method with proper tool state management.
Module spoon_ai.agents.graph_agent
Graph-based agent implementation for SpoonOS.
This module provides the GraphAgent class that executes StateGraph workflows, integrating the graph execution system with the existing agent architecture.
GraphAgent Objects
class GraphAgent(BaseAgent)
An agent that executes StateGraph workflows.
This agent provides a bridge between the existing SpoonOS agent architecture and the new graph-based execution system. It allows complex, stateful workflows to be defined as graphs and executed with proper state management.
Key Features:
- Executes StateGraph workflows
- Maintains compatibility with existing agent interfaces
- Provides detailed execution logging and error handling
- Supports both sync and async node functions
__init__
def __init__(**kwargs)
Initialize the GraphAgent.
Arguments:
- graph - StateGraph instance to execute
- **kwargs - Additional arguments passed to BaseAgent
Raises:
- ValueError - If no graph is provided
validate_graph
@validator('graph')
def validate_graph(cls, v)
Validate that the provided graph is a StateGraph instance.
run
async def run(request: Optional[str] = None) -> str
Execute the graph workflow.
This method overrides the base run method to invoke the compiled graph instead of the traditional step-based execution loop.
Arguments:
- request - Optional input request to include in initial state
Returns:
String representation of the execution result
Raises:
- RuntimeError - If the agent is not in the IDLE state
- GraphExecutionError - If graph execution fails
step
async def step() -> str
Step method for compatibility with BaseAgent.
Since GraphAgent uses graph execution instead of step-based execution, this method is not used in normal operation but is required by the BaseAgent interface.
Returns:
Status message indicating graph-based execution
get_execution_history
def get_execution_history() -> list
Get the execution history from the last graph run.
Returns:
List of execution steps with metadata
get_execution_metadata
def get_execution_metadata() -> Dict[str, Any]
Get metadata from the last execution.
Returns:
Dictionary containing execution metadata
clear_state
def clear_state()
Clear preserved state and execution history.
update_initial_state
def update_initial_state(updates: Dict[str, Any])
Update the initial state for future executions.
Arguments:
- updates - Dictionary of state updates to merge
set_preserve_state
def set_preserve_state(preserve: bool)
Enable or disable state preservation between runs.
Arguments:
- preserve - Whether to preserve state between runs
Module spoon_ai.agents
Module spoon_ai.agents.spoon_react
create_configured_chatbot
def create_configured_chatbot()
Create a ChatBot instance with intelligent provider selection.
SpoonReactAI Objects
class SpoonReactAI(ToolCallAgent)
__init__
def __init__(**kwargs)
Initialize SpoonReactAI with both ToolCallAgent and MCPClientMixin initialization
initialize
async def initialize(__context: Any = None)
Initialize async components and subscribe to topics
run
async def run(request: Optional[str] = None) -> str
Ensure prompts reflect current tools before running.
Module spoon_ai.agents.base
ThreadSafeOutputQueue Objects
class ThreadSafeOutputQueue()
Thread-safe output queue with fair access and timeout protection
get
async def get(timeout: Optional[float] = 30.0) -> Any
Get item with timeout and fair access
BaseAgent Objects
class BaseAgent(BaseModel, ABC)
Thread-safe base class for all agents with proper concurrency handling.
add_message
async def add_message(role: Literal["user", "assistant", "tool"],
content: str,
tool_call_id: Optional[str] = None,
tool_calls: Optional[List[ToolCall]] = None,
tool_name: Optional[str] = None,
timeout: Optional[float] = None) -> None
Thread-safe message addition with timeout protection
state_context
@asynccontextmanager
async def state_context(new_state: AgentState,
timeout: Optional[float] = None)
Thread-safe state context manager with deadlock prevention. Acquires the state lock only to perform quick transitions, not for the duration of the work inside the context, avoiding long-held locks and false timeouts during network calls.
run
async def run(request: Optional[str] = None,
timeout: Optional[float] = None) -> str
Thread-safe run method with proper concurrency control and callback support.
step
async def step(run_id: Optional[uuid.UUID] = None) -> str
Override this method in subclasses - now with step-level locking and callback support.
is_stuck
async def is_stuck() -> bool
Thread-safe stuck detection
handle_stuck_state
async def handle_stuck_state()
Thread-safe stuck state handling
add_documents
def add_documents(documents) -> None
Store documents on the agent so CLI load-docs works without RAG mixin.
This default implementation keeps the documents in-memory under self._loaded_documents. Agents that support retrieval should override this method to index documents into their vector store.
save_chat_history
def save_chat_history()
Thread-safe chat history saving
stream
async def stream(timeout: Optional[float] = None)
Thread-safe streaming with proper cleanup and timeout
process_mcp_message
async def process_mcp_message(content: Any,
sender: str,
message: Dict[str, Any],
agent_id: str,
timeout: Optional[float] = None)
Thread-safe MCP message processing with timeout protection
shutdown
async def shutdown(timeout: float = 30.0)
Graceful shutdown with cleanup of active operations
get_diagnostics
def get_diagnostics() -> Dict[str, Any]
Get diagnostic information about the agent's state
Module spoon_ai.rag.embeddings
HashEmbeddingClient Objects
class HashEmbeddingClient(EmbeddingClient)
Deterministic offline embedding via hashing.
Produces fixed-length vectors in [0,1] normalized range. Not semantically meaningful but stable for tests and offline demos.
get_embedding_client
def get_embedding_client(
provider: Optional[str],
*,
openai_api_key: Optional[str] = None,
openai_model: str = "text-embedding-3-small") -> EmbeddingClient
Create an embedding client.
Provider selection rules:
- provider is None/"auto": pick the first configured embeddings provider using a dedicated priority order (OpenAI > OpenRouter > Gemini).
- provider is "openai" / "openrouter" / "gemini" / "ollama": force that provider (uses core env config when applicable).
- provider is "openai_compatible": use OpenAI-compatible embeddings via RAG_EMBEDDINGS_* env vars.
- otherwise: deterministic hash embeddings (offline).
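Example selections per the rules above (a sketch; the API key is a placeholder, and passing "hash" simply falls through to the offline branch):
from spoon_ai.rag.embeddings import get_embedding_client
auto_client = get_embedding_client(None)          # first configured provider (OpenAI > OpenRouter > Gemini)
openai_client = get_embedding_client(
    "openai",
    openai_api_key="sk-...",                      # placeholder
    openai_model="text-embedding-3-small",
)
offline_client = get_embedding_client("hash")     # deterministic hash embeddings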
Module spoon_ai.rag.loader
Module spoon_ai.rag.retriever
Module spoon_ai.rag.qa
Module spoon_ai.rag.vectorstores.faiss_store
FaissVectorStore Objects
class FaissVectorStore(VectorStore)
FAISS-backed local vector store (cosine via inner product + L2 norm).
Module spoon_ai.rag.vectorstores.chroma_store
Module spoon_ai.rag.vectorstores.pinecone_store
Module spoon_ai.rag.vectorstores.qdrant_store
Module spoon_ai.rag.vectorstores.registry
get_vector_store
def get_vector_store(backend: Optional[str] = None) -> VectorStore
Return a vector store by backend name.
Backends:
- faiss: local/offline (mapped to in-memory cosine store)
- pinecone: cloud Pinecone (requires PINECONE_API_KEY)
- qdrant: local/cloud Qdrant (requires qdrant-client, default http://localhost:6333)
- chroma: local Chroma (requires chromadb)
Module spoon_ai.rag.vectorstores
Module spoon_ai.rag.vectorstores.base
VectorStore Objects
class VectorStore(ABC)
query
@abstractmethod
def query(
*,
collection: str,
query_embeddings: List[List[float]],
top_k: int = 5,
filter: Optional[Dict] = None) -> List[List[Tuple[str, float, Dict]]]
Return per-query list of (id, score, metadata). Higher score is better.
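Example call, assuming store comes from get_vector_store and embedding is a precomputed vector (a sketch):
results = store.query(
    collection="docs",
    query_embeddings=[embedding],   # one entry per query
    top_k=3,
)
for doc_id, score, metadata in results[0]:   # per-query hits; higher score is better
    print(doc_id, score, metadata)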
Module spoon_ai.rag.config
RagConfig Objects
@dataclass
class RagConfig()
backend
faiss|pinecone|qdrant|chroma
Module spoon_ai.rag
Module spoon_ai.rag.index
Module spoon_ai.prompts.toolcall
Module spoon_ai.prompts
Module spoon_ai.prompts.spoon_react
Module spoon_ai.turnkey.client
Turnkey Objects
class Turnkey()
Turnkey API client class for managing blockchain private keys and wallet operations.
__init__
def __init__(base_url=None,
api_public_key=None,
api_private_key=None,
org_id=None)
Initialize Turnkey client.
Arguments:
- base_url (str) - Turnkey API base URL (defaults from .env or a built-in default).
- api_public_key (str) - Turnkey API public key.
- api_private_key (str) - Turnkey API private key.
- org_id (str) - Turnkey organization ID.
Raises:
- ValueError - If required configuration parameters are missing.
whoami
def whoami()
Call whoami API to get organization information.
Returns:
- dict - JSON response containing organization information.
import_private_key
def import_private_key(user_id,
private_key_name,
encrypted_bundle,
curve="CURVE_SECP256K1",
address_formats=["ADDRESS_FORMAT_ETHEREUM"])
Import private key to Turnkey.
Arguments:
- user_id (str) - User ID.
- private_key_name (str) - Private key name.
- encrypted_bundle (str) - Encrypted private key bundle.
- curve (str) - Elliptic curve type, defaults to CURVE_SECP256K1.
- address_formats (list) - Address format list, defaults to ["ADDRESS_FORMAT_ETHEREUM"].
Returns:
- dict - JSON response containing imported private key information.
sign_evm_transaction
def sign_evm_transaction(sign_with, unsigned_tx)
Sign EVM transaction using Turnkey.
Arguments:
- sign_with (str) - Signing identity (wallet account address / private key address / private key ID).
- unsigned_tx (str) - Raw unsigned transaction (hex string).
Returns:
- dict - JSON response containing the signing result; see signTransactionResult.signedTransaction.
Reference: https://docs.turnkey.com/api-reference/activities/sign-transaction
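Example signing call (a sketch; the address and raw transaction are placeholders, and the exact nesting of the activity response is an assumption based on the reference above):
tk = Turnkey()   # credentials resolved from .env per __init__
resp = tk.sign_evm_transaction(
    sign_with="0xYourAccountAddress",   # placeholder signing identity
    unsigned_tx="02f8...",              # placeholder unsigned tx hex
)
# assumed response path to the signed payload:
signed = resp["activity"]["result"]["signTransactionResult"]["signedTransaction"]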
sign_typed_data
def sign_typed_data(sign_with, typed_data)
Sign EIP-712 structured data.
Arguments:
- sign_with (str) - Signing identity (wallet account address / private key address / private key ID).
- typed_data (dict|str) - EIP-712 structure (domain/types/message) or its JSON string.
Returns:
- dict - Activity response; result contains r/s/v.
Notes:
- encoding uses PAYLOAD_ENCODING_EIP712
- hashFunction uses HASH_FUNCTION_NOT_APPLICABLE (server completes EIP-712 spec hashing)
sign_message
def sign_message(sign_with, message, use_keccak256=True)
Sign arbitrary message (defaults to KECCAK256 following Ethereum convention).
Arguments:
- sign_with (str) - Signing identity (wallet account address / private key address / private key ID).
- message (str|bytes) - Text to be signed; bytes will be decoded as UTF-8.
- use_keccak256 (bool) - Whether to use KECCAK256 as the hash function (default True).
Returns:
- dict - Activity response; result contains r/s/v.
get_activity
def get_activity(activity_id)
Query Activity details.
Arguments:
- activity_id (str) - Activity ID.
Returns:
- dict - Activity details.
Reference: https://docs.turnkey.com/api-reference/queries/get-activity
list_activities
def list_activities(limit=None,
before=None,
after=None,
filter_by_status=None,
filter_by_type=None)
List activities within organization (paginated).
Arguments:
- limit (str|None) - Number per page.
- before (str|None) - Pagination cursor (before).
- after (str|None) - Pagination cursor (after).
- filter_by_status (list|None) - Filter by activity status (e.g., ['ACTIVITY_STATUS_COMPLETED']).
- filter_by_type (list|None) - Filter by activity type (e.g., ['ACTIVITY_TYPE_SIGN_TRANSACTION_V2']).
Returns:
- dict - Activity list.
Reference: https://docs.turnkey.com/api-reference/queries/list-activities
get_policy_evaluations
def get_policy_evaluations(activity_id)
Query policy evaluation results for an Activity (if available).
Arguments:
- activity_id (str) - Activity ID.
Returns:
- dict - Policy evaluation details.
Reference: https://docs.turnkey.com/api-reference/queries/get-policy-evaluations
get_private_key
def get_private_key(private_key_id)
Query information for specified private key.
Arguments:
- private_key_id (str) - Private key ID.
Returns:
- dict - JSON response containing private key information.
create_wallet
def create_wallet(wallet_name, accounts, mnemonic_length=24)
Create new wallet.
Arguments:
- wallet_name (str) - Wallet name.
- accounts (list) - Account configuration list; each account contains curve, pathFormat, path, addressFormat.
- mnemonic_length (int) - Mnemonic length (default 24).
Returns:
- dict - JSON response containing new wallet information.
create_wallet_accounts
def create_wallet_accounts(wallet_id, accounts)
Add accounts to existing wallet.
Arguments:
- wallet_id (str) - Wallet ID.
- accounts (list) - New account configuration list; each account contains curve, pathFormat, path, addressFormat.
Returns:
- dict - JSON response containing new account information.
get_wallet
def get_wallet(wallet_id)
Query information for specified wallet.
Arguments:
- wallet_id (str) - Wallet ID.
Returns:
- dict - JSON response containing wallet information.
get_wallet_account
def get_wallet_account(wallet_id, address=None, path=None)
Query information for specified wallet account.
Arguments:
- wallet_id (str) - Wallet ID.
- address (str, optional) - Account address.
- path (str, optional) - Account path (e.g., m/44'/60'/0'/0/0).
Returns:
- dict - JSON response containing account information.
Raises:
- ValueError - If neither address nor path is provided.
list_wallets
def list_wallets()
List all wallets in the organization.
Returns:
- dict - JSON response containing wallet list.
list_wallet_accounts
def list_wallet_accounts(wallet_id, limit=None, before=None, after=None)
List accounts for the specified wallet (paginated).
Arguments:
- wallet_id (str) - Wallet ID.
- limit (str, optional) - Number of accounts returned per page.
- before (str, optional) - Pagination cursor; returns accounts before this ID.
- after (str, optional) - Pagination cursor; returns accounts after this ID.
Returns:
- dict - JSON response containing account list.
init_import_wallet
def init_import_wallet(user_id)
Initialize the wallet import process and generate an import_bundle.
Arguments:
- user_id (str) - User ID.
Returns:
- dict - JSON response containing import_bundle.
encrypt_wallet
def encrypt_wallet(mnemonic,
user_id,
import_bundle,
encryption_key_name="demo-encryption-key")
Encrypt the mnemonic using the Turnkey CLI and generate an encrypted_bundle.
Arguments:
- mnemonic (str) - Mnemonic phrase (12/15/18/21/24 words).
- user_id (str) - User ID.
- import_bundle (str) - import_bundle obtained from init_import_wallet.
- encryption_key_name (str) - Encryption key name, defaults to demo-encryption-key.
Returns:
- str - The resulting encrypted_bundle.
Raises:
- RuntimeError - If the CLI command fails or the turnkey CLI is not installed.
encrypt_private_key
def encrypt_private_key(private_key,
user_id,
import_bundle,
key_format="hexadecimal",
encryption_key_name="demo-encryption-key")
Encrypt a private key using the Turnkey CLI and generate an encrypted_bundle; equivalent to:
turnkey encrypt --import-bundle-input "./import_bundle.txt" --plaintext-input /dev/fd/3 --key-format "hexadecimal" --encrypted-bundle-output "./encrypted_bundle.txt"
Arguments:
- private_key (str) - Private key string (hexadecimal or Solana format).
- user_id (str) - User ID.
- import_bundle (str) - import_bundle obtained from init_import_private_key.
- key_format (str) - Private key format, defaults to "hexadecimal" (supports "hexadecimal", "solana").
- encryption_key_name (str) - Encryption key name, defaults to "demo-encryption-key".
Returns:
- str - The resulting encrypted_bundle (Base64-encoded string).
Raises:
- ValueError - If private_key, user_id, or import_bundle is empty, or key_format is invalid.
- RuntimeError - If the CLI command fails or the turnkey CLI is not installed.
init_import_private_key
def init_import_private_key(user_id)
Initialize the private key import process and generate an import_bundle.
Arguments:
- user_id (str) - User ID.
Returns:
- dict - JSON response containing import_bundle.
import_wallet
def import_wallet(user_id, wallet_name, encrypted_bundle, accounts=None)
Import wallet to Turnkey.
Arguments:
- user_id (str) - User ID.
- wallet_name (str) - Wallet name.
- encrypted_bundle (str) - Encrypted mnemonic bundle.
- accounts (list, optional) - Account configuration list; each account contains curve, pathFormat, path, addressFormat.
Returns:
- dict - JSON response containing imported wallet information.
Module spoon_ai.turnkey
Turnkey client integration for SpoonAI.
Provides the Turnkey client for secure signing via the Turnkey API.
Module spoon_ai.callbacks.streaming_stdout
StreamingStdOutCallbackHandler Objects
class StreamingStdOutCallbackHandler(BaseCallbackHandler)
Callback handler that streams tokens to standard output.
on_llm_new_token
def on_llm_new_token(token: str, **kwargs: Any) -> None
Print token to stdout immediately.
Arguments:
- token - The new token to print
- **kwargs - Additional context (ignored)
on_llm_end
def on_llm_end(response: Any, **kwargs: Any) -> None
Print newline after LLM completes.
Arguments:
- response - The complete LLM response (ignored)
- **kwargs - Additional context (ignored)
Module spoon_ai.callbacks.statistics
StreamingStatisticsCallback Objects
class StreamingStatisticsCallback(BaseCallbackHandler, LLMManagerMixin)
Collect simple throughput statistics during streaming runs.
By default, the callback prints summary metrics when the LLM finishes.
Consumers can provide a custom print_fn to redirect output, or disable
printing entirely and read the public attributes after execution.
Module spoon_ai.callbacks.stream_event
StreamEventCallbackHandler Objects
class StreamEventCallbackHandler(BaseCallbackHandler)
Translate callback invocations into standardized stream events.
Module spoon_ai.callbacks.manager
CallbackManager Objects
class CallbackManager()
Lightweight dispatcher for callback handlers.
Module spoon_ai.callbacks
Callback system for streaming and event handling in Spoon AI.
This module provides a comprehensive callback system similar to LangChain's callbacks, enabling real-time monitoring and event handling for LLM calls, agent execution, tool invocation, and graph workflows.
Module spoon_ai.callbacks.base
RetrieverManagerMixin Objects
class RetrieverManagerMixin()
Mixin providing retriever callback hooks.
on_retriever_start
def on_retriever_start(run_id: UUID, query: Any, **kwargs: Any) -> Any
Run when a retriever begins execution.
on_retriever_end
def on_retriever_end(run_id: UUID, documents: Any, **kwargs: Any) -> Any
Run when a retriever finishes successfully.
on_retriever_error
def on_retriever_error(error: BaseException, *, run_id: UUID,
**kwargs: Any) -> Any
Run when a retriever raises an error.
LLMManagerMixin Objects
class LLMManagerMixin()
Mixin providing large language model callback hooks.
on_llm_start
def on_llm_start(run_id: UUID, messages: List[Message], **kwargs: Any) -> Any
Run when an LLM or chat model begins execution.
on_llm_new_token
def on_llm_new_token(token: str,
*,
chunk: Optional[LLMResponseChunk] = None,
run_id: Optional[UUID] = None,
**kwargs: Any) -> Any
Run for each streamed token emitted by an LLM.
on_llm_end
def on_llm_end(response: LLMResponse, *, run_id: UUID, **kwargs: Any) -> Any
Run when an LLM finishes successfully.
on_llm_error
def on_llm_error(error: BaseException, *, run_id: UUID, **kwargs: Any) -> Any
Run when an LLM raises an error.
ChainManagerMixin Objects
class ChainManagerMixin()
Mixin providing chain-level callback hooks.
on_chain_start
def on_chain_start(run_id: UUID, inputs: Any, **kwargs: Any) -> Any
Run when a chain (Runnable) starts executing.
on_chain_end
def on_chain_end(run_id: UUID, outputs: Any, **kwargs: Any) -> Any
Run when a chain finishes successfully.
on_chain_error
def on_chain_error(error: BaseException, *, run_id: UUID,
**kwargs: Any) -> Any
Run when a chain raises an error.
ToolManagerMixin Objects
class ToolManagerMixin()
Mixin providing tool callback hooks.
on_tool_start
def on_tool_start(tool_name: str, tool_input: Any, *, run_id: UUID,
**kwargs: Any) -> Any
Run when a tool invocation begins.
on_tool_end
def on_tool_end(tool_name: str, tool_output: Any, *, run_id: UUID,
**kwargs: Any) -> Any
Run when a tool invocation succeeds.
on_tool_error
def on_tool_error(error: BaseException,
*,
run_id: UUID,
tool_name: Optional[str] = None,
**kwargs: Any) -> Any
Run when a tool invocation raises an error.
PromptManagerMixin Objects
class PromptManagerMixin()
Mixin providing prompt template callback hooks.
on_prompt_start
def on_prompt_start(run_id: UUID, inputs: Any, **kwargs: Any) -> Any
Run when a prompt template begins formatting.
on_prompt_end
def on_prompt_end(run_id: UUID, output: Any, **kwargs: Any) -> Any
Run when a prompt template finishes formatting.
on_prompt_error
def on_prompt_error(error: BaseException, *, run_id: UUID,
**kwargs: Any) -> Any
Run when prompt formatting raises an error.
BaseCallbackHandler Objects
class BaseCallbackHandler(LLMManagerMixin, ChainManagerMixin, ToolManagerMixin,
RetrieverManagerMixin, PromptManagerMixin, ABC)
Base class for SpoonAI callback handlers.
raise_error
Whether to re-raise exceptions originating from callbacks.
run_inline
Whether the callback prefers to run on the caller's event loop.
ignore_llm
@property
def ignore_llm() -> bool
Return True to skip LLM callbacks.
ignore_chain
@property
def ignore_chain() -> bool
Return True to skip chain callbacks.
ignore_tool
@property
def ignore_tool() -> bool
Return True to skip tool callbacks.
ignore_retriever
@property
def ignore_retriever() -> bool
Return True to skip retriever callbacks.
ignore_prompt
@property
def ignore_prompt() -> bool
Return True to skip prompt callbacks.
AsyncCallbackHandler Objects
class AsyncCallbackHandler(BaseCallbackHandler)
Async version of the callback handler base class.
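A minimal usage sketch (illustrative, not generated from the source): subclass BaseCallbackHandler and override only the hooks you need. This assumes the base class declares no abstract methods beyond the optional hooks documented above and that hooks are invoked as instance methods.
from typing import Any, Optional
from uuid import UUID
from spoon_ai.callbacks.base import BaseCallbackHandler

class PrintingHandler(BaseCallbackHandler):
    """Echo LLM lifecycle events to stdout."""

    def on_llm_start(self, run_id: UUID, messages, **kwargs: Any) -> Any:
        print(f"[{run_id}] LLM started with {len(messages)} messages")

    def on_llm_new_token(self, token: str, *, chunk=None,
                         run_id: Optional[UUID] = None, **kwargs: Any) -> Any:
        # Stream tokens as they arrive, without trailing newlines.
        print(token, end="", flush=True)

    def on_llm_end(self, response, *, run_id: UUID, **kwargs: Any) -> Any:
        print(f"\n[{run_id}] LLM finished")

handler = PrintingHandler()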
Module spoon_ai.schema
Function Objects
class Function(BaseModel)
get_arguments_dict
def get_arguments_dict() -> dict
Parse arguments string to dictionary.
Returns:
dict - Parsed arguments as dictionary
create
@classmethod
def create(cls, name: str, arguments: Union[str, dict]) -> "Function"
Create Function with arguments as string or dict.
Arguments:
name - Function name
arguments - Function arguments as string or dict
Returns:
Function - Function instance with arguments as JSON string
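A short round-trip sketch (the arguments field name is implied by the docs above; output comments are illustrative):
from spoon_ai.schema import Function

fn = Function.create("get_weather", {"city": "Paris"})
print(fn.arguments)             # e.g. the JSON string '{"city": "Paris"}'
print(fn.get_arguments_dict())  # e.g. {'city': 'Paris'}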
AgentState Objects
class AgentState(str, Enum)
The state of the agent.
ToolChoice Objects
class ToolChoice(str, Enum)
Tool choice options
Role Objects
class Role(str, Enum)
Message role options
ROLE_TYPE
Message Objects
class Message(BaseModel)
Represents a chat message in the conversation
role
SystemMessage Objects
class SystemMessage(Message)
role
TOOL_CHOICE_TYPE
LLMConfig Objects
class LLMConfig(BaseModel)
Configuration for LLM providers
LLMResponse Objects
class LLMResponse(BaseModel)
Unified LLM response model
text
Original text response
LLMResponseChunk Objects
class LLMResponseChunk(BaseModel)
Enhanced LLM streaming response chunk.
Module spoon_ai.memory.utils
Memory helpers shared across Mem0 demos and utilities.
extract_memories
def extract_memories(result: Any) -> List[str]
Normalize Mem0 search/get responses into a list of memory strings. Supports common shapes: {"memories": [...]}, {"results": [...]}, {"data": [...]}, list, or scalar.
extract_first_memory_id
def extract_first_memory_id(result: Any) -> Optional[str]
Pull the first memory id from Mem0 responses. Supports common id fields: id, _id, memory_id, uuid.
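A usage sketch: the response below mirrors the documented {"results": [...]} shape, while the field names inside each entry (id, memory) are illustrative assumptions.
from spoon_ai.memory.utils import extract_memories, extract_first_memory_id

response = {"results": [{"id": "mem-1", "memory": "User prefers dark mode"}]}
print(extract_memories(response))         # e.g. ["User prefers dark mode"]
print(extract_first_memory_id(response))  # e.g. "mem-1"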
Module spoon_ai.memory.short_term_manager
Short-term memory management for conversation history.
TrimStrategy Objects
class TrimStrategy(str, Enum)
Strategy for trimming messages.
FROM_START
Remove oldest messages first
FROM_END
Remove newest messages first
MessageTokenCounter Objects
class MessageTokenCounter()
Approximate token counter aligned with LangChain semantics.
ShortTermMemoryManager Objects
class ShortTermMemoryManager()
Manager for short-term conversation memory with advanced operations.
trim_messages
async def trim_messages(messages: List[Message],
max_tokens: int,
strategy: TrimStrategy = TrimStrategy.FROM_END,
keep_system: bool = True,
model: Optional[str] = None) -> List[Message]
Trim messages using a LangChain-style heuristic.
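A minimal sketch of trimming history to a token budget. It assumes Message accepts role/content keywords and that ShortTermMemoryManager takes no constructor arguments.
import asyncio
from spoon_ai.schema import Message
from spoon_ai.memory.short_term_manager import ShortTermMemoryManager, TrimStrategy

async def main() -> None:
    manager = ShortTermMemoryManager()
    history = [
        Message(role="system", content="You are a helpful assistant."),
        Message(role="user", content="First question"),
        Message(role="assistant", content="First answer"),
        Message(role="user", content="Follow-up question"),
    ]
    # Drop the oldest turns first while always keeping the system prompt.
    trimmed = await manager.trim_messages(
        history, max_tokens=512,
        strategy=TrimStrategy.FROM_START, keep_system=True,
    )
    print(len(trimmed))

asyncio.run(main())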
summarize_messages
async def summarize_messages(
messages: List[Message],
max_tokens_before_summary: int,
messages_to_keep: int = 5,
summary_model: Optional[str] = None,
llm_manager=None,
llm_provider: Optional[str] = None,
existing_summary: str = ""
) -> Tuple[List[Message], List[RemoveMessage], Optional[str]]
Summarize earlier messages and emit removal directives.
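A companion sketch for summarization; here llm stands in for an initialized spoon_ai LLM manager (hypothetical setup, not shown), and the return-value handling follows the signature above.
from spoon_ai.memory.short_term_manager import ShortTermMemoryManager

async def compact(history, llm) -> None:
    manager = ShortTermMemoryManager()
    kept, removals, summary = await manager.summarize_messages(
        history,
        max_tokens_before_summary=2048,
        messages_to_keep=5,   # retain the most recent turns verbatim
        llm_manager=llm,
    )
    # `removals` carries RemoveMessage directives for the summarized turns;
    # `summary` is the running summary text (or None if nothing was summarized).
    print(summary)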
Module spoon_ai.memory.remove_message
Helpers for emitting message-removal directives.
RemoveMessage Objects
class RemoveMessage(BaseModel)
Lightweight message that signals another message should be removed.
Module spoon_ai.memory
Short-term memory management for conversation history.
This module provides memory management utilities for maintaining and optimizing conversation history in chat applications.
Module spoon_ai.memory.mem0_client
SpoonMem0 Objects
class SpoonMem0()
Lightweight wrapper around Mem0's MemoryClient with safe defaults.
add_text
def add_text(data: str,
user_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None) -> None
Convenience helper for adding a single text memory.
get_all_memory
def get_all_memory(user_id: Optional[str] = None,
limit: Optional[int] = None) -> List[str]
Retrieve all memories for a user (subject to backend limits).
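A minimal sketch, assuming SpoonMem0() can be constructed with defaults and picks up Mem0 credentials from the environment (e.g. a MEM0_API_KEY variable).
from spoon_ai.memory.mem0_client import SpoonMem0

client = SpoonMem0()
client.add_text("User prefers concise answers",
                user_id="user-123",
                metadata={"source": "onboarding"})
# List what the backend stored for this user, up to the requested limit.
for memory in client.get_all_memory(user_id="user-123", limit=10):
    print(memory)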