Video Connector Pipecat Integration

The Vonage Video Connector transport for Pipecat enables you to build applications that seamlessly participate in Vonage Video API sessions. This transport allows you to receive audio and video from session participants and send processed audio and video back to the session in real-time.

Pipecat is a framework for building voice and multimodal conversational AI applications. The Vonage Video Connector transport bridges Pipecat's media processing pipeline with Vonage Video API sessions, enabling a wide range of use cases:

  • Real-time voice and video AI assistants
  • Live transcription and translation services
  • Call recording and analysis
  • Audio and video effects processing
  • Automated moderation and content filtering
  • Custom media processing and manipulation

The transport handles audio and video format conversion, session management, and WebRTC connectivity, allowing you to focus on building your application logic.

This page includes the following sections:

Private Beta

The Vonage Video Connector Pipecat integration is in beta stage. Contact us to get early access.

Requirements

In order to use this transport you will need the Vonage Video Connector python library, which runs on Linux AMD64 and ARM64 platforms.

Basic Setup

Authentication and session parameters

To use the Vonage Video Connector transport, you need:

  • Application ID - Your Vonage Video API application identifier
  • Session ID - The ID of the Video API session you want to join
  • Token - A valid participant token for the session

These parameters can be obtained from your Vonage Video API dashboard or generated using the Vonage Video API server SDKs.

Initialize the transport

from pipecat.transports.vonage.video_connector import (
    VonageVideoConnectorTransport,
    VonageVideoConnectorTransportParams,
)

# Initialize the transport
transport = VonageVideoConnectorTransport(
    application_id="your_application_id",
    session_id="your_session_id",
    token="your_participant_token",
    params=VonageVideoConnectorTransportParams()
)

Transport Configuration

Basic audio and video parameters

Configure the transport for your specific audio and video requirements:

from pipecat.audio.vad.silero import SileroVADAnalyzer

transport_params = VonageVideoConnectorTransportParams(
    audio_in_enabled=True,                          # Enable receiving audio
    audio_out_enabled=True,                         # Enable sending audio
    video_in_enabled=True,                          # Enable receiving video
    video_out_enabled=True,                         # Enable sending video
    publisher_name="AI Assistant",                  # Name for the published stream
    audio_in_sample_rate=16000,                     # Input sample rate (Hz)
    audio_in_channels=1,                            # Input channels
    audio_out_sample_rate=24000,                    # Output sample rate (Hz)
    audio_out_channels=1,                           # Output channels
    video_out_width=1280,                           # Output video width
    video_out_height=720,                           # Output video height
    video_out_framerate=30,                         # Output video framerate
    video_out_color_format="RGB",                   # Output video color format
    vad_analyzer=SileroVADAnalyzer(),               # Voice activity detection
    audio_in_auto_subscribe=True,                   # Auto-subscribe to audio streams
    video_in_auto_subscribe=False,                  # Auto-subscribe to video streams
    video_in_preferred_resolution=(640, 480),       # Preferred input resolution
    video_in_preferred_framerate=15,                # Preferred input framerate
    publisher_enable_opus_dtx=False,                # Enable Opus DTX
    session_enable_migration=False,                 # Enable session migration
    video_connector_log_level="INFO",               # Log level
    clear_buffers_on_interruption=True,             # Clear buffers on interruption
)

transport = VonageVideoConnectorTransport(
    application_id,
    session_id,
    token,
    params=transport_params
)

Voice Activity Detection (VAD)

It is advisable to use this transport with Voice Activity Detection to optimize audio processing:

from pipecat.audio.vad.silero import SileroVADAnalyzer

# Configure VAD for better audio processing
vad = SileroVADAnalyzer()
transport_params = VonageVideoConnectorTransportParams(
    vad_analyzer=vad,
    # ... other parameters
)

VAD helps reduce unnecessary processing by detecting when speech is present in the audio stream.

Buffer clearing on interruptions

The clear_buffers_on_interruption parameter determines whether media buffers are automatically cleared when an interruption frame is received in the pipeline.

transport_params = VonageVideoConnectorTransportParams(
    clear_buffers_on_interruption=True,  # Default: True
    # ... other parameters
)

When to enable (True, default):

  • Conversational AI applications where you want to stop playback immediately when the user interrupts
  • Interactive voice assistants that need to respond quickly to user input
  • Applications where outdated audio/video should be discarded to maintain real-time interaction
  • Scenarios where minimizing latency is more important than completing media playback

