Conector de vídeo Integración de Pipecat

El transporte de Vonage Video Connector para Pipecat te permite crear aplicaciones que participen sin problemas en las sesiones de la Video API de Vonage. Este transporte te permite recibir audio y video de los participantes de la sesión y enviar audio y video procesados de vuelta a la procesados a la sesión en tiempo real.

Pipecat es un marco para crear aplicaciones de IA conversacional multimodal y de voz. El transporte de Vonage Video Connector conecta la canalización de procesamiento de medios de Pipecat con las sesiones de la API de vídeo de Vonage. sesiones de Video API de Vonage, lo que permite una amplia gama de casos de uso:

  • Asistentes de voz y vídeo en tiempo real
  • Servicios de transcripción y traducción en directo
  • Grabación y análisis de llamadas
  • Tratamiento de efectos de audio y vídeo
  • Moderación automática y filtrado de contenidos
  • Tratamiento y manipulación de medios personalizados

El transporte se encarga de la conversión de formatos de audio y vídeo, la gestión de sesiones y la conectividad WebRTC lo que le permite centrarse en crear la lógica de su aplicación.

Esta página incluye las siguientes secciones:

Beta pública

La integración de Vonage Video Connector Pipecat está en fase beta. El código está disponible en https://github.com/Vonage/pipecat.

Requisitos

Para utilizar este transporte necesitarás la biblioteca python de Vonage Video Connector, que funciona en plataformas Linux AMD64 y ARM64.

Configuración básica

Parámetros de autenticación y sesión

Para usar el transporte Vonage Video Connector, necesitas:

  • ID de aplicación - Tu identificador de aplicación de Video API de Vonage
  • ID de sesión - ID de la sesión de la Video API a la que desea unirse
  • Ficha - Un código de participante válido para la sesión

Estos parámetros pueden obtenerse de tu panel de control de la Video API de Vonage o generarse utilizando los SDK del servidor de la Video API de Vonage.

Inicializar el transporte

from pipecat.transports.vonage.video_connector import (
    VonageVideoConnectorTransport,
    VonageVideoConnectorTransportParams,
)

# Initialize the transport
transport = VonageVideoConnectorTransport(
    application_id="your_application_id",
    session_id="your_session_id",
    token="your_participant_token",
    params=VonageVideoConnectorTransportParams()
)

Configuración del transporte

Parámetros básicos de audio y vídeo

Configure el transporte para sus requisitos específicos de audio y vídeo:

from pipecat.audio.vad.silero import SileroVADAnalyzer

transport_params = VonageVideoConnectorTransportParams(
    audio_in_enabled=True,                          # Enable receiving audio
    audio_out_enabled=True,                         # Enable sending audio
    video_in_enabled=True,                          # Enable receiving video
    video_out_enabled=True,                         # Enable sending video
    publisher_name="AI Assistant",                  # Name for the published stream
    audio_in_sample_rate=16000,                     # Input sample rate (Hz)
    audio_in_channels=1,                            # Input channels
    audio_out_sample_rate=24000,                    # Output sample rate (Hz)
    audio_out_channels=1,                           # Output channels
    video_out_width=1280,                           # Output video width
    video_out_height=720,                           # Output video height
    video_out_framerate=30,                         # Output video framerate
    video_out_color_format="RGB",                   # Output video color format
    vad_analyzer=SileroVADAnalyzer(),               # Voice activity detection
    audio_in_auto_subscribe=True,                   # Auto-subscribe to audio streams
    video_in_auto_subscribe=False,                  # Auto-subscribe to video streams
    video_in_preferred_resolution=(640, 480),       # Preferred input resolution
    video_in_preferred_framerate=15,                # Preferred input framerate
    publisher_enable_opus_dtx=False,                # Enable Opus DTX
    session_enable_migration=False,                 # Enable session migration
    video_connector_log_level="INFO",               # Log level
    clear_buffers_on_interruption=True,             # Clear buffers on interruption
)

