Module spoon_ai.runnables.base

log_patches_from_events

async def log_patches_from_events(
        event_iter: AsyncIterator[Dict[str, Any]],
        *,
        diff: bool = True) -> AsyncIterator[RunLogPatch]

Convert a stream of events into run log patches.
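The general idea of turning an event stream into patches can be illustrated with a self-contained sketch. It does not import `spoon_ai`: the `Patch` alias, the event keys (`event`, `name`, `data`), the path layout, and the helper names `fake_events` and `toy_patches_from_events` are all assumptions made for illustration, and the real `RunLogPatch` type may look different.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List

# Hypothetical stand-in for RunLogPatch: a list of JSON-Patch-style operations.
Patch = List[Dict[str, Any]]


async def fake_events() -> AsyncIterator[Dict[str, Any]]:
    """Yield made-up events; the key names are assumptions, not the library's schema."""
    yield {"event": "start", "name": "step_a", "data": {"input": "hi"}}
    yield {"event": "end", "name": "step_a", "data": {"output": "HI"}}


async def toy_patches_from_events(
    event_iter: AsyncIterator[Dict[str, Any]],
) -> AsyncIterator[Patch]:
    """Map each event to a single 'add' operation keyed by step name and event type."""
    async for event in event_iter:
        path = f"/logs/{event['name']}/{event['event']}"
        yield [{"op": "add", "path": path, "value": event["data"]}]


async def collect() -> List[Patch]:
    """Drain the patch stream into a list."""
    return [patch async for patch in toy_patches_from_events(fake_events())]


patches = asyncio.run(collect())
for patch in patches:
    print(patch)
```

Because the converter is itself an async generator, patches are produced lazily, one per incoming event, rather than after the whole run has finished.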

Runnable Objects

class Runnable(ABC, Generic[Input, Output])

astream_log

async def astream_log(input: Input,
                      config: Optional[RunnableConfig] = None,
                      *,
                      diff: bool = True) -> AsyncIterator[RunLogPatch]

Asynchronously stream structured log patches derived from execution events.
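A consumer of a patch stream typically folds each patch into an accumulated state. The sketch below shows that pattern under stated assumptions: patches are modeled as lists of JSON-Patch-style `add` operations, and `fake_log_stream`, `apply_patch`, and `fold` are hypothetical helpers rather than part of `spoon_ai`. The actual structure of `RunLogPatch` is not described in this section and may differ.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List

# Hypothetical stand-in for RunLogPatch: a list of JSON-Patch-style operations.
Patch = List[Dict[str, Any]]


async def fake_log_stream() -> AsyncIterator[Patch]:
    """Stand-in for an astream_log() call; the patch contents are invented."""
    yield [{"op": "add", "path": "/logs/step_a", "value": {"status": "running"}}]
    yield [{"op": "add", "path": "/logs/step_a/status", "value": "done"}]
    yield [{"op": "add", "path": "/final_output", "value": "HI"}]


def apply_patch(state: Dict[str, Any], patch: Patch) -> None:
    """Apply 'add' operations in place; other operation types are rejected."""
    for op in patch:
        if op["op"] != "add":
            raise ValueError(f"unsupported op in this sketch: {op['op']}")
        *parents, leaf = op["path"].strip("/").split("/")
        node = state
        for key in parents:
            # Create intermediate dicts as needed while walking the path.
            node = node.setdefault(key, {})
        node[leaf] = op["value"]


async def fold() -> Dict[str, Any]:
    """Accumulate every patch from the stream into one state dict."""
    state: Dict[str, Any] = {}
    async for patch in fake_log_stream():
        apply_patch(state, patch)
    return state


state = asyncio.run(fold())
print(state)
```

Folding patches incrementally lets a caller render partial progress while the run is still executing, instead of waiting for a final result.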

astream_events

async def astream_events(
        input: Input,
        config: Optional[RunnableConfig] = None
) -> AsyncIterator[Dict[str, Any]]

Asynchronously stream structured execution events.
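Since the method returns an `AsyncIterator`, it is consumed with `async for`. The sketch below uses a hypothetical `UppercaseRunnable` class that mirrors the signature shown above without subclassing the real `Runnable`; the event keys (`event`, `data`) are assumptions, as the event schema is not documented in this section.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional


class UppercaseRunnable:
    """Hypothetical stand-in; it does not subclass the real Runnable."""

    async def astream_events(
        self, input: str, config: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        # Emit one event before the work and one after it.
        yield {"event": "start", "data": {"input": input}}
        await asyncio.sleep(0)  # yield control to the event loop
        yield {"event": "end", "data": {"output": input.upper()}}


async def main() -> List[str]:
    """Collect the event type of every event, in arrival order."""
    seen: List[str] = []
    async for event in UppercaseRunnable().astream_events("hi"):
        seen.append(event["event"])
    return seen


event_names = asyncio.run(main())
print(event_names)
```

Each event is handled as soon as it is yielded, so a caller can log or display progress without buffering the whole stream.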