Message Queues

chimera.core.message_queue provides two thread-safe queues for injecting messages into a running agent loop from any external thread, such as a webhook handler, a REPL, or another async task.

A single FIFO queue for generic message injection:

| Method / Property | Description |
| --- | --- |
| enqueue(message) | Add a Message to the queue |
| enqueue_text(text) | Convenience: wrap text in a user Message and enqueue |
| drain() | Remove and return all queued messages |
| peek() | View queued messages without removing them |
| is_empty() | True if the queue has no messages |
| size | Number of queued messages |
| clear() | Discard all queued messages |
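The difference between peek() and drain() is easiest to see in a small stand-in. The sketch below is not chimera's implementation; `SketchQueue` is a hypothetical class that only mirrors the method names listed above, assuming a deque guarded by a lock.

```python
import threading
from collections import deque


class SketchQueue:
    """Hypothetical stand-in that mirrors the documented method names."""

    def __init__(self):
        self._items = deque()
        self._lock = threading.Lock()

    def enqueue(self, message):
        with self._lock:
            self._items.append(message)

    def drain(self):
        # Copy and clear under one lock acquisition, so a message is
        # returned exactly once even if producers are still running.
        with self._lock:
            items = list(self._items)
            self._items.clear()
        return items

    def peek(self):
        # Return a copy; the queue itself is left untouched.
        with self._lock:
            return list(self._items)

    @property
    def size(self):
        with self._lock:
            return len(self._items)

    def is_empty(self):
        return self.size == 0


q = SketchQueue()
workers = [threading.Thread(target=q.enqueue, args=(f"msg-{i}",)) for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

peeked = q.peek()          # all four messages, queue unchanged
size_after_peek = q.size
drained = q.drain()        # the same four messages, queue now empty
```

After peek() the messages are still queued; after drain() a second drain() returns an empty list.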

MessageQueues holds two separate queues with different semantics:

| Queue | Drained when | Purpose |
| --- | --- | --- |
| steering | Before each model call inside the current turn | Redirect the agent mid-turn |
| follow_up | After the current turn completes | Queue the next user prompt |

| Method / Property | Description |
| --- | --- |
| steer(message) | Enqueue a steering message |
| follow_up(message) | Enqueue a follow-up message |
| drain_steering() | Remove and return all steering messages |
| drain_follow_up() | Remove and return all follow-up messages |
| has_steering | True if steering messages are pending |
| has_follow_up | True if follow-up messages are pending |
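The two drain points can be modelled with a toy loop. This is a sketch of the documented timing only, not chimera's agent loop: the deques stand in for the two queues, `model_call` is a hypothetical stub, and each turn is assumed to take exactly two model calls.

```python
from collections import deque

steering, follow_up = deque(), deque()  # stand-ins for the two queues
transcript = []


def drain(queue):
    """Remove and return everything currently queued, oldest first."""
    items = list(queue)
    queue.clear()
    return items


def model_call(turn, call):
    # Hypothetical stub for a model call.
    transcript.append(f"turn {turn} call {call}: assistant reply")
    if (turn, call) == (1, 1):
        # Simulate external input arriving while turn 1 is still running.
        follow_up.append("Now summarise the findings.")
        steering.append("Focus only on the login flow.")


prompts = deque(["Audit the web app."])
turn = 0
while prompts:
    turn += 1
    transcript.append(f"turn {turn}: user prompt: {prompts.popleft()}")
    for call in (1, 2):  # assumption: every turn needs two model calls
        for msg in drain(steering):  # drained before each model call
            transcript.append(f"turn {turn}: steering: {msg}")
        model_call(turn, call)
    prompts.extend(drain(follow_up))  # drained only after the turn completes

for line in transcript:
    print(line)
```

Both messages arrive during the first model call of turn 1. The steering message is picked up before the second call of that same turn, while the follow-up waits and becomes the prompt for turn 2.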

All operations on both queues are protected by a single threading.Lock.
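One way to read the single-lock design is sketched below. `TwoQueueSketch` is a hypothetical class, not chimera's code; it assumes two deques that share one threading.Lock, and reuses the documented method names for illustration.

```python
import threading
from collections import deque


class TwoQueueSketch:
    """Hypothetical: two FIFO queues guarded by one shared lock."""

    def __init__(self):
        self._lock = threading.Lock()  # the only lock; guards both deques
        self._steering = deque()
        self._follow_up = deque()

    def steer(self, message):
        with self._lock:
            self._steering.append(message)

    def follow_up(self, message):
        with self._lock:
            self._follow_up.append(message)

    def _drain(self, queue):
        # threading.Lock is not reentrant, so no locked method
        # calls another locked method while holding it.
        with self._lock:
            items = list(queue)
            queue.clear()
        return items

    def drain_steering(self):
        return self._drain(self._steering)

    def drain_follow_up(self):
        return self._drain(self._follow_up)


pair = TwoQueueSketch()


def producer(n):
    for i in range(100):
        pair.steer((n, i))
        pair.follow_up((n, i))


threads = [threading.Thread(target=producer, args=(n,)) for n in range(4)]
for t in threads:
    t.start()

collected = []
while any(t.is_alive() for t in threads):
    collected.extend(pair.drain_steering())  # drain while producers run
for t in threads:
    t.join()
collected.extend(pair.drain_steering())  # pick up anything left over
follow_ups = pair.drain_follow_up()
```

A single lock keeps the design simple at the cost of steering and follow-up operations briefly blocking each other, which is unlikely to matter at the message rates an agent loop sees.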

```python
import threading

from chimera.core.message_queue import MessageQueues
from chimera.types import Message

queues = MessageQueues()


# In another thread (e.g. a webhook handler):
def inject_hint():
    queues.steer(Message.user("Focus only on the login flow."))


threading.Timer(2.0, inject_hint).start()

# The loop drains steering messages before each model call.
# `context` stands for the loop's conversation context (not defined here).
for msg in queues.drain_steering():
    context.add(msg)
```