transport = VonageVideoConnectorTransport(
    application_id,
    session_id,
    token,
    params=transport_params
)

Detección de actividad vocal (VAD)

Es aconsejable utilizar este transporte con la Detección de Actividad de Voz para optimizar el procesamiento de audio:

from pipecat.audio.vad.silero import SileroVADAnalyzer

# Configure VAD for better audio processing
vad = SileroVADAnalyzer()
transport_params = VonageVideoConnectorTransportParams(
    vad_analyzer=vad,
    # ... other parameters
)

VAD ayuda a reducir el procesamiento innecesario mediante la detección de cuando el habla está presente en el flujo de audio.

Compensación del búfer en las interrupciones

En clear_buffers_on_interruption determina si las memorias intermedias automáticamente cuando se recibe una trama de interrupción en el canal.

transport_params = VonageVideoConnectorTransportParams(
    clear_buffers_on_interruption=True,  # Default: True
    # ... other parameters
)

Cuándo activar (Truepor defecto):

  • Aplicaciones de IA conversacional en las que se desea detener la reproducción inmediatamente cuando el usuario interrumpe
  • Asistentes de voz interactivos que deben responder con rapidez a las preguntas de los usuarios.
  • Applications where outdated audio/video should be discarded to maintain real-time interaction
  • Escenarios en los que minimizar la latencia es más importante que completar la reproducción multimedia

Cuándo desactivar (False):

  • Aplicaciones de grabación o streaming en las que desee conservar todos los medios
  • Applications that need to complete playback of important information even if interrupted
  • Escenarios de procesamiento por lotes en los que los soportes deben procesarse secuencialmente sin interrupción
  • Casos en los que se implementa una lógica de gestión de interrupciones personalizada

Tratamiento de audio y vídeo

Procesamiento de entradas de audio y vídeo

El transporte convierte automáticamente el audio y el video entrantes de la sesión de Vonage Video a los formatos de medios internos de Pipecat:

# Audio and video input is handled automatically by the transport
# Incoming audio frames are converted to AudioRawFrame format
# Incoming video frames are converted to ImageRawFrame format
pipeline = Pipeline([
    transport.input(),     # Receives audio and video from Vonage session
    # ... your AI processing pipeline
])

Generación de salida de audio y vídeo

Envía audio y video de vuelta a la sesión de Vonage Video:

# Audio and video output is sent automatically through the pipeline
pipeline = Pipeline([
    # ... your AI processing pipeline
    transport.output(),    # Sends audio and video to Vonage session
])

Suscripción a Stream

Cuando el transporte se suscribe a flujos de participantes en la sesión, genera tramas Pipecat que su pipeline puede procesar. El comportamiento difiere entre audio y vídeo:

Secuencias de vídeo: El transporte genera fotogramas de vídeo individuales para cada flujo suscrito, identificados por el ID del flujo. Esto le permite procesar vídeo de diferentes participantes por separado.

Secuencias de audio: El transporte genera actualmente tramas de audio con todos los flujos de audio suscritos suscritos mezclados. Todo el audio de los participantes se combina en un único flujo de audio que recibe su canalización.

Por defecto, el transporte se suscribe automáticamente a los flujos basándose en la dirección audio_in_auto_subscribe y video_in_auto_subscribe parámetros. También puede controlar manualmente a qué flujos suscribirse para un control más preciso.

Suscripción manual

Si necesita más control sobre los flujos a los que suscribirse, puede desactivar la suscripción automática y suscribirte manualmente a flujos específicos:

from pipecat.transports.vonage.video_connector import SubscribeSettings

# Disable auto-subscription
transport_params = VonageVideoConnectorTransportParams(
    audio_in_auto_subscribe=False,
    video_in_auto_subscribe=False,
    # ... other parameters
)

