Base Tool API Reference
The BaseTool class is the foundation for all tools in SpoonOS, providing a standardized interface for tool development and execution.
Class Definition​
from spoon_ai.tools.base import BaseTool
from typing import Any, Dict, Optional
class BaseTool:
    """Base class for all tools.

    Subclasses declare ``name`` and ``description`` and override
    :meth:`execute` with the tool's behavior.
    """

    # Identifier for the tool.
    name: str
    # Human-readable description of what the tool does.
    description: str

    def __init__(self, **kwargs):
        """Accept arbitrary keyword arguments; the base class ignores them."""
        pass

    async def execute(self, **kwargs) -> Any:
        """Run the tool with the given keyword parameters.

        Returns:
            The tool execution result (subclass-defined).

        Raises:
            NotImplementedError: Always, in the base class. The message names
                the concrete class that did not override this method.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement execute()"
        )
Required Attributes​
name: str
Unique identifier for the tool. Must be a valid Python identifier.
description: str
Human-readable description of what the tool does. Used by agents to understand tool capabilities.
Methods​
Core Methods​
async execute(**kwargs) -> Any
​
Execute the tool with provided parameters.
Parameters:
**kwargs
: Tool-specific parameters
Returns:
Any
: Tool execution result
Raises:
ToolError
: When tool execution fails
ValidationError
: When parameters are invalid
Example:
class CalculatorTool(BaseTool):
    """Example tool that applies one arithmetic operation to two numbers."""

    name = "calculator"
    description = "Perform basic arithmetic operations"

    async def execute(self, operation: str, a: float, b: float) -> float:
        """Apply ``operation`` to ``a`` and ``b`` and return the result.

        Supported operations are "add", "subtract", "multiply" and "divide".

        Raises:
            ToolError: On "divide" when ``b`` equals zero.
            ValidationError: When ``operation`` is not one of the above.
        """
        if operation == "add":
            return a + b
        if operation == "subtract":
            return a - b
        if operation == "multiply":
            return a * b
        if operation != "divide":
            raise ValidationError(f"Unknown operation: {operation}")
        # Only "divide" reaches this point; guard the zero divisor explicitly.
        if b == 0:
            raise ToolError("Division by zero")
        return a / b
Validation Methods​
validate_parameters(self, **kwargs) -> Dict[str, Any]
​
Validate and normalize input parameters.
Parameters:
**kwargs
: Raw input parameters
Returns:
Dict[str, Any]
: Validated and normalized parameters
Example:
class WebSearchTool(BaseTool):
    """Example tool that validates its parameters before searching."""

    name = "web_search"
    description = "Search the web for information"

    def validate_parameters(self, **kwargs) -> Dict[str, Any]:
        """Validate and normalize the raw search parameters.

        Parameters:
            **kwargs: Raw input; reads "query" and the optional "limit"
                (defaults to 10).

        Returns:
            A dict with "query" (whitespace-stripped) and "limit".

        Raises:
            ValidationError: When "query" is not a string or is empty after
                stripping, or when "limit" is not an integer in 1..100.
        """
        query = kwargs.get("query")
        # Strip before the emptiness check: a whitespace-only query would
        # otherwise pass validation and be normalized to an empty string.
        if not isinstance(query, str) or not query.strip():
            raise ValidationError("Query must be a non-empty string")

        limit = kwargs.get("limit", 10)
        # bool is a subclass of int, so True/False must be rejected explicitly.
        if (
            isinstance(limit, bool)
            or not isinstance(limit, int)
            or not 1 <= limit <= 100
        ):
            raise ValidationError("Limit must be an integer between 1 and 100")

        return {
            "query": query.strip(),
            "limit": limit,
        }

    async def execute(self, **kwargs) -> Dict[str, Any]:
        """Validate ``kwargs`` and run the search with the normalized values.

        NOTE(review): ``_search`` is not defined in this example; it is
        presumably supplied by the real implementation.
        """
        params = self.validate_parameters(**kwargs)
        # Perform web search with validated parameters
        return await self._search(params["query"], params["limit"])
Configuration Methods​
configure(self, config: Dict[str, Any]) -> None
​
Configure the tool with runtime settings.
Parameters:
config
: Configuration dictionary
Example:
class APITool(BaseTool):
    """Example tool that is configured at runtime with API settings."""

    name = "api_tool"
    description = "Make API requests"

    def __init__(self):
        """Start unconfigured: no API key, no base URL, 30 second timeout."""
        super().__init__()
        self.api_key = None
        self.base_url = None
        self.timeout = 30

    def configure(self, config: Dict[str, Any]) -> None:
        """Apply runtime settings from ``config``.

        Parameters:
            config: Configuration dictionary; reads "api_key" (required),
                "base_url" and "timeout" (defaults to 30).

        Raises:
            ConfigurationError: When "api_key" is missing or empty. The
                tool's existing settings are left untouched in that case.
        """
        # Validate before assigning anything so a rejected config cannot
        # leave the tool half-updated.
        api_key = config.get("api_key")
        if not api_key:
            raise ConfigurationError("API key is required")

        self.api_key = api_key
        self.base_url = config.get("base_url")
        self.timeout = config.get("timeout", 30)
Tool Metadata​
Schema Definition​
class WeatherTool(BaseTool):
    """Example tool that declares a JSON-schema-style parameter description."""

    name = "get_weather"
    description = "Get current weather for a location"

    # Parameter schema: "location" is required, "units" is optional.
    schema = {
        "type": "object",
        "properties": {
            "location": {
                "type": "string",
                "description": "City name or coordinates",
            },
            "units": {
                "type": "string",
                "enum": ["celsius", "fahrenheit"],
                "default": "celsius",
                "description": "Temperature units",
            },
        },
        "required": ["location"],
    }

    async def execute(self, location: str, units: str = "celsius") -> Dict[str, Any]:
        """Placeholder: the example leaves the implementation empty."""
        # Implementation here
Categories and Tags​
class CryptoTool(BaseTool):
    """Example tool carrying category, tag and version metadata."""

    name = "get_crypto_price"
    description = "Get cryptocurrency price data"

    # Metadata attributes declared alongside the required name/description.
    category = "crypto"
    tags = ["price", "market", "cryptocurrency"]
    version = "1.0.0"

    async def execute(self, symbol: str) -> Dict[str, Any]:
        """Placeholder: the example leaves the implementation empty."""
        # Implementation here
Error Handling​
Exception Types​
from spoon_ai.tools.errors import (
ToolError,
ValidationError,
ConfigurationError,
ExecutionError
)
class RobustTool(BaseTool):
    """Example tool showing which errors propagate and which are wrapped."""

    name = "robust_tool"
    description = "Tool with comprehensive error handling"

    async def execute(self, **kwargs) -> Any:
        """Validate ``kwargs``, run the operation and return its result.

        Raises:
            ValidationError: Propagated unchanged.
            ConfigurationError: Propagated unchanged.
            ExecutionError: Wraps any other exception, chained to the cause.
        """
        try:
            validated = self.validate_parameters(**kwargs)
            return await self._perform_operation(validated)
        except (ValidationError, ConfigurationError):
            # Validation and configuration errors pass through as-is.
            raise
        except Exception as exc:
            # Anything unexpected is wrapped so callers see one error type.
            raise ExecutionError(f"Tool execution failed: {str(exc)}") from exc

    def validate_parameters(self, **kwargs) -> Dict[str, Any]:
        """Placeholder for parameter validation logic."""
        # Parameter validation logic

    async def _perform_operation(self, params: Dict[str, Any]) -> Any:
        """Placeholder for the core tool logic."""
        # Core tool logic
Async Patterns​
HTTP Requests​
import aiohttp
from typing import Dict, Any
class HTTPTool(BaseTool):
    """Example tool that issues HTTP requests through an aiohttp session.

    Usable as an async context manager, which opens the session on entry and
    closes it on exit. ``execute`` also creates a session on demand.
    """

    name = "http_request"
    description = "Make HTTP requests"

    def __init__(self):
        """Start without a session; one is created on first use."""
        super().__init__()
        self.session = None

    async def __aenter__(self):
        """Open a session unless a usable one already exists."""
        # Reuse a live session rather than overwriting (and leaking) it.
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session and forget it."""
        if self.session is not None:
            await self.session.close()
            # Drop the closed session so a later execute() opens a fresh one
            # instead of failing on the closed object.
            self.session = None

    async def execute(self, url: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
        """Send a request and return its status, body text and headers.

        Parameters:
            url: Target URL.
            method: HTTP method name; defaults to "GET".
            **kwargs: Passed through to ``session.request``.

        Returns:
            A dict with "status", "data" (response text) and "headers".

        Raises:
            ExecutionError: When the request fails, chained to the cause.
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        try:
            async with self.session.request(method, url, **kwargs) as response:
                return {
                    "status": response.status,
                    "data": await response.text(),
                    "headers": dict(response.headers),
                }
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise ExecutionError(f"HTTP request failed: {str(e)}") from e
Database Operations​
import asyncpg
from typing import List, Dict, Any
class DatabaseTool(BaseTool):
    """Example tool that runs queries through an asyncpg connection pool."""

    name = "database_query"
    description = "Execute database queries"

    def __init__(self, connection_string: str):
        """Store the connection string; the pool is created on first use."""
        super().__init__()
        self.connection_string = connection_string
        self.pool = None

    async def initialize(self):
        """Initialize database connection pool"""
        self.pool = await asyncpg.create_pool(self.connection_string)

    async def execute(self, query: str, params: List[Any] = None) -> List[Dict[str, Any]]:
        """Run ``query`` and return the rows as dictionaries.

        Parameters:
            query: Query text passed to ``connection.fetch``.
            params: Optional positional query arguments; ``None`` means none.

        Raises:
            ExecutionError: When the query fails, chained to the cause.
        """
        if self.pool is None:
            await self.initialize()
        try:
            async with self.pool.acquire() as connection:
                rows = await connection.fetch(query, *(params or []))
                return [dict(row) for row in rows]
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise ExecutionError(f"Database query failed: {str(e)}") from e

    async def cleanup(self):
        """Clean up database connections"""
        if self.pool is not None:
            await self.pool.close()
            # Forget the closed pool so a later execute() re-initializes
            # instead of acquiring from a closed pool.
            self.pool = None
Testing Tools​
Unit Testing​
import pytest
from unittest.mock import AsyncMock, patch
from my_tool import CalculatorTool
@pytest.fixture
def calculator_tool():
    """Provide a newly constructed CalculatorTool to each test."""
    tool = CalculatorTool()
    return tool
@pytest.mark.asyncio
async def test_addition(calculator_tool):
    """The "add" operation on 5 and 3 yields 8."""
    total = await calculator_tool.execute(operation="add", a=5, b=3)
    assert total == 8
@pytest.mark.asyncio
async def test_division_by_zero(calculator_tool):
    """Dividing by zero raises ToolError with the expected message."""
    # This test module never imports ToolError at file level; import it here
    # so the test does not fail with NameError.
    from spoon_ai.tools.errors import ToolError

    with pytest.raises(ToolError, match="Division by zero"):
        await calculator_tool.execute(operation="divide", a=5, b=0)
@pytest.mark.asyncio
async def test_invalid_operation(calculator_tool):
    """An unrecognized operation raises ValidationError."""
    # This test module never imports ValidationError at file level; import it
    # here so the test does not fail with NameError.
    from spoon_ai.tools.errors import ValidationError

    with pytest.raises(ValidationError, match="Unknown operation"):
        await calculator_tool.execute(operation="invalid", a=5, b=3)
Integration Testing​
import pytest
from spoon_ai.tools import ToolManager
from my_tool import WebSearchTool
@pytest.mark.asyncio
async def test_tool_integration():
    """Register WebSearchTool with a ToolManager and execute it by name."""
    # Registration: the tool's name must be listed after registering.
    manager = ToolManager()
    manager.register_tool(WebSearchTool())
    assert "web_search" in manager.get_tool_names()

    # Execution through the manager: the result is a dict with "results".
    outcome = await manager.execute_tool(
        "web_search",
        query="SpoonOS documentation",
        limit=5,
    )
    assert isinstance(outcome, dict)
    assert "results" in outcome
Performance Optimization​
Caching​
from functools import lru_cache
import asyncio
from typing import Dict, Any
class CachedTool(BaseTool):
    """Example tool that caches results per parameter set with a TTL."""

    name = "cached_tool"
    description = "Tool with caching support"

    # Sentinel distinguishing "no usable cache entry" from a cached ``None``.
    _MISS = object()

    def __init__(self):
        """Start with an empty cache and a 300 second time-to-live."""
        super().__init__()
        self._cache = {}
        self._cache_ttl = 300  # 5 minutes

    async def execute(self, **kwargs) -> Any:
        """Return the cached result for ``kwargs`` or compute and cache it.

        NOTE(review): ``_perform_operation`` is not defined in this example;
        it is presumably supplied by the real implementation.
        """
        cache_key = self._create_cache_key(**kwargs)

        # Compare against the sentinel, not None, so that an operation which
        # legitimately returns None is served from the cache as well.
        cached_result = self._lookup(cache_key)
        if cached_result is not self._MISS:
            return cached_result

        result = await self._perform_operation(**kwargs)
        self._set_cache(cache_key, result)
        return result

    def _create_cache_key(self, **kwargs) -> str:
        """Create cache key from parameters"""
        import hashlib
        import json

        # sort_keys makes the key independent of keyword-argument order.
        key_data = json.dumps(kwargs, sort_keys=True)
        return hashlib.md5(key_data.encode()).hexdigest()

    def _lookup(self, key: str) -> Any:
        """Return the live cached value for ``key``, or ``_MISS``.

        An expired entry is removed as a side effect.
        """
        import time

        entry = self._cache.get(key)
        if entry is None:
            return self._MISS
        value, stored_at = entry
        if time.monotonic() - stored_at < self._cache_ttl:
            return value
        del self._cache[key]
        return self._MISS

    def _get_from_cache(self, key: str) -> Any:
        """Get value from cache if not expired

        Returns ``None`` both for a miss and for a cached ``None``; use
        ``_lookup`` when the two must be told apart.
        """
        value = self._lookup(key)
        return None if value is self._MISS else value

    def _set_cache(self, key: str, value: Any) -> None:
        """Set value in cache with timestamp"""
        import time

        # A monotonic clock keeps the TTL immune to wall-clock adjustments.
        self._cache[key] = (value, time.monotonic())
Connection Pooling​
class PooledTool(BaseTool):
    """Example tool that creates one connection pool on the class, on demand."""

    name = "pooled_tool"
    description = "Tool with connection pooling"

    # Pool stored on the class; created on the first request for it.
    _connection_pool = None
    _pool_size = 10

    @classmethod
    async def get_connection_pool(cls):
        """Get or create connection pool"""
        if cls._connection_pool is not None:
            return cls._connection_pool
        # NOTE(review): there is an await between the check above and the
        # assignment below, so concurrent first calls may each create a pool.
        cls._connection_pool = await create_connection_pool(
            size=cls._pool_size
        )
        return cls._connection_pool

    async def execute(self, **kwargs) -> Any:
        """Acquire a pooled connection and run the operation with it."""
        pool = await self.get_connection_pool()
        async with pool.acquire() as connection:
            # Use connection for tool operation
            return await self._perform_operation(connection, **kwargs)
Best Practices​
Tool Design​
- Keep tools focused on a single responsibility
- Use descriptive names and clear descriptions
- Implement proper parameter validation
- Handle errors gracefully
- Document expected inputs and outputs
Performance​
- Use async/await for I/O operations
- Implement caching for expensive operations
- Use connection pooling for database/API tools
- Set appropriate timeouts
- Monitor resource usage
Security​
- Validate all input parameters
- Sanitize user inputs
- Use secure communication protocols
- Implement proper authentication
- Log security-relevant events
Testing​
- Write comprehensive unit tests
- Test error conditions
- Use mocking for external dependencies
- Perform integration testing
- Test with realistic data