Base Node API Reference
The node system in SpoonOS provides the building blocks for graph execution. All nodes inherit from BaseNode
and implement specific execution patterns.
BaseNode Class​
from spoon_ai.graph import BaseNode
from abc import ABC
from typing import Generic, TypeVar, Dict, Any, Optional
State = TypeVar('State')
class BaseNode(ABC, Generic[State]):
def __init__(self, name: str)
self.name = name
@abstractmethod
async def __call__(self, state: State, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Execute the node logic"""
pass
def __repr__(self) -> str:
return f"{self.__class__.__name__}(name='{self.name}')"
BaseNode Methods​
Constructor​
__init__(name: str)
​
Initialize a node with a unique name.
Parameters:
name
(str
): Unique identifier for the node
Abstract Methods​
async __call__(state: State, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]
​
Execute the node's logic. Must be implemented by subclasses.
Parameters:
state
(State
): Current graph stateconfig
(Optional[Dict[str, Any]]
): Optional configuration parameters
Returns:
Dict[str, Any]
: State updates from node execution
Raises:
NodeExecutionError
: When node execution fails
RunnableNode Class​
from spoon_ai.graph import RunnableNode
from typing import Callable, Any
class RunnableNode(BaseNode[State]):
def __init__(self, name: str, func: Callable[[State], Any])
self.func = func
async def __call__(self, state: State, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
# Execute wrapped function
RunnableNode Features​
- Wraps any callable (sync or async) function
- Handles multiple return types automatically
- Provides consistent error handling
- Supports both dict and tuple return formats
Usage Examples​
# Simple function node
async def analyze_sentiment(state: MyState) -> Dict[str, Any]:
text = state.get("text", "")
sentiment = "positive" if "good" in text.lower() else "negative"
return {"sentiment": sentiment, "confidence": 0.85}
node = RunnableNode("sentiment_analyzer", analyze_sentiment)
# Function returning tuple (updates, next_node)
def process_with_routing(state: MyState) -> tuple:
result = {"processed": True}
next_node = "success" if result["processed"] else "retry"
return result, next_node
node = RunnableNode("processor", process_with_routing)
ToolNode Class​
from spoon_ai.graph import ToolNode
from typing import List, Any
class ToolNode(BaseNode[State]):
def __init__(self, name: str, tools: List[Any])
self.tools = tools
async def __call__(self, state: State, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
# Execute tools based on state
ToolNode Features​
- Executes multiple tools in sequence
- Extracts tool calls from state automatically
- Supports both sync and async tool execution
- Provides comprehensive error handling per tool
Usage Examples​
from spoon_ai.tools import CalculatorTool, SearchTool
# Create tools
calculator = CalculatorTool()
search = SearchTool()
tools = [calculator, search]
# Create tool node
tool_node = ToolNode("tool_executor", tools)
# State with tool calls
state = {
"tool_calls": [
{"name": "calculator", "args": {"operation": "add", "a": 10, "b": 5}},
{"name": "search", "args": {"query": "python tutorial", "limit": 5}}
]
}
# Execute tools
result = await tool_node(state)
# result["tool_results"] contains execution results
Tool Call Format​
# Expected tool call format in state
{
"tool_calls": [
{
"name": "tool_name",
"args": {
"param1": "value1",
"param2": "value2"
}
}
]
}
# Tool result format
{
"tool_results": [
{
"tool_call": {...}, # Original call
"result": {...}, # Tool execution result
"success": True, # Execution success flag
"error": None # Error message if failed
}
]
}
ConditionNode Class​
from spoon_ai.graph import ConditionNode
from typing import Callable
class ConditionNode(BaseNode[State]):
def __init__(self, name: str, condition_func: Callable[[State], str])
self.condition_func = condition_func
async def __call__(self, state: State, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
# Execute condition and return routing decision
ConditionNode Features​
- Evaluates conditions for routing decisions
- Supports both sync and async condition functions
- Returns routing decision in standardized format
- Provides clear error messages for condition failures
Usage Examples​
# Simple condition function
def route_by_complexity(state: MyState) -> str:
query = state.get("query", "")
return "complex" if len(query) > 100 else "simple"
condition_node = ConditionNode("router", route_by_complexity)
# Async condition function
async def route_by_llm(state: MyState) -> str:
# Use LLM to decide routing
decision = await llm_manager.classify(state["query"])
return decision
condition_node = ConditionNode("llm_router", route_by_llm)
Condition Result Format​
# Condition node result
{
"condition_result": "complex", # The routing decision
"next_node": "complex" # Same as condition_result
}
Custom Node Implementation​
Basic Custom Node​
from spoon_ai.graph import BaseNode
class CustomAnalysisNode(BaseNode[MyState]):
def __init__(self, name: str, model_config: Dict[str, Any]):
super().__init__(name)
self.model_config = model_config
async def __call__(self, state: MyState, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
# Custom logic here
data = state.get("data", [])
# Process data
analysis = await self._analyze_data(data)
# Return state updates
return {
"analysis_result": analysis,
"processed_at": datetime.now().isoformat(),
"model_used": self.model_config.get("model")
}
async def _analyze_data(self, data: List[Any]) -> Dict[str, Any]:
# Implementation details
pass
Node with Configuration​
class ConfigurableNode(BaseNode[MyState]):
def __init__(self, name: str, api_key: str, timeout: int = 30):
super().__init__(name)
self.api_key = api_key
self.timeout = timeout
async def __call__(self, state: MyState, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
# Merge instance config with call config
effective_config = {**self.__dict__, **(config or {})}
# Use configuration in execution
result = await self._call_external_api(
state["query"],
api_key=effective_config["api_key"],
timeout=effective_config["timeout"]
)
return {"api_result": result}
Node Patterns and Best Practices​
State Transformation Pattern​
class TransformerNode(BaseNode[MyState]):
async def __call__(self, state: MyState, config=None) -> Dict[str, Any]:
# Transform input data
raw_data = state.get("raw_data", [])
transformed = [self._transform_item(item) for item in raw_data]
return {
"transformed_data": transformed,
"transformation_count": len(transformed),
"transformation_timestamp": datetime.now().isoformat()
}
Validation Node Pattern​
class ValidationNode(BaseNode[MyState]):
def __init__(self, name: str, validators: List[Callable]):
super().__init__(name)
self.validators = validators
async def __call__(self, state: MyState, config=None) -> Dict[str, Any]:
errors = []
for validator in self.validators:
try:
validator(state)
except Exception as e:
errors.append(str(e))
return {
"validation_passed": len(errors) == 0,
"validation_errors": errors,
"validated_at": datetime.now().isoformat()
}
Aggregation Node Pattern​
class AggregationNode(BaseNode[MyState]):
async def __call__(self, state: MyState, config=None) -> Dict[str, Any]:
# Aggregate results from parallel branches
branch_results = state.get("branch_results", [])
aggregated = self._aggregate_results(branch_results)
return {
"aggregated_result": aggregated,
"branch_count": len(branch_results),
"aggregation_method": "weighted_average"
}
Error Handling​
Node-Level Error Handling​
class ResilientNode(BaseNode[MyState]):
async def __call__(self, state: MyState, config=None) -> Dict[str, Any]:
try:
result = await self._execute_logic(state)
return {
"result": result,
"success": True,
"error": None
}
except Exception as e:
logger.error(f"Node {self.name} failed: {e}")
return {
"result": None,
"success": False,
"error": str(e),
"error_type": type(e).__name__
}
Retry Logic​
class RetryNode(BaseNode[MyState]):
def __init__(self, name: str, max_retries: int = 3):
super().__init__(name)
self.max_retries = max_retries
async def __call__(self, state: MyState, config=None) -> Dict[str, Any]:
for attempt in range(self.max_retries):
try:
return await self._execute_with_retry(state)
except Exception as e:
if attempt == self.max_retries - 1:
raise NodeExecutionError(
f"Failed after {self.max_retries} attempts",
node_name=self.name,
original_error=e,
state=state
) from e
await asyncio.sleep(2 ** attempt) # Exponential backoff
Testing Nodes​
Unit Testing Pattern​
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_custom_node():
node = CustomAnalysisNode("test_node", {"model": "test"})
# Mock the internal method
node._analyze_data = AsyncMock(return_value={"score": 0.95})
state = {"data": [1, 2, 3]}
result = await node(state)
assert result["analysis_result"]["score"] == 0.95
assert result["success"] is True
Integration Testing​
@pytest.mark.asyncio
async def test_node_in_graph():
# Create a simple graph with the node
graph = StateGraph(MyState)
graph.add_node("test_node", CustomAnalysisNode("test", {}))
graph.set_entry_point("test_node")
graph.add_edge("test_node", END)
compiled = graph.compile()
result = await compiled.invoke({"data": [1, 2, 3]})
assert "analysis_result" in result
Performance Considerations​
Async Best Practices​
class OptimizedNode(BaseNode[MyState]):
async def __call__(self, state: MyState, config=None) -> Dict[str, Any]:
# Use asyncio.gather for concurrent operations
results = await asyncio.gather(
self._fetch_data_a(state),
self._fetch_data_b(state),
self._process_data(state)
)
return {
"combined_result": self._combine_results(results)
}
Memory Management​
class MemoryEfficientNode(BaseNode[MyState]):
async def __call__(self, state: MyState, config=None) -> Dict[str, Any]:
# Process data in chunks to manage memory
data = state.get("large_dataset", [])
chunk_size = 1000
results = []
for i in range(0, len(data), chunk_size):
chunk = data[i:i + chunk_size]
chunk_result = await self._process_chunk(chunk)
results.extend(chunk_result)
# Allow other tasks to run
await asyncio.sleep(0)
return {"processed_data": results}
See Also​
- StateGraph API - How to use nodes in graphs
- Graph Agent API - Agent-level node execution
- Graph System Overview - Node concepts and patterns