Advanced Features
Streaming Responses

Streaming LLM Responses

Track streaming completions with proper latency measurement and complete transcript capture.

Basic Streaming Pattern

import { OpenAI } from 'openai'
 
const turn = session.turn()
const startTime = Date.now()
 
try {
  const stream = await turn.wrapLLM(
    async () => {
      return await openai.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: userMessage }],
        stream: true,
      })
    },
    { model: 'gpt-4o', prompt_id: 'chat_v1' }
  )
 
  let fullResponse = ''
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || ''
    fullResponse += content
    // Display to user in real-time
    displayChunk(content)
  }
 
  // Record complete transcript
  turn.setMessages([
    { role: 'user', content: userMessage },
    { role: 'assistant', content: fullResponse }
  ])
 
  // Add streaming metadata
  turn.annotate({
    streaming: true,
    totalLatencyMs: Date.now() - startTime,
  })
} finally {
  await turn.finish()
}

Tracking First Token Latency

const turn = session.turn()
const startTime = Date.now()
let firstTokenTime: number | null = null
 
try {
  const stream = await turn.wrapLLM(
    async () => {
      return await openai.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: userMessage }],
        stream: true,
      })
    },
    { model: 'gpt-4o', prompt_id: 'chat_v1' }
  )
 
  let fullResponse = ''
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || ''
 
    // Capture first token latency
    if (content && firstTokenTime === null) {
      firstTokenTime = Date.now()
    }
 
    fullResponse += content
    displayChunk(content)
  }
 
  turn.setMessages([
    { role: 'user', content: userMessage },
    { role: 'assistant', content: fullResponse }
  ])
 
  turn.annotate({
    streaming: true,
    firstTokenLatencyMs: firstTokenTime ? firstTokenTime - startTime : null,
    totalLatencyMs: Date.now() - startTime,
  })
} finally {
  await turn.finish()
}

Streaming with Token Usage

Some providers (like OpenAI) include token usage in the final chunk.

const stream = await turn.wrapLLM(
  async () => {
    return await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: userMessage }],
      stream: true,
      stream_options: { include_usage: true },
    })
  },
  { model: 'gpt-4o', prompt_id: 'chat_v1' }
)
 
let fullResponse = ''
let usage = null
 
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || ''
  fullResponse += content
 
  // Capture usage from final chunk
  if (chunk.usage) {
    usage = chunk.usage
  }
 
  displayChunk(content)
}
 
turn.setMessages([
  { role: 'user', content: userMessage },
  { role: 'assistant', content: fullResponse }
])
 
if (usage) {
  turn.annotate({
    tokens: {
      prompt: usage.prompt_tokens,
      completion: usage.completion_tokens,
      total: usage.total_tokens,
    }
  })
}
 
await turn.finish()

Error Handling in Streams

const turn = session.turn()
 
try {
  const stream = await turn.wrapLLM(
    async () => {
      return await openai.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: userMessage }],
        stream: true,
      })
    },
    { model: 'gpt-4o', prompt_id: 'chat_v1' }
  )
 
  let fullResponse = ''
  try {
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || ''
      fullResponse += content
      displayChunk(content)
    }
  } catch (streamError) {
    // Handle mid-stream errors
    turn.annotate({
      streamError: {
        message: streamError.message,
        partialResponse: fullResponse,
        charactersReceived: fullResponse.length,
      }
    })
    throw streamError
  }
 
  turn.setMessages([
    { role: 'user', content: userMessage },
    { role: 'assistant', content: fullResponse }
  ])
 
} catch (error) {
  turn.annotate({
    error: {
      type: error.constructor.name,
      message: error.message,
    }
  })
  throw error
} finally {
  await turn.finish()
}

Best Practices

Always Capture Complete Transcript

Even with streaming, always set the complete messages after the stream finishes:

// ✅ Good: Set complete transcript
turn.setMessages([
  { role: 'user', content: userMessage },
  { role: 'assistant', content: fullResponse }
])
 
// ❌ Bad: Don't set transcript incrementally
for await (const chunk of stream) {
  turn.setMessages([...]) // Don't do this on every chunk
}

Measure Both Latencies

Track both first token latency (user-perceived speed) and total latency:

turn.annotate({
  firstTokenLatencyMs: firstTokenTime - startTime,
  totalLatencyMs: Date.now() - startTime,
})

Always Call finish()

Use try/finally to ensure finish() is called even if streaming fails:

try {
  // streaming logic
} finally {
  await turn.finish()  // Always executes
}

Next Steps