# Manually subscribe when a participant joins
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
    stream_id = data['streamId']
    logger.info(f"Participant joined with stream {stream_id}, subscribing...")
    await transport.subscribe_to_stream(
        stream_id,
        SubscribeSettings(
            subscribe_to_audio=True,
            subscribe_to_video=True,
            preferred_resolution=(640, 480),
            preferred_framerate=15
        )
    )

Cuándo utilizar la suscripción automática (audio_in_auto_subscribe=True, video_in_auto_subscribe=Truepor defecto):

  • Applications simples en las que desea recibir todos los flujos de todos los participantes
  • Asistentes de voz o vídeo que necesitan interactuar con todos los participantes en la sesión
  • Aplicaciones de grabación o seguimiento que deben captar a todos los participantes
  • Casos en los que minimizar la complejidad del código es más importante que la suscripción selectiva
  • Applications dans lesquelles tous les participants doivent être traités de la même manière

Cuándo utilizar la suscripción manual (audio_in_auto_subscribe=False, video_in_auto_subscribe=False):

  • Applications that need to selectively subscribe based on participant metadata or session logic
  • Escenarios en los que se desea optimizar el ancho de banda suscribiéndose sólo a flujos específicos
  • Casos de uso que requieren ajustes de suscripción personalizados por participante (distintos niveles de calidad)
  • Applications that need to validate or authenticate participants before subscribing
  • Escenarios multipartitos complejos en los que se desea un control preciso de los flujos que se van a recibir.

Control de la calidad de vídeo con emisión simultánea

Al suscribirte a flujos de vídeo, puedes controlar la calidad de vídeo que recibes utilizando la opción preferred_resolution y preferred_framerate parámetros. Estos parámetros son especialmente útiles cuando el editor envía flujos de emisión simultánea (varias capas de calidad).

Transmisiones simultáneas contienen múltiples capas espaciales y temporales:

  • Capas espaciales: Diferentes resoluciones (por ejemplo, 1280x720, 640x480, 320x240)
  • Capas temporales: Diferentes velocidades de fotogramas (por ejemplo, 30fps, 15fps, 7,5fps)

Si especifica su resolución y velocidad de fotogramas preferidas, puede optimizar el uso del ancho de banda y los requisitos de procesamiento. requisitos de procesamiento:

# Subscribe to a lower quality stream for bandwidth efficiency
await transport.subscribe_to_stream(
    stream_id,
    SubscribeSettings(
        subscribe_to_video=True,
        preferred_resolution=(320, 240),  # Request low spatial layer
        preferred_framerate=15            # Request lower temporal layer
    )
)

# Subscribe to a high quality stream for better visual fidelity
await transport.subscribe_to_stream(
    stream_id,
    SubscribeSettings(
        subscribe_to_video=True,
        preferred_resolution=(1280, 720),  # Request high spatial layer
        preferred_framerate=30             # Request higher temporal layer
    )
)

Notas importantes:

  • Si el editor no admite la emisión simultánea o la capa solicitada no está disponible, el servidor proporcionará la calidad disponible más próxima
  • Las resoluciones y frecuencias de cuadro más bajas reducen el consumo de ancho de banda y la sobrecarga de procesamiento.
  • Estos ajustes sólo afectan a la suscripción de vídeo; no controlan la calidad de salida del editor
  • También puede establecer preferencias globales mediante video_in_preferred_resolution y video_in_preferred_framerate en los parámetros de transporte de los flujos de suscripción automática

Gestión de sesiones

Eventos del ciclo de vida de la sesión

Gestiona los eventos de entrada y salida de sesión:

# Handle when the transport joins the session
@transport.event_handler("on_joined")
async def on_joined(transport, data):
    logger.info(f"Joined session {data['sessionId']}")
    # Initialize your application state
    await task.queue_frames([LLMMessagesFrame()])

# Handle when the transport leaves the session
@transport.event_handler("on_left")
async def on_left(transport):
    logger.info("Left session")
    # Clean up resources or save session data

