Graph System

The Graph System in SpoonOS enables complex, multi-step workflows through a structured, node-and-edge execution engine called StateGraph.

What you get

  • Deterministic control flow: explicit nodes and edges
  • Intelligent routing: LLM router, rule-based, and conditional functions
  • Parallel execution: run branches concurrently with join strategies
  • State management: type-safe, reducer-based merging across steps
  • Memory integration: persist context across runs or update memory as a node

Quick Start with StateGraph

from typing import TypedDict, Dict, Any, Optional, Annotated
from spoon_ai.graph import StateGraph, END


class MyState(TypedDict):
    user_query: str
    intent: str
    result: str
    memory: Annotated[Optional[Dict[str, Any]], None]


async def analyze_intent(state: MyState) -> Dict[str, Any]:
    # Use your LLM manager here in real code
    query = state.get("user_query", "").lower()
    intent = "greet" if "hello" in query else "other"
    return {"intent": intent}


async def generate_result(state: MyState) -> Dict[str, Any]:
    intent = state.get("intent", "other")
    if intent == "greet":
        return {"result": "Hi! How can I help?"}
    return {"result": "Let me analyze that..."}


def build_graph() -> StateGraph:
    graph = StateGraph(MyState)
    graph.add_node("analyze_intent", analyze_intent)
    graph.add_node("generate_result", generate_result)
    graph.set_entry_point("analyze_intent")
    graph.add_edge("analyze_intent", "generate_result")
    graph.add_edge("generate_result", END)
    return graph

Execute:

compiled = build_graph().compile()
result = await compiled.invoke({"user_query": "hello graph"})
print(result["result"]) # Hi! How can I help?
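
The snippet above uses await, so it must run inside an event loop. A minimal standalone runner (plain asyncio, nothing SpoonOS-specific):

import asyncio

async def main() -> None:
    compiled = build_graph().compile()
    result = await compiled.invoke({"user_query": "hello graph"})
    print(result["result"])

asyncio.run(main())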

Intelligent Routing

SpoonOS offers three complementary routing styles. You can combine them; precedence is: LLM router → intelligent rules → regular edges.

1) LLM-Powered Router

graph.enable_llm_routing(config={"model": "gpt-4", "temperature": 0.1, "max_tokens": 64})

The engine will ask the LLM to select the best next node name based on the query and state. Results are validated against available nodes.
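
The validation step is conceptually simple. As an illustration only (this happens inside the engine; the function below is hypothetical and not part of the spoon_ai API):

def validate_route(selected: str, available_nodes: set, fallback: str) -> str:
    # If the LLM returns a name that is not a registered node,
    # fall back to a safe default instead of failing the run
    return selected if selected in available_nodes else fallback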

2) Conditional Edges (function-based)

def route_after_intent(state: MyState) -> str:
    return "path_a" if state.get("intent") == "greet" else "path_b"

graph.add_conditional_edges(
    "analyze_intent",
    route_after_intent,
    {"path_a": "generate_result", "path_b": "fallback"},  # "fallback" must also be registered via add_node
)

3) Rules and Patterns

# Rule conditions receive (state, query)
graph.add_routing_rule("analyze_intent", lambda s, q: "price" in q, target_node="fetch_prices", priority=10)
graph.add_pattern_routing("analyze_intent", r"buy|sell|trade", target_node="make_decision", priority=5)

Parallel Execution

Group nodes and run them concurrently with explicit join strategies.

graph.add_parallel_group(
    "fetch_group",
    ["fetch_prices", "fetch_social", "fetch_news"],
    {"join_strategy": "all_complete", "error_strategy": "ignore_errors", "timeout": 15},
)

If the current node belongs to a parallel group, the engine gathers all branch updates and merges them into state using reducers.
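
For example, a list-valued field can declare a reducer so branch results accumulate instead of overwriting each other. A minimal sketch, assuming the Annotated[type, reducer] convention used by the state classes above (FetchState is illustrative):

import operator
from typing import Annotated, List, TypedDict

class FetchState(TypedDict):
    # operator.add concatenates the lists returned by each parallel
    # branch when the join merges their updates back into state
    findings: Annotated[List[str], operator.add]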


Memory Integration

You can integrate memory in two ways:

A) GraphAgent Persistent Memory

Use GraphAgent persistent memory to store messages and metadata across runs.

from spoon_ai.graph import GraphAgent

graph = build_graph()
agent = GraphAgent(name="demo", graph=graph, preserve_state=True)
await agent.run("hello")
stats = agent.get_memory_statistics()

B) Node-Level Memory (demo-style)

Add memory as regular nodes at graph entry/exit to load/update per-user context.

async def load_memory(state: MyState) -> Dict[str, Any]:
    # read your JSON or DB here; keep it small and safe
    return {"memory": {"greet_count": 3}}

async def update_memory(state: MyState) -> Dict[str, Any]:
    # write back learned patterns/statistics
    return {"memory": state.get("memory", {})}

graph.add_node("load_memory", load_memory)
graph.add_node("update_memory", update_memory)
graph.add_edge("__start__", "load_memory") # entry
graph.add_edge("load_memory", "analyze_intent")
graph.add_edge("generate_result", "update_memory")
graph.add_edge("update_memory", END)

Monitoring and Metrics

graph.enable_monitoring(["execution_time", "success_rate", "routing_performance"])
compiled = graph.compile()
result = await compiled.invoke({"user_query": "..."})
metrics = compiled.get_execution_metrics()
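
You can then log or assert on the result after each run. A sketch, assuming get_execution_metrics() returns a dict keyed by metric name:

for name, value in metrics.items():
    print(f"{name}: {value}")  # e.g. execution_time, success_rate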

End-to-End Example (Intent → Parallel Fetch → Analysis → Memory)

The full example (used in our demo) routes a crypto query into general_qa, short_term_trend, macro_trend, or deep_research, runs true parallel data fetching, and updates memory before finishing.

from typing import TypedDict, Dict, Any, Optional, Annotated
from spoon_ai.graph import StateGraph, END


class AdvancedState(TypedDict):
    user_query: str
    user_name: str
    session_id: str
    symbol: str
    query_analysis: Annotated[Optional[Dict[str, Any]], None]
    final_output: str
    memory_state: Annotated[Optional[Dict[str, Any]], None]


async def initialize_session(state: AdvancedState) -> Dict[str, Any]:
    return {"session_id": "session_...", "final_output": ""}


async def analyze_query_intent(state: AdvancedState) -> Dict[str, Any]:
    # Ask LLM to classify into: general_qa | short_term_trend | macro_trend | deep_research
    category = "macro_trend"  # pretend LLM said so
    return {"query_analysis": {"query_type": category}}


async def load_memory(state: AdvancedState) -> Dict[str, Any]:
    return {"memory_state": {"learned_patterns": {"query_type_counts": {"macro_trend": 12}}}}


async def update_memory(state: AdvancedState) -> Dict[str, Any]:
    # persist updated counts, last summary, per-symbol stats, etc.
    return {}


def build_advanced_graph() -> StateGraph:
    g = StateGraph(AdvancedState).enable_monitoring(["execution_time"])
    g.add_node("initialize_session", initialize_session)
    g.add_node("load_memory", load_memory)
    g.add_node("analyze_query_intent", analyze_query_intent)
    # ... add: extract_symbol, fetch_15m/30m/1h, fetch_4h/1d/1w, search_news, analysis nodes, etc.
    g.add_node("update_memory", update_memory)

    g.set_entry_point("initialize_session")
    g.add_edge("initialize_session", "load_memory")
    g.add_edge("load_memory", "analyze_query_intent")

    # Conditional routing (function-based)
    def route(state: AdvancedState) -> str:
        qt = (state.get("query_analysis") or {}).get("query_type", "general_qa")
        return qt

    g.add_conditional_edges(
        "analyze_query_intent",
        route,
        {
            "general_qa": "general_qa",
            "short_term_trend": "extract_symbol",
            "macro_trend": "extract_symbol",
            "deep_research": "deep_research_search",
        },
    )

    # All terminal paths go through memory update
    g.add_edge("general_qa", "update_memory")
    g.add_edge("analyze_macro_trend", "update_memory")
    g.add_edge("analyze_short_term_trend", "update_memory")
    g.add_edge("deep_research_synthesize", "update_memory")
    g.add_edge("update_memory", END)
    return g
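
Once the elided nodes are registered, the advanced graph compiles and runs exactly like the Quick Start (the input keys mirror AdvancedState):

compiled = build_advanced_graph().compile()
result = await compiled.invoke({
    "user_query": "What's the macro outlook for BTC?",
    "user_name": "alice",
})
print(result["final_output"])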

See a full working version in the Cookbook example linked below.


Best Practices

  • Keep nodes focused: one responsibility per node
  • Prefer conditional edges for deterministic routing; layer LLM router for flexibility
  • Use parallel groups for I/O-bound branches; choose join strategies wisely
  • Bound state growth: reducers should cap list sizes, and memory payloads should stay small (see the sketch after this list)
  • Monitor execution and inspect compiled.get_execution_metrics()
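
A minimal sketch of a capping reducer, assuming reducer callables receive the existing and incoming values:

from typing import Annotated, List, TypedDict

def keep_last_50(existing: List[str], new: List[str]) -> List[str]:
    # Concatenate, then drop everything but the most recent 50 entries
    return ((existing or []) + (new or []))[-50:]

class BoundedState(TypedDict):
    events: Annotated[List[str], keep_last_50]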

Next Steps

📚 Hands-on Examples

🎯 Graph Crypto Analysis

GitHub: View Source

What it demonstrates:

  • Complete end-to-end cryptocurrency analysis pipeline
  • LLM-driven decision making from data collection to investment recommendations
  • Real-time technical indicator calculation (RSI, MACD, EMA)
  • Multi-timeframe analysis with parallel data processing
  • Advanced state management and error recovery

Key features:

  • Real Binance API integration (no simulated data)
  • Intelligent token selection based on market conditions
  • Comprehensive market analysis with actionable insights
  • Production-ready error handling and performance optimization

🔧 Comprehensive Graph Demo

GitHub: View Source

What it demonstrates:

  • Intelligent query routing system (general_qa → short_term_trend → macro_trend → deep_research)
  • True parallel execution across multiple timeframes (15m, 30m, 1h, 4h, daily, weekly)
  • Advanced memory management with persistent context
  • LLM-powered routing decisions and summarization
  • Production-style graph architecture

Key features:

  • Dynamic workflow routing based on user intent
  • Concurrent data fetching for optimal performance
  • Memory persistence across graph executions
  • Comprehensive error handling and recovery

🛠️ Integration Guides

📖 Additional Resources
