Events

chimera.events provides a lightweight publish/subscribe event bus with middleware support. It is used throughout the framework to broadcast lifecycle events — tool calls, streaming deltas, permission decisions, and more — without coupling the emitter to the consumer.

Every event inherits from Event, which carries three fields:

| Field | Type | Description |
| --- | --- | --- |
| type | str | Discriminator string (e.g. "tool_call", "error") |
| timestamp | float | Monotonic timestamp set at creation via time.monotonic |
| metadata | dict[str, Any] | Arbitrary extra data |
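To make the three fields concrete, here is a minimal sketch of what such a base class could look like. It is a stand-in written for illustration, not chimera's actual definition; in particular, the default value "event" for type is an assumption.

```python
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Event:
    """Illustrative stand-in for the base event (not chimera's own class)."""

    # Discriminator string; the "event" default is an assumption of this sketch.
    type: str = "event"
    # time.monotonic never goes backwards, so events can be ordered reliably.
    timestamp: float = field(default_factory=time.monotonic)
    # default_factory gives every instance its own dict.
    metadata: dict[str, Any] = field(default_factory=dict)


first = Event(type="tool_call")
second = Event(type="error", metadata={"source": "demo"})
```

Because the timestamp is monotonic rather than wall-clock, it is suited to ordering events and measuring intervals, not to displaying a date.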

EventBus is the central hub. Handlers are registered for a specific event type string, or for "*" to receive every event.

| Method | Description |
| --- | --- |
| subscribe(event_type, handler) | Register a handler; returns an unsubscribe callable |
| on(event_type) | Decorator form of subscribe |
| publish(event) | Dispatch to exact-type handlers and wildcard "*" handlers |
| use(middleware) | Append a Middleware to the processing chain |
| clear() | Remove all handlers and middleware |
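The subscribe, on, and publish semantics in the table can be sketched in a few lines. MiniBus below is a hypothetical stand-in, not chimera's EventBus; the dispatch order (exact-type handlers before wildcard handlers) and the idempotent unsubscribe are assumptions of this sketch.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Callable

Handler = Callable[[Any], None]


class MiniBus:
    """Hypothetical minimal publish/subscribe bus, for illustration only."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> Callable[[], None]:
        self._handlers[event_type].append(handler)

        def unsubscribe() -> None:
            # Calling this more than once is harmless.
            if handler in self._handlers[event_type]:
                self._handlers[event_type].remove(handler)

        return unsubscribe

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        def decorator(handler: Handler) -> Handler:
            self.subscribe(event_type, handler)
            return handler  # the decorated function stays usable on its own

        return decorator

    def publish(self, event: Any) -> None:
        # Assumed order: exact-type handlers first, then "*" handlers.
        targets = list(self._handlers[event.type]) + list(self._handlers["*"])
        for handler in targets:
            handler(event)


bus = MiniBus()
seen = []
unsub = bus.subscribe("tool_call", lambda e: seen.append(("exact", e.type)))


@bus.on("*")
def catch_all(event):
    seen.append(("wildcard", event.type))


bus.publish(SimpleNamespace(type="tool_call"))
unsub()  # the exact-type handler is removed; the wildcard handler remains
bus.publish(SimpleNamespace(type="tool_call"))
```

Returning the unsubscribe callable from subscribe means callers do not need to keep a reference to both the bus and the handler in order to detach later.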

chimera.events.types defines 32 concrete event dataclasses across five categories: core dispatch, lifecycle, advanced, team coordination, and session persistence.

Core dispatch events:

| Class | type field | Key fields |
| --- | --- | --- |
| ToolCallEvent | tool_call | tool_name, arguments, call_id |
| ToolResultEvent | tool_result | call_id, output, success, tool_metadata |
| StepEvent | step | step_number, content |
| TextDeltaEvent | text_delta | content |
| ErrorEvent | error | error, recoverable |
| LoopDetectedEvent | loop_detected | pattern |
| CompactionEvent | compaction | messages_before, messages_after |
| PermissionEvent | permission | tool_name, action, granted, call_id |
| SessionEvent | session | action, session_id |
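ToolCallEvent and ToolResultEvent both carry a call_id, which suggests the two can be correlated. The sketch below uses simplified stand-in dataclasses (field defaults and the pair_calls_with_results helper are hypothetical, not part of chimera) to show how a concrete event can fix its type discriminator and how calls might be matched to results.

```python
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Event:
    """Stand-in base class; defaults are assumptions of this sketch."""

    type: str = "event"
    timestamp: float = field(default_factory=time.monotonic)
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class ToolCallEvent(Event):
    type: str = "tool_call"  # overrides the base default with a fixed discriminator
    tool_name: str = ""
    arguments: dict[str, Any] = field(default_factory=dict)
    call_id: str = ""


@dataclass
class ToolResultEvent(Event):
    type: str = "tool_result"
    call_id: str = ""
    output: str = ""
    success: bool = True


def pair_calls_with_results(events):
    """Hypothetical helper: match each result to its call via call_id."""
    calls = {e.call_id: e for e in events if isinstance(e, ToolCallEvent)}
    return [
        (calls[e.call_id], e)
        for e in events
        if isinstance(e, ToolResultEvent) and e.call_id in calls
    ]


events = [
    ToolCallEvent(tool_name="bash", arguments={"command": "ls"}, call_id="c1"),
    ToolResultEvent(call_id="c1", output="a.txt", success=True),
    ToolResultEvent(call_id="orphan", output="", success=False),
]
pairs = pair_calls_with_results(events)
```

Giving every subclass field a default keeps the dataclass inheritance valid on older Python versions; a real implementation might instead use keyword-only fields.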

Lifecycle events cover the full agent lifecycle, including per-request telemetry, turn tracking, streaming, and cancellation.

| Class | type field | Key fields |
| --- | --- | --- |
| ModelRequestEvent | model_request | model, message_count, tool_count |
| ModelResponseEvent | model_response | model, content_length, tool_calls_count, input_tokens, output_tokens |
| TurnStartEvent | turn_start | turn_number |
| TurnEndEvent | turn_end | turn_number, tool_calls_count |
| StreamStartEvent | stream_start | model |
| StreamEndEvent | stream_end | total_tokens |
| AgentStartEvent | agent_start | max_steps |
| AgentEndEvent | agent_end | steps, success, total_cost |
| SteeringEvent | steering | content |
| CancellationEvent | cancellation | at_step |

Advanced events cover critic evaluation, security decisions, per-step cost accounting, and external agents.

| Class | type field | Key fields |
| --- | --- | --- |
| CriticEvent | critic | score, passed, feedback, iteration |
| SecurityEvent | security | tool_name, arguments, risk, action |
| StepCostEvent | step_cost | step_index, cost, input_tokens, output_tokens, reasoning_tokens, cache_hit_rate, duration |
| ExternalAgentStartEvent | external_agent_start | agent_name, task |
| ExternalAgentCompleteEvent | external_agent_complete | agent_name, response_text, cost, tool_calls_count |
| ExternalAgentToolCallEvent | external_agent_tool_call | agent_name, tool_call_id, title, status |
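As one example of consuming these events, a handler could accumulate per-step costs into running totals. The sketch below is self-contained: StepCostEvent here is a stand-in carrying only a subset of the listed fields, and CostTracker is a hypothetical helper, not something chimera provides.

```python
from dataclasses import dataclass


@dataclass
class StepCostEvent:
    """Stand-in with a subset of the documented key fields."""

    step_index: int
    cost: float
    input_tokens: int
    output_tokens: int
    type: str = "step_cost"


class CostTracker:
    """Hypothetical handler that sums cost and token counts across steps."""

    def __init__(self) -> None:
        self.total_cost = 0.0
        self.total_tokens = 0
        self.steps = 0

    def __call__(self, event: StepCostEvent) -> None:
        self.total_cost += event.cost
        self.total_tokens += event.input_tokens + event.output_tokens
        self.steps += 1


tracker = CostTracker()
for ev in [StepCostEvent(0, 0.25, 100, 20), StepCostEvent(1, 0.5, 200, 40)]:
    tracker(ev)  # a bus would invoke the handler the same way
```

Since the tracker is a plain callable, it could presumably be registered with subscribe("step_cost", tracker) like any other handler.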

Team coordination events are emitted by the multi-agent task and messaging layer.

| Class | type field | Key fields |
| --- | --- | --- |
| TeammateIdleEvent | teammate_idle | team, agent_id |
| TaskCreatedEvent | task_created | team, task_id, description, created_by |
| TaskCompletedEvent | task_completed | team, task_id, agent_id, result |
| TeammateMessageEvent | teammate_message | team, sender, recipient, content |

Session persistence events are used by event-sourced session storage and hook integrations.

| Class | type field | Key fields |
| --- | --- | --- |
| TodoWriteEvent | todo_write | todos, op ("add"/"complete"/"set"/"remove"), session_id |
| HookUpdatedInputEvent | hook_updated_input | tool_name, call_id, original, updated |

Subscribe to any of these using their type string or the "*" wildcard:

from chimera.events import EventBus
from chimera.events.types import AgentEndEvent

bus = EventBus()

@bus.on("agent_end")
def on_done(event: AgentEndEvent):
    print(f"Finished in {event.steps} steps, cost ${event.total_cost:.4f}")

@bus.on("cancellation")
def on_cancel(event):
    print(f"Cancelled at step {event.at_step}")

Middleware wraps the dispatch pipeline. Each middleware implements a single method:

class Middleware(ABC):
    @abstractmethod
    def process(self, event: Event, next_handler: Callable[[Event], None]) -> None: ...

Two built-in implementations are provided:

  • LoggingMiddleware — logs every event’s type and timestamp at DEBUG level before forwarding.
  • FilterMiddleware — only forwards events whose type is in the given allow_types set.

When multiple middleware are registered via use(), they are nested: the last middleware added wraps around all earlier ones, so it sees each event first and decides whether the event reaches the rest of the chain.
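The wrapping order can be illustrated with a small self-contained sketch. Everything here is hypothetical stand-in code (Tag, AllowOnly, and build_chain are not chimera names); it only demonstrates the rule stated above, that the last middleware added ends up outermost.

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace
from typing import Any, Callable

Next = Callable[[Any], None]


class Middleware(ABC):
    @abstractmethod
    def process(self, event: Any, next_handler: Next) -> None: ...


class Tag(Middleware):
    """Records that it ran, then always forwards the event."""

    def __init__(self, name: str, trace: list) -> None:
        self.name, self.trace = name, trace

    def process(self, event: Any, next_handler: Next) -> None:
        self.trace.append(self.name)
        next_handler(event)


class AllowOnly(Middleware):
    """Forwards only events whose type is in allow_types."""

    def __init__(self, allow_types: set, trace: list) -> None:
        self.allow_types, self.trace = allow_types, trace

    def process(self, event: Any, next_handler: Next) -> None:
        self.trace.append("filter")
        if event.type in self.allow_types:
            next_handler(event)  # dropped events never reach inner layers


def wrap(mw: Middleware, nxt: Next) -> Next:
    return lambda event: mw.process(event, nxt)


def build_chain(middlewares: list, dispatch: Next) -> Next:
    handler = dispatch
    for mw in middlewares:  # registration order
        handler = wrap(mw, handler)  # each later middleware wraps what exists
    return handler


trace, delivered = [], []
chain = build_chain(
    [Tag("logging", trace), AllowOnly({"tool_call"}, trace)],
    delivered.append,
)
chain(SimpleNamespace(type="tool_call"))  # filter -> logging -> dispatch
chain(SimpleNamespace(type="step"))       # filter drops it
```

Under this ordering, registering a filter after a logger means the logger only sees events the filter allows; reversing the registration order would log every event before filtering.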

from chimera.events import EventBus, ToolCallEvent

bus = EventBus()

# Function-based subscription
def on_tool(event: ToolCallEvent):
    print(f"Tool called: {event.tool_name}")

unsub = bus.subscribe("tool_call", on_tool)

# Decorator-based subscription
@bus.on("error")
def on_error(event):
    print(f"Error: {event.error}")

bus.publish(ToolCallEvent(tool_name="bash", arguments={"command": "ls"}))

# Middleware
from chimera.events import LoggingMiddleware, FilterMiddleware

bus.use(LoggingMiddleware())
bus.use(FilterMiddleware(allow_types={"tool_call", "error"}))

# Wildcard subscription
@bus.on("*")
def catch_all(event):
    print(f"[{event.type}] at {event.timestamp}")

# Unsubscribing
def my_handler(event):
    print(f"Step {event.step_number}")

unsub = bus.subscribe("step", my_handler)
# Later...
unsub()  # my_handler will no longer be called