4.3.2. Streaming and Real-Time AI Systems
💡 First Principle: Streaming reduces perceived latency without reducing actual processing time — the first tokens arrive and display within 200–500ms even if the full response takes 30 seconds to generate. For user-facing applications, streaming is the difference between a responsive and an unresponsive interface.
Bedrock streaming invocation:
# InvokeModelWithResponseStream — HTTP chunked transfer encoding
import json
import boto3

bedrock_runtime = boto3.client('bedrock-runtime')

def stream_response(user_query):
    response = bedrock_runtime.invoke_model_with_response_stream(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            'anthropic_version': 'bedrock-2023-05-31',
            'max_tokens': 2048,
            'messages': [{'role': 'user', 'content': user_query}]
        })
    )
    # Process chunks as they arrive
    for event in response['body']:
        chunk = json.loads(event['chunk']['bytes'])
        if chunk['type'] == 'content_block_delta':
            yield chunk['delta']['text']  # Stream to client immediately
        elif chunk['type'] == 'message_delta':
            # Final metadata: stop reason, token counts
            stop_reason = chunk['delta']['stop_reason']
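The chunk-parsing loop above can be exercised without a live Bedrock call by feeding it hand-built events in the same shape the response stream delivers. `extract_tokens` below is an illustrative helper, not part of the SDK; the fake events stand in for `response['body']`:

```python
import json

def extract_tokens(stream_events):
    """Pull text deltas out of a Bedrock response-stream event sequence."""
    for event in stream_events:
        chunk = json.loads(event['chunk']['bytes'])
        if chunk['type'] == 'content_block_delta':
            yield chunk['delta']['text']

# Hand-built events standing in for a live response['body'] stream
fake_events = [
    {'chunk': {'bytes': json.dumps({'type': 'content_block_delta',
                                    'delta': {'text': t}}).encode()}}
    for t in ('Hello', ', ', 'world')
]
print(''.join(extract_tokens(fake_events)))  # → Hello, world
```

Keeping the parsing logic in a small pure function like this makes the streaming path unit-testable without AWS credentials.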
Streaming via API Gateway → Lambda → Bedrock:
For web clients, the streaming chain requires one of the following configurations:
- Lambda Function URLs with InvokeMode: RESPONSE_STREAM (not the standard buffered Lambda response)
- API Gateway WebSocket API for true bidirectional streaming
- Chunked transfer encoding via an API Gateway REST API (requires REGIONAL endpoint type)
# Lambda streaming response (Function URL with InvokeMode: RESPONSE_STREAM)
import json

def lambda_handler(event, context):
    # Return a generator that yields chunks as Bedrock produces them
    def stream_bedrock():
        response = bedrock_runtime.invoke_model_with_response_stream(...)
        for stream_event in response['body']:  # avoid shadowing the handler's event
            chunk = json.loads(stream_event['chunk']['bytes'])
            if chunk['type'] == 'content_block_delta':
                yield chunk['delta']['text'].encode('utf-8')
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'text/plain',
            'Transfer-Encoding': 'chunked'
        },
        'body': stream_bedrock()
    }
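Enabling streaming on the Function URL itself is a one-time configuration step, sketched below with boto3. This assumes the function already exists; the function name is a placeholder:

```python
import boto3

lambda_client = boto3.client('lambda')

# InvokeMode must be RESPONSE_STREAM; the default (BUFFERED) waits for the
# full payload before responding, which defeats streaming.
lambda_client.create_function_url_config(
    FunctionName='my-streaming-fn',  # placeholder function name
    AuthType='AWS_IAM',
    InvokeMode='RESPONSE_STREAM'
)
```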
WebSocket for bidirectional real-time interaction: When clients need to send messages while receiving a streaming response (e.g., a "stop generating" button, real-time collaborative editing), WebSocket APIs are required.
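Server-side, this plays out as the Lambda pushing tokens over the socket via API Gateway's management API and checking a stop signal between tokens. The sketch below is a minimal illustration: `push_tokens` is a hypothetical helper, and `should_stop` is assumed to be backed by some shared flag (e.g., a DynamoDB item the $default route flips when the client sends "stop"). The management client is passed in as a parameter so the logic is testable; in Lambda it would be `boto3.client('apigatewaymanagementapi', endpoint_url=callback_url)`:

```python
def push_tokens(apigw, connection_id, token_iter, should_stop=lambda: False):
    """Push tokens to a WebSocket client one at a time, honoring a
    client-sent 'stop generating' signal checked between tokens."""
    sent = 0
    for token in token_iter:
        if should_stop():  # flag flipped by the $default route on a "stop" message
            break
        apigw.post_to_connection(ConnectionId=connection_id,
                                 Data=token.encode('utf-8'))
        sent += 1
    return sent
```

Checking the flag between tokens bounds how much wasted generation the client sees after pressing stop to roughly one token's worth of latency.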
⚠️ Exam Trap: API Gateway REST API has a 29-second integration timeout — streaming responses that take longer than 29 seconds to complete will be cut off. For long-context streaming, use Lambda Function URLs (15-minute timeout) or API Gateway WebSocket APIs (which have no integration timeout limit for message-based communication).
Reflection Question: You're building a code generation tool where users should see tokens appearing progressively as the FM generates them, with a "stop generating" button. What API Gateway type (REST, HTTP, or WebSocket) is required, and why can you not use a standard REST API with Lambda?