Table of Contents
Module spoon_ai.runnables.base
log_patches_from_events​
async def log_patches_from_events(
event_iter: AsyncIterator[Dict[str, Any]],
*,
diff: bool = True) -> AsyncIterator[RunLogPatch]
Convert a stream of events into run log patches.
Runnable Objects​
class Runnable(ABC, Generic[Input, Output])
astream_log​
async def astream_log(input: Input,
config: Optional[RunnableConfig] = None,
*,
diff: bool = True) -> AsyncIterator[RunLogPatch]
Asynchronously stream structured log patches derived from execution events.
astream_events​
async def astream_events(
input: Input,
config: Optional[RunnableConfig] = None
) -> AsyncIterator[Dict[str, Any]]
Asynchronously stream structured execution events.