Overview
Streaming allows you to receive partial responses from the Perplexity API as they are generated, rather than waiting for the complete response. This is particularly useful for:
- Real-time user experiences: Display responses as they’re generated
- Long responses: Start showing content immediately for lengthy analyses
- Interactive applications: Provide immediate feedback to users
Streaming is supported across all Perplexity models including Sonar, Sonar Pro, and reasoning models.
Quick Start
The easiest way to get started is with the Perplexity SDKs, which handle all the streaming parsing automatically.
To enable streaming, set `stream=True` (Python) or `stream: true` (TypeScript) when creating completions:
Python SDK
TypeScript SDK
cURL
from perplexity import Perplexity
# Initialize the client (uses PERPLEXITY_API_KEY environment variable)
client = Perplexity()

# Create streaming completion: with stream=True the result is iterated
# chunk by chunk instead of being returned as one complete response.
stream = client.chat.completions.create(
    model="sonar",
    messages=[{"role": "user", "content": "What is the latest in AI research?"}],
    stream=True,
)

# Process streaming response
for chunk in stream:
    # Defensive: skip any chunk whose `choices` list is empty rather than
    # raising IndexError on choices[0].
    if not chunk.choices:
        continue
    text = chunk.choices[0].delta.content
    if text:
        # flush=True writes the partial text out immediately. Without it,
        # text printed with end="" can sit in stdout's buffer until a newline
        # arrives, so nothing would appear progressively.
        print(text, end="", flush=True)
Search results and metadata are delivered in the final chunk(s) of a streaming response, not progressively during the stream.
When streaming, you receive:
- Content chunks which arrive progressively in real-time
- Search results (delivered in the final chunk(s))
- Usage stats and other metadata (also delivered in the final chunk(s))
Python SDK
TypeScript SDK
Raw HTTP
from perplexity import Perplexity
def stream_with_metadata():
    """Stream one completion, echoing text live and collecting metadata.

    Content pieces are printed as they arrive. Search results and usage
    statistics are taken from whichever chunks carry them, and a summary is
    printed once, after the whole stream has been consumed, so metadata
    attached to any chunk is included.

    Returns:
        A ``(content, search_results, usage_info)`` tuple: the full response
        text, the last non-empty search results seen (``[]`` if none), and
        the last non-empty usage object seen (``None`` if none).
    """
    client = Perplexity()
    stream = client.chat.completions.create(
        model="sonar",
        messages=[{"role": "user", "content": "Explain quantum computing"}],
        stream=True,
    )

    pieces = []
    search_results = []
    usage_info = None

    for chunk in stream:
        # Process content. A chunk with an empty `choices` list is treated as
        # carrying no text instead of failing on choices[0].
        content_piece = chunk.choices[0].delta.content if chunk.choices else None
        if content_piece:
            pieces.append(content_piece)
            print(content_piece, end='', flush=True)

        # Collect metadata; getattr tolerates chunks without these attributes.
        found_results = getattr(chunk, 'search_results', None)
        if found_results:
            search_results = found_results
        found_usage = getattr(chunk, 'usage', None)
        if found_usage:
            usage_info = found_usage

    # Report after the loop rather than on the first finish_reason, so that
    # metadata delivered on a later chunk is not missed.
    print(f"\n\nSearch Results: {search_results}")
    print(f"Usage: {usage_info}")
    return "".join(pieces), search_results, usage_info


stream_with_metadata()
Error Handling
Proper error handling is important to ensure your application can recover from errors and provide a good user experience.
Python SDK
TypeScript SDK
Raw HTTP
import perplexity
from perplexity import Perplexity
client = Perplexity()

# Both creating the stream and iterating it stay inside the try block,
# so a failure at either stage reaches the handlers below.
try:
    stream = client.chat.completions.create(
        model="sonar-pro",
        messages=[
            {"role": "user", "content": "Explain machine learning concepts"}
        ],
        stream=True,
    )

    # `content` keeps everything received so far, so partial text is still
    # available if the stream fails midway.
    content = ""
    for chunk in stream:
        # Defensive: skip chunks with an empty `choices` list instead of
        # raising IndexError on choices[0].
        if not chunk.choices:
            continue
        content_chunk = chunk.choices[0].delta.content
        if content_chunk:
            content += content_chunk
            # flush=True so each piece is shown immediately instead of
            # waiting in stdout's buffer for a newline.
            print(content_chunk, end="", flush=True)
except perplexity.APIConnectionError as e:
    print(f"Network connection failed: {e}")
# NOTE(review): RateLimitError is presumably a more specific kind of
# APIStatusError, so its handler must stay above the APIStatusError one;
# confirm against the SDK's exception hierarchy.
except perplexity.RateLimitError as e:
    print(f"Rate limit exceeded, please retry later: {e}")
