Video Connector Pipecat Integration
The Vonage Video Connector transport for Pipecat enables you to build applications that participate in Vonage Video API sessions. The transport allows you to receive audio and video from session participants and send processed audio and video back to the session in real time.
Pipecat is a framework for building voice and multimodal conversational AI applications. The Vonage Video Connector transport bridges Pipecat's media processing pipeline with Vonage Video API sessions, enabling a wide range of use cases:
- Real-time voice and video AI assistants
- Live transcription and translation services
- Call recording and analysis
- Audio and video effects processing
- Automated moderation and content filtering
- Custom media processing and manipulation
The transport handles audio and video format conversion, session management, and WebRTC connectivity, allowing you to focus on building your application logic.
This page includes the following sections:
- Private beta
- Requirements
- Basic setup
- Transport configuration
- Audio and video handling
- Stream subscription
- Session management
- Pipeline integration
- Best practices
Private Beta
The Vonage Video Connector Pipecat integration is currently in private beta. Contact us to request early access.
Requirements
To use this transport, you need the Vonage Video Connector Python library, which runs on Linux AMD64 and ARM64 platforms.
Basic Setup
Authentication and session parameters
To use the Vonage Video Connector transport, you need:
- Application ID - Your Vonage Video API application identifier
- Session ID - The ID of the Video API session you want to join
- Token - A valid participant token for the session
These parameters can be obtained from your Vonage Video API dashboard or generated using the Vonage Video API server SDKs.
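These credentials are typically generated on your backend. The sketch below uses the Vonage Python server SDK; the class and method names shown (Vonage, Auth, video.create_session, video.generate_client_token) are assumptions that vary across SDK versions, so check your SDK's documentation:
import os

from vonage import Auth, Vonage  # Vonage Python server SDK

# Illustrative sketch only: method names may differ in your SDK version.
client = Vonage(Auth(
    application_id=os.getenv("VONAGE_APPLICATION_ID"),
    private_key=os.getenv("VONAGE_PRIVATE_KEY_PATH"),
))

session = client.video.create_session()
token = client.video.generate_client_token(session.session_id)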
Initialize the transport
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
)
# Initialize the transport
transport = VonageVideoConnectorTransport(
application_id="your_application_id",
session_id="your_session_id",
token="your_participant_token",
params=VonageVideoConnectorTransportParams()
)
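In practice, avoid hardcoding credentials. Read them from the environment instead, as the complete pipeline example later on this page does:
import os

transport = VonageVideoConnectorTransport(
    application_id=os.getenv("VONAGE_APPLICATION_ID"),
    session_id=os.getenv("VONAGE_SESSION_ID"),
    token=os.getenv("VONAGE_TOKEN"),
    params=VonageVideoConnectorTransportParams(),
)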
Transport Configuration
Basic audio and video parameters
Configure the transport for your specific audio and video requirements:
from pipecat.audio.vad.silero import SileroVADAnalyzer
transport_params = VonageVideoConnectorTransportParams(
audio_in_enabled=True, # Enable receiving audio
audio_out_enabled=True, # Enable sending audio
video_in_enabled=True, # Enable receiving video
video_out_enabled=True, # Enable sending video
publisher_name="AI Assistant", # Name for the published stream
audio_in_sample_rate=16000, # Input sample rate (Hz)
audio_in_channels=1, # Input channels
audio_out_sample_rate=24000, # Output sample rate (Hz)
audio_out_channels=1, # Output channels
video_out_width=1280, # Output video width
video_out_height=720, # Output video height
video_out_framerate=30, # Output video framerate
video_out_color_format="RGB", # Output video color format
vad_analyzer=SileroVADAnalyzer(), # Voice activity detection
audio_in_auto_subscribe=True, # Auto-subscribe to audio streams
video_in_auto_subscribe=False, # Auto-subscribe to video streams
video_in_preferred_resolution=(640, 480), # Preferred input resolution
video_in_preferred_framerate=15, # Preferred input framerate
publisher_enable_opus_dtx=False, # Enable Opus DTX
session_enable_migration=False, # Enable session migration
video_connector_log_level="INFO", # Log level
clear_buffers_on_interruption=True, # Clear buffers on interruption
)
transport = VonageVideoConnectorTransport(
application_id,
session_id,
token,
params=transport_params
)
Voice Activity Detection (VAD)
Use this transport with Voice Activity Detection (VAD) to optimize audio processing:
from pipecat.audio.vad.silero import SileroVADAnalyzer
# Configure VAD for better audio processing
vad = SileroVADAnalyzer()
transport_params = VonageVideoConnectorTransportParams(
vad_analyzer=vad,
# ... other parameters
)
VAD helps reduce unnecessary processing by detecting when speech is present in the audio stream.
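You can also tune how quickly the analyzer reacts to speech. A minimal sketch using Pipecat's VADParams (field names follow current Pipecat releases; check the version you have installed):
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams

# Shorter stop_secs treats the user as "done speaking" sooner;
# longer values tolerate mid-sentence pauses better.
vad = SileroVADAnalyzer(params=VADParams(start_secs=0.2, stop_secs=0.8))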
Buffer clearing on interruptions
The clear_buffers_on_interruption parameter determines whether media buffers are
automatically cleared when an interruption frame is received in the pipeline.
transport_params = VonageVideoConnectorTransportParams(
clear_buffers_on_interruption=True, # Default: True
# ... other parameters
)
When to enable (True, default):
- Conversational AI applications where you want to stop playback immediately when the user interrupts
- Interactive voice assistants that need to respond quickly to user input
- Applications where outdated audio/video should be discarded to maintain real-time interaction
- Scenarios where minimizing latency is more important than completing media playback
When to disable (False):
- Recording or streaming applications where you want to preserve all media
- Applications that need to complete playback of important information even if interrupted
- Batch processing scenarios where media should be processed sequentially without interruption
- Use cases where you're implementing custom interruption handling logic
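For the recording and streaming cases above, for example, you would flip the flag:
transport_params = VonageVideoConnectorTransportParams(
    clear_buffers_on_interruption=False,  # Keep queued media on interruptions
    # ... other parameters
)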
Audio and Video Handling
Audio and video input processing
The transport automatically converts incoming audio and video from the Vonage Video session to Pipecat's internal media formats:
# Audio and video input is handled automatically by the transport
# Incoming audio frames are converted to AudioRawFrame format
# Incoming video frames are converted to ImageRawFrame format
pipeline = Pipeline([
transport.input(), # Receives audio and video from Vonage session
# ... your AI processing pipeline
])
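To see what the transport produces, you can drop a small frame processor into the pipeline. This sketch relies only on the behavior described above (incoming video arriving as ImageRawFrame) and standard Pipecat processor conventions:
from pipecat.frames.frames import Frame, ImageRawFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class VideoFrameLogger(FrameProcessor):
    """Logs basic metadata for each incoming video frame."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, ImageRawFrame):
            # frame.size is (width, height); frame.format is the color format
            print(f"video frame: size={frame.size} format={frame.format}")
        await self.push_frame(frame, direction)
Place it immediately after transport.input() to observe frames as they enter your processing chain.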
Audio and video output generation
Send audio and video back to the Vonage Video session:
# Audio and video output is sent automatically through the pipeline
pipeline = Pipeline([
# ... your AI processing pipeline
transport.output(), # Sends audio and video to Vonage session
])
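You can also queue output frames directly onto the pipeline task, for example to play a pre-generated clip. A minimal sketch, assuming a 16 kHz mono output configuration and a PipelineTask named task (as in the complete example below):
from pipecat.frames.frames import OutputAudioRawFrame

# One second of 16 kHz mono silence (16-bit PCM)
silence = b"\x00\x00" * 16000
await task.queue_frames([
    OutputAudioRawFrame(audio=silence, sample_rate=16000, num_channels=1)
])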
Stream Subscription
When the transport subscribes to streams from session participants, it generates Pipecat frames that your pipeline can process. The behavior differs between audio and video:
Video streams: The transport generates individual video frames for each subscribed stream, identified by the stream ID. This allows you to process video from different participants separately in your pipeline.
Audio streams: The transport currently generates audio frames with all subscribed audio streams mixed together. All participant audio is combined into a single audio stream that your pipeline receives.
By default, the transport automatically subscribes to streams based on the audio_in_auto_subscribe
and video_in_auto_subscribe parameters. You can also manually control which streams to subscribe
to for more fine-grained control.
Manual stream subscription
If you need more control over which streams to subscribe to, you can disable auto-subscription and manually subscribe to specific streams:
from pipecat.transports.vonage.video_connector import SubscribeSettings
# Disable auto-subscription
transport_params = VonageVideoConnectorTransportParams(
audio_in_auto_subscribe=False,
video_in_auto_subscribe=False,
# ... other parameters
)
# Manually subscribe when a participant joins
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
stream_id = data['streamId']
logger.info(f"Participant joined with stream {stream_id}, subscribing...")
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_audio=True,
subscribe_to_video=True,
preferred_resolution=(640, 480),
preferred_framerate=15
)
)
When to use automatic subscription (audio_in_auto_subscribe=True, video_in_auto_subscribe=True, default):
- Simple applications where you want to receive all streams from all participants
- Voice or video assistants that need to interact with everyone in the session
- Recording or monitoring applications that should capture all participants
- Use cases where minimizing code complexity is more important than selective subscription
- Applications where all participants should be treated equally
When to use manual subscription (audio_in_auto_subscribe=False, video_in_auto_subscribe=False):
- Applications that need to selectively subscribe based on participant metadata or session logic
- Scenarios where you want to optimize bandwidth by subscribing only to specific streams
- Use cases requiring custom subscription settings per participant (different quality levels)
- Applications that need to validate or authenticate participants before subscribing
- Complex multi-party scenarios where you want fine-grained control over which streams to receive
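For instance, a voice-only assistant can save bandwidth by subscribing to audio and skipping video entirely:
# Audio-only subscription for a voice-only bot
await transport.subscribe_to_stream(
    stream_id,
    SubscribeSettings(
        subscribe_to_audio=True,
        subscribe_to_video=False,
    )
)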
Controlling video quality with simulcast
When subscribing to video streams, you can control the video quality you receive using the
preferred_resolution and preferred_framerate parameters. These parameters are particularly
useful when the publisher is sending simulcast streams (multiple quality layers).
Simulcast streams contain multiple spatial and temporal layers:
- Spatial layers: Different resolutions (e.g., 1280x720, 640x480, 320x240)
- Temporal layers: Different framerates (e.g., 30fps, 15fps, 7.5fps)
By specifying your preferred resolution and framerate, you can optimize bandwidth usage and processing requirements:
# Subscribe to a lower quality stream for bandwidth efficiency
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(320, 240), # Request low spatial layer
preferred_framerate=15 # Request lower temporal layer
)
)
# Subscribe to a high quality stream for better visual fidelity
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(1280, 720), # Request high spatial layer
preferred_framerate=30 # Request higher temporal layer
)
)
Important notes:
- If the publisher doesn't support simulcast or the requested layer isn't available, the server will provide the closest available quality
- Lower resolutions and framerates reduce bandwidth consumption and processing overhead
- These settings only affect video subscription; they don't control the publisher's output quality
- You can also set global preferences using video_in_preferred_resolution and video_in_preferred_framerate in the transport parameters for auto-subscribed streams
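For example, to have every auto-subscribed video stream delivered at VGA resolution and 15 fps:
transport_params = VonageVideoConnectorTransportParams(
    video_in_enabled=True,
    video_in_auto_subscribe=True,
    video_in_preferred_resolution=(640, 480),
    video_in_preferred_framerate=15,
    # ... other parameters
)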
Session Management
Session lifecycle events
Handle session join and leave events:
# Handle when the transport joins the session
@transport.event_handler("on_joined")
async def on_joined(transport, data):
logger.info(f"Joined session {data['sessionId']}")
    # Initialize your application state, e.g. kick off the conversation
    await task.queue_frames([LLMRunFrame()])
# Handle when the transport leaves the session
@transport.event_handler("on_left")
async def on_left(transport):
logger.info("Left session")
# Clean up resources or save session data
# Handle session errors
@transport.event_handler("on_error")
async def on_error(transport, error):
logger.error(f"Session error: {error}")
Participant events
Monitor participants joining and leaving the session:
# Handle first participant joining (useful for triggering initial interactions)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
logger.info(f"First participant joined: stream {data['streamId']}")
# Start recording, initialize conversation, etc.
# Handle any participant joining
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
logger.info(f"Participant joined: stream {data['streamId']}")
# Update participant list, send greeting, etc.
# Handle participant leaving
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, data):
logger.info(f"Participant left: stream {data['streamId']}")
# Update participant list, handle cleanup
Client connection events
Monitor individual stream subscriber connections:
# Handle when a subscriber successfully connects to a stream
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
logger.info(f"Client connected to stream {data['subscriberId']}")
# Handle when a subscriber disconnects from a stream
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, data):
logger.info(f"Client disconnected from stream {data['subscriberId']}")
Pipeline Integration
Complete pipeline example
Here's how to integrate the Vonage Video Connector transport with a complete AI pipeline:
import asyncio
import os
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
SubscribeSettings,
)
async def main():
# Configure transport
transport = VonageVideoConnectorTransport(
application_id=os.getenv("VONAGE_APPLICATION_ID"),
session_id=os.getenv("VONAGE_SESSION_ID"),
token=os.getenv("VONAGE_TOKEN"),
params=VonageVideoConnectorTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
publisher_name="AI Assistant",
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
vad_analyzer=SileroVADAnalyzer(),
audio_in_auto_subscribe=True,
)
)
# Set up AI service
llm = AWSNovaSonicLLMService(
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
region=os.getenv("AWS_REGION", ""),
session_token=os.getenv("AWS_SESSION_TOKEN", ""),
)
# Create context for conversation
context = OpenAILLMContext(
messages=[
{"role": "system", "content": "You are a helpful AI assistant."},
]
)
context_aggregator = llm.create_context_aggregator(context)
# Build pipeline
pipeline = Pipeline([
transport.input(), # Audio from Vonage session
context_aggregator.user(), # Process user input
llm, # AI processing
transport.output(), # Audio back to session
])
task = PipelineTask(pipeline)
# Register event handlers
@transport.event_handler("on_joined")
async def on_joined(transport, data):
print(f"Joined session: {data['sessionId']}")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
print(f"First participant joined: {data['streamId']}")
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
await task.queue_frames([LLMRunFrame()])
await llm.trigger_assistant_response()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
Best Practices
Performance optimization
Choose appropriate sample rates:
- Use 16kHz for speech recognition and most AI services
- Use 24kHz or higher for better text-to-speech quality
- Avoid unnecessarily high sample rates that increase processing load
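For example, a typical speech-assistant configuration pairs 16 kHz input for recognition with 24 kHz output for synthesis:
transport_params = VonageVideoConnectorTransportParams(
    audio_in_sample_rate=16000,   # Sufficient for speech recognition
    audio_out_sample_rate=24000,  # Better text-to-speech quality
    # ... other parameters
)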
Optimize pipeline processing:
- Keep AI processing pipelines efficient to minimize latency
- Use appropriate frame sizes and buffer management
- Consider using VAD to reduce unnecessary processing
Debugging and monitoring
Enable logging:
from loguru import logger
# Pipecat uses loguru for logging
logger.enable("pipecat")
# Set Vonage Video Connector log level
transport_params = VonageVideoConnectorTransportParams(
video_connector_log_level="DEBUG", # DEBUG, INFO, WARNING, ERROR
# ... other parameters
)