When to disable (False):

  • Recording or streaming applications where you want to preserve all media
  • Applications that need to complete playback of important information even if interrupted
  • Batch processing scenarios where media should be processed sequentially without interruption
  • Use cases where you're implementing custom interruption handling logic

Audio and Video Handling

Audio and video input processing

The transport automatically converts incoming audio and video from the Vonage Video session to Pipecat's internal media formats:

# Audio and video input is handled automatically by the transport
# Incoming audio frames are converted to AudioRawFrame format
# Incoming video frames are converted to ImageRawFrame format
pipeline = Pipeline([
    transport.input(),     # Receives audio and video from Vonage session
    # ... your AI processing pipeline
])

Audio and video output generation

Send audio and video back to the Vonage Video session:

# Audio and video output is sent automatically through the pipeline
pipeline = Pipeline([
    # ... your AI processing pipeline
    transport.output(),    # Sends audio and video to Vonage session
])

Stream Subscription

When the transport subscribes to streams from session participants, it generates Pipecat frames that your pipeline can process. The behavior differs between audio and video:

Video streams: The transport generates individual video frames for each subscribed stream, identified by the stream ID. This allows you to process video from different participants separately in your pipeline.

Audio streams: The transport currently generates audio frames with all subscribed audio streams mixed together. All participant audio is combined into a single audio stream that your pipeline receives.

By default, the transport automatically subscribes to streams based on the audio_in_auto_subscribe and video_in_auto_subscribe parameters. You can also manually control which streams to subscribe to for more fine-grained control.

Manual stream subscription

If you need more control over which streams to subscribe to, you can disable auto-subscription and manually subscribe to specific streams:

from pipecat.transports.vonage.video_connector import SubscribeSettings

# Disable auto-subscription
transport_params = VonageVideoConnectorTransportParams(
    audio_in_auto_subscribe=False,
    video_in_auto_subscribe=False,
    # ... other parameters
)

# Manually subscribe when a participant joins
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
    stream_id = data['streamId']
    logger.info(f"Participant joined with stream {stream_id}, subscribing...")
    await transport.subscribe_to_stream(
        stream_id,
        SubscribeSettings(
            subscribe_to_audio=True,
            subscribe_to_video=True,
            preferred_resolution=(640, 480),
            preferred_framerate=15
        )
    )

When to use automatic subscription (audio_in_auto_subscribe=True, video_in_auto_subscribe=True, default):

  • Simple applications where you want to receive all streams from all participants
  • Voice or video assistants that need to interact with everyone in the session
  • Recording or monitoring applications that should capture all participants
  • Use cases where minimizing code complexity is more important than selective subscription
  • Applications where all participants should be treated equally

When to use manual subscription (audio_in_auto_subscribe=False, video_in_auto_subscribe=False):

  • Applications that need to selectively subscribe based on participant metadata or session logic
  • Scenarios where you want to optimize bandwidth by subscribing only to specific streams
  • Use cases requiring custom subscription settings per participant (different quality levels)
  • Applications that need to validate or authenticate participants before subscribing
  • Complex multi-party scenarios where you want fine-grained control over which streams to receive

Controlling video quality with simulcast

When subscribing to video streams, you can control the video quality you receive using the preferred_resolution and preferred_framerate parameters. These parameters are particularly useful when the publisher is sending simulcast streams (multiple quality layers).

Simulcast streams contain multiple spatial and temporal layers:

  • Spatial layers: Different resolutions (e.g., 1280x720, 640x480, 320x240)
  • Temporal layers: Different framerates (e.g., 30fps, 15fps, 7.5fps)

By specifying your preferred resolution and framerate, you can optimize bandwidth usage and processing requirements:

# Subscribe to a lower quality stream for bandwidth efficiency
await transport.subscribe_to_stream(
    stream_id,
    SubscribeSettings(
        subscribe_to_video=True,
        preferred_resolution=(320, 240),  # Request low spatial layer
        preferred_framerate=15            # Request lower temporal layer
    )
)

# Subscribe to a high quality stream for better visual fidelity
await transport.subscribe_to_stream(
    stream_id,
    SubscribeSettings(
        subscribe_to_video=True,
        preferred_resolution=(1280, 720),  # Request high spatial layer
        preferred_framerate=30             # Request higher temporal layer
    )
)