except perplexity.APIStatusError as e:
    print(f"API error {e.status_code}: {e.response}")
except Exception as e:
    # Last-resort handler at the top level of the script.
    print(f"Unexpected error: {e}")
Proper SSE Parsing
For production use, you should properly parse the Server-Sent Events (SSE) format:
# pip install sseclient-py
import sseclient
import requests
import json
def stream_with_proper_sse(timeout=(10, 120)):
    """Stream a completion over raw HTTP and print content parsed from SSE events.

    Args:
        timeout: ``(connect, read)`` timeout in seconds passed to
            ``requests.post`` so a stalled connection raises instead of
            hanging indefinitely.

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status.
    """
    url = "https://api.perplexity.ai/chat/completions"
    headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "sonar",
        "messages": [{"role": "user", "content": "Explain quantum computing"}],
        "stream": True,
    }

    # The context manager closes the streamed response (and releases its
    # connection) even when the loop exits early or an exception is raised.
    with requests.post(
        url, headers=headers, json=payload, stream=True, timeout=timeout
    ) as response:
        # Fail loudly on HTTP errors; otherwise an error body would be fed to
        # the SSE parser and the function would silently print nothing.
        response.raise_for_status()

        client = sseclient.SSEClient(response)
        for event in client.events():
            # '[DONE]' is the end-of-stream sentinel.
            if event.data == '[DONE]':
                break
            try:
                chunk_data = json.loads(event.data)
            except json.JSONDecodeError:
                # Skip events whose data is not valid JSON.
                continue

            # Tolerate chunks with no choices or no delta.
            choices = chunk_data.get('choices') or []
            if not choices:
                continue
            delta = choices[0].get('delta') or {}
            content = delta.get('content', '')
            if content:
                # flush=True so each piece appears as soon as it is parsed.
                print(content, end='', flush=True)


stream_with_proper_sse()
Advanced Streaming Patterns
Buffered Streaming
For applications that need to process chunks in batches:
Python SDK
TypeScript SDK
from perplexity import Perplexity
import time
def buffered_streaming(buffer_size=50, flush_interval=1.0):
    """Stream a completion, printing text in batches instead of per chunk.

    Args:
        buffer_size: Flush once at least this many characters are buffered.
        flush_interval: Flush once this many seconds have passed since the
            previous flush; checked each time a content piece arrives.
    """
    client = Perplexity()
    stream = client.chat.completions.create(
        model="sonar",
        messages=[{"role": "user", "content": "Write a detailed explanation of machine learning"}],
        stream=True,
    )

    pending = []
    pending_len = 0
    # time.monotonic() is used for interval measurement because it cannot go
    # backwards; time.time() can jump when the system clock is adjusted.
    last_flush = time.monotonic()

    for chunk in stream:
        # Defensive: skip chunks with an empty `choices` list.
        if not chunk.choices:
            continue
        piece = chunk.choices[0].delta.content
        if not piece:
            continue

        pending.append(piece)
        pending_len += len(piece)

        # Flush buffer if it's full or enough time has passed
        if pending_len >= buffer_size or (time.monotonic() - last_flush) >= flush_interval:
            print("".join(pending), end='', flush=True)
            pending.clear()
            pending_len = 0
            last_flush = time.monotonic()

    # Flush remaining buffer
    if pending:
        print("".join(pending), end='', flush=True)


buffered_streaming()
Stream Processing with Callbacks
For applications that need to process chunks with custom logic:
Python SDK
TypeScript SDK
from perplexity import Perplexity
from typing import Callable, Optional
def stream_with_callbacks(
    query: str,
    on_content: Optional[Callable[[str], None]] = None,
    on_search_results: Optional[Callable[[list], None]] = None,
    on_complete: Optional[Callable[[str, dict], None]] = None
):
    """Stream a completion for ``query`` and dispatch events to callbacks.

    Args:
        query: User message sent to the model.
        on_content: Called with each non-empty content piece as it arrives.
        on_search_results: Called with the search results each time a chunk
            carries a non-empty ``search_results`` value.
        on_complete: Called exactly once, after the stream has been fully
            consumed, with the complete text and the collected metadata.

    Returns:
        A ``(full_content, metadata)`` tuple. ``metadata`` may contain the
        keys ``'search_results'`` and ``'usage'``.
    """
    client = Perplexity()
    stream = client.chat.completions.create(
        model="sonar",
        messages=[{"role": "user", "content": query}],
        stream=True,
    )

    pieces = []
    metadata = {}

    for chunk in stream:
        # Handle content chunks. A chunk with an empty `choices` list is
        # treated as carrying no text instead of failing on choices[0].
        content_piece = chunk.choices[0].delta.content if chunk.choices else None
        if content_piece:
            pieces.append(content_piece)
            if on_content:
                on_content(content_piece)

        # Handle search results; getattr tolerates chunks without the attribute.
        results = getattr(chunk, 'search_results', None)
        if results:
            metadata['search_results'] = results
            if on_search_results:
                on_search_results(results)

        # Handle other metadata
        usage = getattr(chunk, 'usage', None)
        if usage:
            metadata['usage'] = usage

    full_content = "".join(pieces)

    # Handle completion after the loop, not on the first finish_reason: the
    # callback then fires exactly once and sees metadata from every chunk.
    if on_complete:
        on_complete(full_content, metadata)
    return full_content, metadata