# Handle session errors
@transport.event_handler("on_error")
async def on_error(transport, error):
    logger.error(f"Session error: {error}")

Actos de los participantes

Supervise a los participantes que entran y salen de la sesión:

# Handle first participant joining (useful for triggering initial interactions)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
    logger.info(f"First participant joined: stream {data['streamId']}")
    # Start recording, initialize conversation, etc.

# Handle any participant joining
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
    logger.info(f"Participant joined: stream {data['streamId']}")
    # Update participant list, send greeting, etc.

# Handle participant leaving
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, data):
    logger.info(f"Participant left: stream {data['streamId']}")
    # Update participant list, handle cleanup

Eventos de conexión de clientes

Supervisar las conexiones individuales de los abonados al flujo:

# Handle when a subscriber successfully connects to a stream
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
    logger.info(f"Client connected to stream {data['subscriberId']}")

# Handle when a subscriber disconnects from a stream
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, data):
    logger.info(f"Client disconnected from stream {data['subscriberId']}")

Integración de tuberías

Ejemplo de canalización completa

Aquí te mostramos cómo integrar el transporte de Vonage Video Connector con una canalización de IA completa:

import asyncio
import os

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.transports.vonage.video_connector import (
    VonageVideoConnectorTransport,
    VonageVideoConnectorTransportParams,
    SubscribeSettings,
)

async def main():
    # Configure transport
    transport = VonageVideoConnectorTransport(
        application_id=os.getenv("VONAGE_APPLICATION_ID"),
        session_id=os.getenv("VONAGE_SESSION_ID"),
        token=os.getenv("VONAGE_TOKEN"),
        params=VonageVideoConnectorTransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            publisher_name="AI Assistant",
            audio_in_sample_rate=16000,
            audio_out_sample_rate=24000,
            vad_analyzer=SileroVADAnalyzer(),
            audio_in_auto_subscribe=True,
        )
    )

    # Set up AI service
    llm = AWSNovaSonicLLMService(
        secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
        access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
        region=os.getenv("AWS_REGION", ""),
        session_token=os.getenv("AWS_SESSION_TOKEN", ""),
    )

    # Create context for conversation
    context = OpenAILLMContext(
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},
        ]
    )
    context_aggregator = llm.create_context_aggregator(context)

    # Build pipeline
    pipeline = Pipeline([
        transport.input(),           # Audio from Vonage session
        context_aggregator.user(),   # Process user input
        llm,                        # AI processing
        transport.output(),         # Audio back to session
    ])

    task = PipelineTask(pipeline)

    # Register event handlers
    @transport.event_handler("on_joined")
    async def on_joined(transport, data):
        print(f"Joined session: {data['sessionId']}")

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, data):
        print(f"First participant joined: {data['streamId']}")

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, data):
        await task.queue_frames([LLMRunFrame()])
        await llm.trigger_assistant_response()


    runner = PipelineRunner()
    await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())

Buenas prácticas

Optimización del rendimiento

  1. Elija las frecuencias de muestreo adecuadas:

    • Utiliza 16 kHz para el reconocimiento de voz y la mayoría de los servicios de IA
    • Utiliza 24 kHz o más para mejorar la calidad de la conversión de texto a voz.
    • Evitar frecuencias de muestreo innecesariamente altas que aumentan la carga de procesamiento.
  2. Optimizar el procesamiento de canalizaciones:

    • Mantener la eficiencia de los canales de procesamiento de IA para minimizar la latencia
    • Utilizar tamaños de trama y gestión de búfer adecuados
    • Considere la posibilidad de utilizar el DVA para reducir los trámites innecesarios

Depuración y supervisión

Activar el registro:

from loguru import logger

# Pipecat uses loguru for logging
logger.enable("pipecat")

# Set Vonage Video Connector log level
transport_params = VonageVideoConnectorTransportParams(
    video_connector_log_level="DEBUG",  # DEBUG, INFO, WARNING, ERROR
    # ... other parameters
)