Events
Bidirectional streaming events enable real-time monitoring and processing of audio, text, and tool execution during persistent conversations. Unlike standard streaming, which uses async iterators or callbacks, bidirectional streaming uses send() and receive() methods for explicit control over the conversation flow.
Event Model
Bidirectional streaming uses a different event model than standard streaming:
Standard Streaming:
- Uses stream_async() or callback handlers
- Request-response pattern (one invocation per call)
- Events flow in one direction (model → application)
Bidirectional Streaming:
- Uses send() and receive() methods
- Persistent connection (multiple turns per connection)
- Events flow in both directions (application ↔ model)
- Supports real-time audio and interruptions
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel

async def main():
    model = BidiNovaSonicModel()

    async with BidiAgent(model=model) as agent:
        # Send input to model
        await agent.send("What is 2+2?")

        # Receive events from model
        async for event in agent.receive():
            print(f"Event: {event['type']}")

asyncio.run(main())

Input Event Types
Events sent to the model via agent.send().
BidiTextInputEvent
Send text input to the model.
await agent.send("What is the weather?")

# Or explicitly:
from strands.experimental.bidi.types.events import BidiTextInputEvent

await agent.send(BidiTextInputEvent(text="What is the weather?", role="user"))

BidiAudioInputEvent
Send audio input to the model. Audio must be base64-encoded.
import base64
from strands.experimental.bidi.types.events import BidiAudioInputEvent

audio_bytes = record_audio()  # Your audio capture logic
audio_base64 = base64.b64encode(audio_bytes).decode('utf-8')

await agent.send(BidiAudioInputEvent(
    audio=audio_base64,
    format="pcm",
    sample_rate=16000,
    channels=1
))

BidiImageInputEvent
Send image input to the model. Images must be base64-encoded.
import base64
from strands.experimental.bidi.types.events import BidiImageInputEvent

with open("image.jpg", "rb") as f:
    image_bytes = f.read()

image_base64 = base64.b64encode(image_bytes).decode('utf-8')

await agent.send(BidiImageInputEvent(
    image=image_base64,
    mime_type="image/jpeg"
))

Output Event Types
Events received from the model via agent.receive().
Connection Lifecycle Events
Events that track the connection state throughout the conversation.
BidiConnectionStartEvent
Emitted when the streaming connection is established and ready for interaction.
{
  "type": "bidi_connection_start",
  "connection_id": "conn_abc123",
  "model": "amazon.nova-sonic-v1:0"
}

Properties:
- connection_id: Unique identifier for this streaming connection
- model: Model identifier (e.g., "amazon.nova-sonic-v1:0", "gemini-2.0-flash-live")
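Usage (an illustrative sketch; only the fields listed above are assumed):

async for event in agent.receive():
    if event["type"] == "bidi_connection_start":
        # Log the connection details for later correlation
        print(f"Connected to {event['model']} (connection {event['connection_id']})")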
BidiConnectionRestartEvent
Emitted when the agent is restarting the model connection after a timeout. The conversation history is preserved and the connection resumes automatically.
{
  "type": "bidi_connection_restart",
  "timeout_error": BidiModelTimeoutError(...)
}

Properties:
- timeout_error: The timeout error that triggered the restart
Usage:
async for event in agent.receive():
    if event["type"] == "bidi_connection_restart":
        print("Connection restarting, please wait...")
        # Connection resumes automatically with full history

See Connection Lifecycle for more details on timeout handling.
BidiConnectionCloseEvent
Emitted when the streaming connection is closed.
{
  "type": "bidi_connection_close",
  "connection_id": "conn_abc123",
  "reason": "user_request"
}

Properties:
- connection_id: Unique identifier for this streaming connection
- reason: Why the connection closed
  - "client_disconnect": Client disconnected
  - "timeout": Connection timed out
  - "error": Error occurred
  - "complete": Conversation completed normally
  - "user_request": User requested closure (via the stop_conversation tool)
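Usage (an illustrative sketch; the branching simply mirrors the reason values above):

async for event in agent.receive():
    if event["type"] == "bidi_connection_close":
        if event["reason"] == "error":
            print(f"Connection {event['connection_id']} closed after an error")
        else:
            print(f"Connection {event['connection_id']} closed: {event['reason']}")
        break  # Stop processing; the connection has been closed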
Response Lifecycle Events
Events that track individual model responses within the conversation.
BidiResponseStartEvent
Emitted when the model begins generating a response.
{
  "type": "bidi_response_start",
  "response_id": "resp_xyz789"
}

Properties:
- response_id: Unique identifier for this response (matches BidiResponseCompleteEvent)
BidiResponseCompleteEvent
Emitted when the model finishes generating a response.
{
  "type": "bidi_response_complete",
  "response_id": "resp_xyz789",
  "stop_reason": "complete"
}

Properties:
- response_id: Unique identifier for this response
- stop_reason: Why the response ended
  - "complete": Model completed its response
  - "interrupted": User interrupted the response
  - "tool_use": Model is requesting tool execution
  - "error": Error occurred during generation
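Usage (a minimal sketch that pairs the two response lifecycle events by response_id; the timing bookkeeping is illustrative):

import time

response_started_at = {}

async for event in agent.receive():
    if event["type"] == "bidi_response_start":
        # Remember when each response began
        response_started_at[event["response_id"]] = time.monotonic()
    elif event["type"] == "bidi_response_complete":
        started = response_started_at.pop(event["response_id"], None)
        if started is not None:
            elapsed = time.monotonic() - started
            print(f"Response {event['response_id']} ended ({event['stop_reason']}) after {elapsed:.1f}s")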
Audio Events
Events for streaming audio input and output.
BidiAudioStreamEvent
Emitted when the model generates audio output. Audio is base64-encoded for JSON compatibility.
{
  "type": "bidi_audio_stream",
  "audio": "base64_encoded_audio_data...",
  "format": "pcm",
  "sample_rate": 16000,
  "channels": 1
}

Properties:
- audio: Base64-encoded audio string
- format: Audio encoding format ("pcm", "wav", "opus", "mp3")
- sample_rate: Sample rate in Hz (16000, 24000, 48000)
- channels: Number of audio channels (1 = mono, 2 = stereo)
Usage:
import base64

async for event in agent.receive():
    if event["type"] == "bidi_audio_stream":
        # Decode and play audio
        audio_bytes = base64.b64decode(event["audio"])
        play_audio(audio_bytes, sample_rate=event["sample_rate"])

Transcript Events
Events for speech-to-text transcription of both user and assistant speech.
BidiTranscriptStreamEvent
Emitted when speech is transcribed. Supports incremental updates for providers that send partial transcripts.
{
  "type": "bidi_transcript_stream",
  "delta": {"text": "Hello"},
  "text": "Hello",
  "role": "assistant",
  "is_final": True,
  "current_transcript": "Hello world"
}

Properties:
- delta: The incremental transcript change
- text: The delta text (same as the delta content)
- role: Who is speaking ("user" or "assistant")
- is_final: Whether this is the final/complete transcript
- current_transcript: The accumulated transcript text so far (None for the first delta)
Usage:
async for event in agent.receive():
    if event["type"] == "bidi_transcript_stream":
        role = event["role"]
        text = event["text"]
        is_final = event["is_final"]

        if is_final:
            print(f"{role}: {text}")
        else:
            print(f"{role} (preview): {text}")

Interruption Events
Events for handling user interruptions during model responses.
BidiInterruptionEvent
Emitted when the model’s response is interrupted, typically by user speech detected via voice activity detection.
{
  "type": "bidi_interruption",
  "reason": "user_speech"
}

Properties:
- reason: Why the interruption occurred
  - "user_speech": User started speaking (most common)
  - "error": Error caused interruption
Usage:
async for event in agent.receive():
    if event["type"] == "bidi_interruption":
        print(f"Interrupted by {event['reason']}")
        # Audio output automatically cleared
        # Model ready for new input

Tool Events
Events for tool execution during conversations. Bidirectional streaming reuses the standard ToolUseStreamEvent from Strands.
ToolUseStreamEvent
Emitted when the model requests tool execution. See Tools Overview for details.
{
  "type": "tool_use_stream",
  "current_tool_use": {
    "toolUseId": "tool_123",
    "name": "calculator",
    "input": {"expression": "2+2"}
  }
}

Properties:
- current_tool_use: Information about the tool being used
  - toolUseId: Unique ID for this tool use
  - name: Name of the tool
  - input: Tool input parameters
Tools execute automatically in the background and results are sent back to the model without blocking the conversation.
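For example, a handler can simply observe tool activity without executing anything itself (an illustrative sketch; only the current_tool_use fields above are assumed):

async for event in agent.receive():
    if event["type"] == "tool_use_stream":
        tool_use = event["current_tool_use"]
        # Observe only; the agent runs the tool and returns the result to the model
        print(f"Tool requested: {tool_use['name']} ({tool_use['toolUseId']}) with input {tool_use['input']}")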
Usage Events
Events for tracking token consumption across different modalities.
BidiUsageEvent
Emitted periodically to report token usage with modality breakdown.
{
  "type": "bidi_usage",
  "inputTokens": 150,
  "outputTokens": 75,
  "totalTokens": 225,
  "modality_details": [
    {"modality": "text", "input_tokens": 100, "output_tokens": 50},
    {"modality": "audio", "input_tokens": 50, "output_tokens": 25}
  ]
}

Properties:
- inputTokens: Total tokens used for all input modalities
- outputTokens: Total tokens used for all output modalities
- totalTokens: Sum of input and output tokens
- modality_details: Optional list of token usage per modality
- cacheReadInputTokens: Optional tokens read from cache
- cacheWriteInputTokens: Optional tokens written to cache
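Usage (an illustrative sketch; modality_details is optional, so the code falls back to an empty list when it is absent):

async for event in agent.receive():
    if event["type"] == "bidi_usage":
        print(f"Tokens: {event['inputTokens']} in / {event['outputTokens']} out / {event['totalTokens']} total")
        for detail in event.get("modality_details") or []:
            print(f"  {detail['modality']}: {detail['input_tokens']} in / {detail['output_tokens']} out")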
Error Events
Events for error handling during conversations.
BidiErrorEvent
Emitted when an error occurs during the session.
{
  "type": "bidi_error",
  "message": "Connection failed",
  "code": "ConnectionError",
  "details": {"retry_after": 5}
}

Properties:
- message: Human-readable error message
- code: Error code (exception class name)
- details: Optional additional error context
- error: The original exception (accessible via property, not in JSON)
Usage:
async for event in agent.receive():
    if event["type"] == "bidi_error":
        print(f"Error: {event['message']}")
        # Access original exception if needed
        if hasattr(event, 'error'):
            raise event.error

Event Flow Examples
Basic Audio Conversation
import asyncio
from strands.experimental.bidi import BidiAgent, BidiAudioIO
from strands.experimental.bidi.models import BidiNovaSonicModel

async def main():
    model = BidiNovaSonicModel()
    agent = BidiAgent(model=model)
    audio_io = BidiAudioIO()

    await agent.start()

    # Process events from audio conversation
    async for event in agent.receive():
        if event["type"] == "bidi_connection_start":
            print(f"🔗 Connected to {event['model']}")

        elif event["type"] == "bidi_response_start":
            print(f"▶️ Response starting: {event['response_id']}")

        elif event["type"] == "bidi_audio_stream":
            print(f"🔊 Audio chunk: {len(event['audio'])} bytes")

        elif event["type"] == "bidi_transcript_stream":
            if event["is_final"]:
                print(f"{event['role']}: {event['text']}")

        elif event["type"] == "bidi_response_complete":
            print(f"✅ Response complete: {event['stop_reason']}")

    await agent.stop()

asyncio.run(main())

Tracking Transcript State
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel

async def main():
    model = BidiNovaSonicModel()

    async with BidiAgent(model=model) as agent:
        await agent.send("Tell me about Python")

        # Track incremental transcript updates
        current_speaker = None
        current_text = ""

        async for event in agent.receive():
            if event["type"] == "bidi_transcript_stream":
                role = event["role"]

                if role != current_speaker:
                    if current_text:
                        print(f"\n{current_speaker}: {current_text}")
                    current_speaker = role
                    current_text = ""

                # current_transcript is None for the first delta, so fall back to the delta text
                current_text = event.get("current_transcript") or event["text"]

                if event["is_final"]:
                    print(f"\n{role}: {current_text}")
                    current_text = ""

asyncio.run(main())

Tool Execution During Conversation
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel
from strands_tools import calculator

async def main():
    model = BidiNovaSonicModel()
    agent = BidiAgent(model=model, tools=[calculator])

    async with agent:
        await agent.send("What is 25 times 48?")

        async for event in agent.receive():
            event_type = event["type"]

            if event_type == "bidi_transcript_stream" and event["is_final"]:
                print(f"{event['role']}: {event['text']}")

            elif event_type == "tool_use_stream":
                tool_use = event["current_tool_use"]
                print(f"🔧 Using tool: {tool_use['name']}")
                print(f"   Input: {tool_use['input']}")

            elif event_type == "bidi_response_complete":
                if event["stop_reason"] == "tool_use":
                    print("   Tool executing in background...")

asyncio.run(main())

Handling Interruptions
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel

async def main():
    model = BidiNovaSonicModel()

    async with BidiAgent(model=model) as agent:
        await agent.send("Tell me a long story about space exploration")

        interruption_count = 0

        async for event in agent.receive():
            if event["type"] == "bidi_transcript_stream" and event["is_final"]:
                print(f"{event['role']}: {event['text']}")

            elif event["type"] == "bidi_interruption":
                interruption_count += 1
                print(f"\n⚠️ Interrupted (#{interruption_count})")

            elif event["type"] == "bidi_response_complete":
                if event["stop_reason"] == "interrupted":
                    print(f"Response interrupted {interruption_count} times")

asyncio.run(main())

Connection Restart Handling
import asyncio
from strands.experimental.bidi import BidiAgent
from strands.experimental.bidi.models import BidiNovaSonicModel

async def main():
    model = BidiNovaSonicModel()  # 8-minute timeout

    async with BidiAgent(model=model) as agent:
        # Continuous conversation that handles restarts
        async for event in agent.receive():
            if event["type"] == "bidi_connection_restart":
                print("⚠️ Connection restarting (timeout)...")
                print("   Conversation history preserved")
                # Connection resumes automatically

            elif event["type"] == "bidi_connection_start":
                print(f"✅ Connected to {event['model']}")

            elif event["type"] == "bidi_transcript_stream" and event["is_final"]:
                print(f"{event['role']}: {event['text']}")

asyncio.run(main())

Hook Events
Hook events are a separate concept from streaming events. While streaming events flow through agent.receive() during conversations, hook events are callbacks that trigger at specific lifecycle points (such as initialization, message added, or interruption). They let you inject custom logic for cross-cutting concerns like logging, analytics, and session persistence without processing the event stream directly.
For details on hook events and usage patterns, see the Hooks documentation.