# Usage example
def print_content(content: str) -> None:
    """Write one streamed content piece to stdout without a trailing newline."""
    # flush=True writes the piece out immediately instead of leaving it buffered.
    print(content, end='', flush=True)
def handle_search_results(results: list) -> None:
    """Print a bracketed note, on a new line, with the number of results received."""
    print(f"\n[Found {len(results)} sources]", end='')
def handle_completion(content: str, metadata: dict) -> None:
    """Print the total response length and, when present, the token usage."""
    print(f"\n\nCompleted. Total length: {len(content)} characters")
    # Usage is reported only when the metadata dict contains a 'usage' key.
    if 'usage' in metadata:
        print(f"Token usage: {metadata['usage']}")
# Stream one query, routing content pieces, search results, and completion
# to the three handlers defined above.
stream_with_callbacks(
    "Explain the latest developments in renewable energy",
    on_content=print_content,
    on_search_results=handle_search_results,
    on_complete=handle_completion
)
Best Practices
Handle network interruptions
Implement reconnection logic for robust streaming applications.
Python SDK
TypeScript SDK
import time
import random
from perplexity import Perplexity
import perplexity
def robust_streaming(query: str, max_retries: int = 3):
    """Stream a completion for ``query``, retrying on connection problems.

    Each attempt issues a fresh request and prints content as it arrives.
    A retry therefore starts the response over, so text printed before a
    failure is printed again. Between attempts the function sleeps for
    ``2 ** attempt`` seconds plus up to one second of random jitter.

    Args:
        query: User message sent to the model.
        max_retries: Total number of attempts. With a value below 1 no
            request is made and the function returns immediately.

    Raises:
        perplexity.APIConnectionError, perplexity.APITimeoutError: Re-raised
            when the final attempt also fails.
    """
    client = Perplexity()
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="sonar",
                messages=[{"role": "user", "content": query}],
                stream=True,
            )
            for chunk in stream:
                text = chunk.choices[0].delta.content
                if text:
                    print(text, end='', flush=True)
        except (perplexity.APIConnectionError, perplexity.APITimeoutError) as e:
            # Out of attempts: report and let the caller see the error.
            if attempt >= final_attempt:
                print(f"\nFailed after {max_retries} attempts: {e}")
                raise

            # Exponential backoff with jitter before the next attempt.
            delay = (2 ** attempt) + random.uniform(0, 1)
            print(f"\nConnection error, retrying in {delay:.1f}s...")
            time.sleep(delay)
        else:
            # The stream was consumed without a connection error.
            return


robust_streaming("Explain quantum computing")
Implement proper buffering
Use appropriate buffering strategies for your application’s needs.
# For real-time chat applications
# NOTE: these assignments are alternatives; pick the one that fits your
# application. Run as written, the last assignment wins.
# With the buffered_streaming example above, a value of 1 flushes after every
# content piece (a piece may hold more than one character).
buffer_size = 1 # Flush on every content piece for immediate display
# For document processing
buffer_size = 100 # Larger chunks for efficiency
# For API responses
buffer_size = 500 # Balance between responsiveness and efficiency
Handle metadata appropriately
Remember that search results and metadata arrive at the end of the stream.
# Don't expect search results until the stream is complete
# Any truthy finish_reason (not only "stop") is treated as the end of the
# response, matching the completion check used by the other examples in this
# guide. The `chunk.choices` test avoids an IndexError on an empty list.
if chunk.choices and chunk.choices[0].finish_reason:
    # Search results and usage info are expected by this point, but each one
    # is checked before use because a given chunk may not carry it.
    if getattr(chunk, "search_results", None):
        process_search_results(chunk.search_results)
    if getattr(chunk, "usage", None):
        log_usage_stats(chunk.usage)
Optimize for your use case
Choose streaming parameters based on your application requirements.
Real-time Chat
Document Generation
# Real-time chat: stream the reply so the first text can be shown right away
stream = client.chat.completions.create(
    stream=True,
    model="sonar",
    messages=messages,
    temperature=0.7,  # Balanced creativity
    max_tokens=1000,  # Reasonable limit on response length
)
Important: Search results arrive only at the end of a stream. If your user interface needs to display them immediately, consider using a non-streaming request instead.
Resources