Streaming LLM Responses
Track streaming completions with proper latency measurement and complete transcript capture.
Basic Streaming Pattern
import { OpenAI } from 'openai'

// Reads OPENAI_API_KEY from the environment by default
const openai = new OpenAI()

const turn = session.turn()
const startTime = Date.now()
try {
  const stream = await turn.wrapLLM(
    async () => {
      return await openai.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: userMessage }],
        stream: true,
      })
    },
    { model: 'gpt-4o', prompt_id: 'chat_v1' }
  )

  let fullResponse = ''
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || ''
    fullResponse += content
    // Display to user in real-time
    displayChunk(content)
  }

  // Record complete transcript
  turn.setMessages([
    { role: 'user', content: userMessage },
    { role: 'assistant', content: fullResponse }
  ])

  // Add streaming metadata
  turn.annotate({
    streaming: true,
    totalLatencyMs: Date.now() - startTime,
  })
} finally {
  await turn.finish()
}

Tracking First Token Latency
const turn = session.turn()
const startTime = Date.now()
let firstTokenTime: number | null = null
try {
  const stream = await turn.wrapLLM(
    async () => {
      return await openai.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: userMessage }],
        stream: true,
      })
    },
    { model: 'gpt-4o', prompt_id: 'chat_v1' }
  )

  let fullResponse = ''
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || ''
    // Capture first token latency
    if (content && firstTokenTime === null) {
      firstTokenTime = Date.now()
    }
    fullResponse += content
    displayChunk(content)
  }

  turn.setMessages([
    { role: 'user', content: userMessage },
    { role: 'assistant', content: fullResponse }
  ])

  turn.annotate({
    streaming: true,
    firstTokenLatencyMs: firstTokenTime ? firstTokenTime - startTime : null,
    totalLatencyMs: Date.now() - startTime,
  })
} finally {
  await turn.finish()
}
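Date.now() is fine for coarse measurements, but it follows the wall clock and can jump if the system time changes. If you want monotonic timing, performance.now() (a global in modern Node and browsers) can be swapped in. A minimal sketch showing only the timing lines, with the rest of the loop unchanged from the example above:

// Monotonic timing: only the time source differs from the example above
const startTime = performance.now()
let firstTokenTime: number | null = null

// ...inside the stream loop, on the first non-empty chunk:
//   firstTokenTime = performance.now()

turn.annotate({
  streaming: true,
  firstTokenLatencyMs: firstTokenTime !== null ? firstTokenTime - startTime : null,
  totalLatencyMs: performance.now() - startTime,
})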
Streaming with Token Usage

Some providers (like OpenAI) include token usage in the final chunk when you request it via stream_options.
const stream = await turn.wrapLLM(
  async () => {
    return await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: userMessage }],
      stream: true,
      stream_options: { include_usage: true },
    })
  },
  { model: 'gpt-4o', prompt_id: 'chat_v1' }
)

let fullResponse = ''
let usage = null
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || ''
  fullResponse += content
  // Capture usage from the final chunk (its choices array is empty)
  if (chunk.usage) {
    usage = chunk.usage
  }
  displayChunk(content)
}

turn.setMessages([
  { role: 'user', content: userMessage },
  { role: 'assistant', content: fullResponse }
])

if (usage) {
  turn.annotate({
    tokens: {
      prompt: usage.prompt_tokens,
      completion: usage.completion_tokens,
      total: usage.total_tokens,
    }
  })
}

await turn.finish()
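Not every provider or proxy reports usage for streamed responses. If usage comes back empty, you can still record a rough, clearly labeled estimate. The heuristic below (roughly four characters per token for English text) and the annotation keys are assumptions for illustration, not exact counts or SDK conventions:

// Fallback when the provider returns no usage: record a rough estimate,
// flagged as such so it is never confused with exact token counts
if (!usage) {
  turn.annotate({
    tokens: {
      completion_estimated: Math.ceil(fullResponse.length / 4), // ~4 chars/token heuristic
      estimated: true,
    }
  })
}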
Error Handling in Streams

const turn = session.turn()

try {
  const stream = await turn.wrapLLM(
    async () => {
      return await openai.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: userMessage }],
        stream: true,
      })
    },
    { model: 'gpt-4o', prompt_id: 'chat_v1' }
  )

  let fullResponse = ''
  try {
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || ''
      fullResponse += content
      displayChunk(content)
    }
  } catch (streamError) {
    // Handle mid-stream errors
    turn.annotate({
      streamError: {
        message: streamError.message,
        partialResponse: fullResponse,
        charactersReceived: fullResponse.length,
      }
    })
    throw streamError
  }

  turn.setMessages([
    { role: 'user', content: userMessage },
    { role: 'assistant', content: fullResponse }
  ])
} catch (error) {
  turn.annotate({
    error: {
      type: error.constructor.name,
      message: error.message,
    }
  })
  throw error
} finally {
  await turn.finish()
}
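User-initiated cancellations are also worth recording separately from real failures. The sketch below assumes you pass an AbortSignal through to the OpenAI request (the openai SDK accepts a signal in its request options); cancelButton is whatever UI hook triggers cancellation in your app, and the cancelled annotation key is illustrative:

// Sketch: distinguish user cancellation from real failures (annotation keys are illustrative)
const controller = new AbortController()
cancelButton.addEventListener('click', () => controller.abort())

const turn = session.turn()
try {
  const stream = await turn.wrapLLM(
    async () =>
      openai.chat.completions.create(
        { model: 'gpt-4o', messages: [{ role: 'user', content: userMessage }], stream: true },
        { signal: controller.signal } // request option supported by the openai SDK
      ),
    { model: 'gpt-4o', prompt_id: 'chat_v1' }
  )

  let fullResponse = ''
  try {
    for await (const chunk of stream) {
      fullResponse += chunk.choices[0]?.delta?.content || ''
    }
  } catch (err) {
    if (controller.signal.aborted) {
      // Cancelled by the user: keep the partial transcript, don't treat it as an error
      turn.annotate({ cancelled: true, charactersReceived: fullResponse.length })
    } else {
      throw err
    }
  }

  turn.setMessages([
    { role: 'user', content: userMessage },
    { role: 'assistant', content: fullResponse }
  ])
} finally {
  await turn.finish()
}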
Best Practices

Always Capture Complete Transcript
Even with streaming, always set the complete messages after the stream finishes:
// ✅ Good: Set complete transcript
turn.setMessages([
  { role: 'user', content: userMessage },
  { role: 'assistant', content: fullResponse }
])

// ❌ Bad: Don't set transcript incrementally
for await (const chunk of stream) {
  turn.setMessages([...]) // Don't do this on every chunk
}

Measure Both Latencies
Track both first token latency (user-perceived speed) and total latency:
turn.annotate({
  firstTokenLatencyMs: firstTokenTime - startTime,
  totalLatencyMs: Date.now() - startTime,
})

Always Call finish()
Use try/finally to ensure finish() is called even if streaming fails:
try {
  // streaming logic
} finally {
  await turn.finish() // Always executes
}
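If you find yourself repeating this scaffolding, a small wrapper can enforce it. This is an illustrative helper, not part of the SDK; withTurn and the Turn alias are made-up names, and the callback body is elided:

// Illustrative helper (hypothetical, not an SDK API): finish() always runs
type Turn = ReturnType<typeof session.turn>

async function withTurn<T>(fn: (turn: Turn) => Promise<T>): Promise<T> {
  const turn = session.turn()
  try {
    return await fn(turn)
  } finally {
    await turn.finish()
  }
}

// Usage: all streaming logic lives in the callback
const reply = await withTurn(async (turn) => {
  // ...wrapLLM, stream loop, setMessages, annotate...
  return fullResponse
})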
Next Steps

- Error Handling - Robust error handling patterns
- Best Practices - General SDK best practices