Important notes:

  • If the publisher doesn't support simulcast or the requested layer isn't available, the server will provide the closest available quality
  • Lower resolutions and framerates reduce bandwidth consumption and processing overhead
  • These settings only affect video subscription; they don't control the publisher's output quality
  • You can also set global preferences using video_in_preferred_resolution and video_in_preferred_framerate in the transport parameters for auto-subscribed streams

Session Management

Session lifecycle events

Handle session join and leave events:

# Handle when the transport joins the session
@transport.event_handler("on_joined")
async def on_joined(transport, data):
    logger.info(f"Joined session {data['sessionId']}")
    # Initialize your application state
    await task.queue_frames([LLMMessagesFrame()])

# Handle when the transport leaves the session
@transport.event_handler("on_left")
async def on_left(transport):
    logger.info("Left session")
    # Clean up resources or save session data

# Handle session errors
@transport.event_handler("on_error")
async def on_error(transport, error):
    logger.error(f"Session error: {error}")

Participant events

Monitor participants joining and leaving the session:

# Handle first participant joining (useful for triggering initial interactions)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
    logger.info(f"First participant joined: stream {data['streamId']}")
    # Start recording, initialize conversation, etc.

# Handle any participant joining
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
    logger.info(f"Participant joined: stream {data['streamId']}")
    # Update participant list, send greeting, etc.

# Handle participant leaving
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, data):
    logger.info(f"Participant left: stream {data['streamId']}")
    # Update participant list, handle cleanup

Client connection events

Monitor individual stream subscriber connections:

# Handle when a subscriber successfully connects to a stream
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
    logger.info(f"Client connected to stream {data['subscriberId']}")

# Handle when a subscriber disconnects from a stream
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, data):
    logger.info(f"Client disconnected from stream {data['subscriberId']}")

Pipeline Integration

Complete pipeline example

Here's how to integrate the Vonage Video Connector transport with a complete AI pipeline:

import asyncio
import os

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.transports.vonage.video_connector import (
    VonageVideoConnectorTransport,
    VonageVideoConnectorTransportParams,
    SubscribeSettings,
)

async def main():
    # Configure transport
    transport = VonageVideoConnectorTransport(
        application_id=os.getenv("VONAGE_APPLICATION_ID"),
        session_id=os.getenv("VONAGE_SESSION_ID"),
        token=os.getenv("VONAGE_TOKEN"),
        params=VonageVideoConnectorTransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            publisher_name="AI Assistant",
            audio_in_sample_rate=16000,
            audio_out_sample_rate=24000,
            vad_analyzer=SileroVADAnalyzer(),
            audio_in_auto_subscribe=True,
        )
    )

    # Set up AI service
    llm = AWSNovaSonicLLMService(
        secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
        access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
        region=os.getenv("AWS_REGION", ""),
        session_token=os.getenv("AWS_SESSION_TOKEN", ""),
    )

    # Create context for conversation
    context = OpenAILLMContext(
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},
        ]
    )
    context_aggregator = llm.create_context_aggregator(context)

    # Build pipeline
    pipeline = Pipeline([
        transport.input(),           # Audio from Vonage session
        context_aggregator.user(),   # Process user input
        llm,                        # AI processing
        transport.output(),         # Audio back to session
    ])

    task = PipelineTask(pipeline)

    # Register event handlers
    @transport.event_handler("on_joined")
    async def on_joined(transport, data):
        print(f"Joined session: {data['sessionId']}")

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, data):
        print(f"First participant joined: {data['streamId']}")

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, data):
        await task.queue_frames([LLMRunFrame()])
        await llm.trigger_assistant_response()


    runner = PipelineRunner()
    await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

Performance optimization

  1. Choose appropriate sample rates:

    • Use 16kHz for speech recognition and most AI services
    • Use 24kHz or higher for better text-to-speech quality
    • Avoid unnecessary high sample rates that increase processing load
  2. Optimize pipeline processing:

    • Keep AI processing pipelines efficient to minimize latency
    • Use appropriate frame sizes and buffer management
    • Consider using VAD to reduce unnecessary processing

Debugging and monitoring

Enable logging:

from loguru import logger

# Pipecat uses loguru for logging
logger.enable("pipecat")

# Set Vonage Video Connector log level
transport_params = VonageVideoConnectorTransportParams(
    video_connector_log_level="DEBUG",  # DEBUG, INFO, WARNING, ERROR
    # ... other parameters
)