Conector de vídeo Integración de Pipecat
El transporte de Vonage Video Connector para Pipecat te permite crear aplicaciones que participen sin problemas en las sesiones de la Video API de Vonage. Este transporte te permite recibir audio y video de los participantes de la sesión y enviar audio y video procesados de vuelta a la procesados a la sesión en tiempo real.
Pipecat es un marco para crear aplicaciones de IA conversacional multimodal y de voz. El transporte de Vonage Video Connector conecta la canalización de procesamiento de medios de Pipecat con las sesiones de la API de vídeo de Vonage. sesiones de Video API de Vonage, lo que permite una amplia gama de casos de uso:
- Asistentes de voz y vídeo en tiempo real
- Servicios de transcripción y traducción en directo
- Grabación y análisis de llamadas
- Tratamiento de efectos de audio y vídeo
- Moderación automática y filtrado de contenidos
- Tratamiento y manipulación de medios personalizados
El transporte se encarga de la conversión de formatos de audio y vídeo, la gestión de sesiones y la conectividad WebRTC lo que le permite centrarse en crear la lógica de su aplicación.
Esta página incluye las siguientes secciones:
- Beta pública
- Requisitos
- Configuración básica
- Configuración del transporte
- Tratamiento de audio y vídeo
- Abono Stream
- Gestión de sesiones
- Integración de tuberías
- Buenas prácticas
Beta pública
La integración de Vonage Video Connector Pipecat está en fase beta. El código está disponible en https://github.com/Vonage/pipecat.
Requisitos
Para utilizar este transporte necesitarás la biblioteca python de Vonage Video Connector, que funciona en plataformas Linux AMD64 y ARM64.
Configuración básica
Parámetros de autenticación y sesión
Para usar el transporte Vonage Video Connector, necesitas:
- ID de aplicación - Tu identificador de aplicación de Video API de Vonage
- ID de sesión - ID de la sesión de la Video API a la que desea unirse
- Ficha - Un código de participante válido para la sesión
Estos parámetros pueden obtenerse de tu panel de control de la Video API de Vonage o generarse utilizando los SDK del servidor de la Video API de Vonage.
Inicializar el transporte
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
)
# Initialize the transport
transport = VonageVideoConnectorTransport(
application_id="your_application_id",
session_id="your_session_id",
token="your_participant_token",
params=VonageVideoConnectorTransportParams()
)
Configuración del transporte
Parámetros básicos de audio y vídeo
Configure el transporte para sus requisitos específicos de audio y vídeo:
from pipecat.audio.vad.silero import SileroVADAnalyzer
transport_params = VonageVideoConnectorTransportParams(
audio_in_enabled=True, # Enable receiving audio
audio_out_enabled=True, # Enable sending audio
video_in_enabled=True, # Enable receiving video
video_out_enabled=True, # Enable sending video
publisher_name="AI Assistant", # Name for the published stream
audio_in_sample_rate=16000, # Input sample rate (Hz)
audio_in_channels=1, # Input channels
audio_out_sample_rate=24000, # Output sample rate (Hz)
audio_out_channels=1, # Output channels
video_out_width=1280, # Output video width
video_out_height=720, # Output video height
video_out_framerate=30, # Output video framerate
video_out_color_format="RGB", # Output video color format
vad_analyzer=SileroVADAnalyzer(), # Voice activity detection
audio_in_auto_subscribe=True, # Auto-subscribe to audio streams
video_in_auto_subscribe=False, # Auto-subscribe to video streams
video_in_preferred_resolution=(640, 480), # Preferred input resolution
video_in_preferred_framerate=15, # Preferred input framerate
publisher_enable_opus_dtx=False, # Enable Opus DTX
session_enable_migration=False, # Enable session migration
video_connector_log_level="INFO", # Log level
clear_buffers_on_interruption=True, # Clear buffers on interruption
)
transport = VonageVideoConnectorTransport(
application_id,
session_id,
token,
params=transport_params
)
Detección de actividad vocal (VAD)
Es aconsejable utilizar este transporte con la Detección de Actividad de Voz para optimizar el procesamiento de audio:
from pipecat.audio.vad.silero import SileroVADAnalyzer
# Configure VAD for better audio processing
vad = SileroVADAnalyzer()
transport_params = VonageVideoConnectorTransportParams(
vad_analyzer=vad,
# ... other parameters
)
VAD ayuda a reducir el procesamiento innecesario mediante la detección de cuando el habla está presente en el flujo de audio.
Compensación del búfer en las interrupciones
En clear_buffers_on_interruption determina si las memorias intermedias
automáticamente cuando se recibe una trama de interrupción en el canal.
transport_params = VonageVideoConnectorTransportParams(
clear_buffers_on_interruption=True, # Default: True
# ... other parameters
)
Cuándo activar (Truepor defecto):
- Aplicaciones de IA conversacional en las que se desea detener la reproducción inmediatamente cuando el usuario interrumpe
- Asistentes de voz interactivos que deben responder con rapidez a las preguntas de los usuarios.
- Applications where outdated audio/video should be discarded to maintain real-time interaction
- Escenarios en los que minimizar la latencia es más importante que completar la reproducción multimedia
Cuándo desactivar (False):
- Aplicaciones de grabación o streaming en las que desee conservar todos los medios
- Applications that need to complete playback of important information even if interrupted
- Escenarios de procesamiento por lotes en los que los soportes deben procesarse secuencialmente sin interrupción
- Casos en los que se implementa una lógica de gestión de interrupciones personalizada
Tratamiento de audio y vídeo
Procesamiento de entradas de audio y vídeo
El transporte convierte automáticamente el audio y el video entrantes de la sesión de Vonage Video a los formatos de medios internos de Pipecat:
# Audio and video input is handled automatically by the transport
# Incoming audio frames are converted to AudioRawFrame format
# Incoming video frames are converted to ImageRawFrame format
pipeline = Pipeline([
transport.input(), # Receives audio and video from Vonage session
# ... your AI processing pipeline
])
Generación de salida de audio y vídeo
Envía audio y video de vuelta a la sesión de Vonage Video:
# Audio and video output is sent automatically through the pipeline
pipeline = Pipeline([
# ... your AI processing pipeline
transport.output(), # Sends audio and video to Vonage session
])
Suscripción a Stream
Cuando el transporte se suscribe a flujos de participantes en la sesión, genera tramas Pipecat que su pipeline puede procesar. El comportamiento difiere entre audio y vídeo:
Secuencias de vídeo: El transporte genera fotogramas de vídeo individuales para cada flujo suscrito, identificados por el ID del flujo. Esto le permite procesar vídeo de diferentes participantes por separado.
Secuencias de audio: El transporte genera actualmente tramas de audio con todos los flujos de audio suscritos suscritos mezclados. Todo el audio de los participantes se combina en un único flujo de audio que recibe su canalización.
Por defecto, el transporte se suscribe automáticamente a los flujos basándose en la dirección audio_in_auto_subscribe
y video_in_auto_subscribe parámetros. También puede controlar manualmente a qué flujos suscribirse
para un control más preciso.
Suscripción manual
Si necesita más control sobre los flujos a los que suscribirse, puede desactivar la suscripción automática y suscribirte manualmente a flujos específicos:
from pipecat.transports.vonage.video_connector import SubscribeSettings
# Disable auto-subscription
transport_params = VonageVideoConnectorTransportParams(
audio_in_auto_subscribe=False,
video_in_auto_subscribe=False,
# ... other parameters
)
# Manually subscribe when a participant joins
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
stream_id = data['streamId']
logger.info(f"Participant joined with stream {stream_id}, subscribing...")
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_audio=True,
subscribe_to_video=True,
preferred_resolution=(640, 480),
preferred_framerate=15
)
)
Cuándo utilizar la suscripción automática (audio_in_auto_subscribe=True, video_in_auto_subscribe=Truepor defecto):
- Applications simples en las que desea recibir todos los flujos de todos los participantes
- Asistentes de voz o vídeo que necesitan interactuar con todos los participantes en la sesión
- Aplicaciones de grabación o seguimiento que deben captar a todos los participantes
- Casos en los que minimizar la complejidad del código es más importante que la suscripción selectiva
- Applications dans lesquelles tous les participants doivent être traités de la même manière
Cuándo utilizar la suscripción manual (audio_in_auto_subscribe=False, video_in_auto_subscribe=False):
- Applications that need to selectively subscribe based on participant metadata or session logic
- Escenarios en los que se desea optimizar el ancho de banda suscribiéndose sólo a flujos específicos
- Casos de uso que requieren ajustes de suscripción personalizados por participante (distintos niveles de calidad)
- Applications that need to validate or authenticate participants before subscribing
- Escenarios multipartitos complejos en los que se desea un control preciso de los flujos que se van a recibir.
Control de la calidad de vídeo con emisión simultánea
Al suscribirte a flujos de vídeo, puedes controlar la calidad de vídeo que recibes utilizando la opción
preferred_resolution y preferred_framerate parámetros. Estos parámetros son especialmente
útiles cuando el editor envía flujos de emisión simultánea (varias capas de calidad).
Transmisiones simultáneas contienen múltiples capas espaciales y temporales:
- Capas espaciales: Diferentes resoluciones (por ejemplo, 1280x720, 640x480, 320x240)
- Capas temporales: Diferentes velocidades de fotogramas (por ejemplo, 30fps, 15fps, 7,5fps)
Si especifica su resolución y velocidad de fotogramas preferidas, puede optimizar el uso del ancho de banda y los requisitos de procesamiento. requisitos de procesamiento:
# Subscribe to a lower quality stream for bandwidth efficiency
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(320, 240), # Request low spatial layer
preferred_framerate=15 # Request lower temporal layer
)
)
# Subscribe to a high quality stream for better visual fidelity
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(1280, 720), # Request high spatial layer
preferred_framerate=30 # Request higher temporal layer
)
)
Notas importantes:
- Si el editor no admite la emisión simultánea o la capa solicitada no está disponible, el servidor proporcionará la calidad disponible más próxima
- Las resoluciones y frecuencias de cuadro más bajas reducen el consumo de ancho de banda y la sobrecarga de procesamiento.
- Estos ajustes sólo afectan a la suscripción de vídeo; no controlan la calidad de salida del editor
- También puede establecer preferencias globales mediante
video_in_preferred_resolutionyvideo_in_preferred_framerateen los parámetros de transporte de los flujos de suscripción automática
Gestión de sesiones
Eventos del ciclo de vida de la sesión
Gestiona los eventos de entrada y salida de sesión:
# Handle when the transport joins the session
@transport.event_handler("on_joined")
async def on_joined(transport, data):
logger.info(f"Joined session {data['sessionId']}")
# Initialize your application state
await task.queue_frames([LLMMessagesFrame()])
# Handle when the transport leaves the session
@transport.event_handler("on_left")
async def on_left(transport):
logger.info("Left session")
# Clean up resources or save session data
# Handle session errors
@transport.event_handler("on_error")
async def on_error(transport, error):
logger.error(f"Session error: {error}")
Actos de los participantes
Supervise a los participantes que entran y salen de la sesión:
# Handle first participant joining (useful for triggering initial interactions)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
logger.info(f"First participant joined: stream {data['streamId']}")
# Start recording, initialize conversation, etc.
# Handle any participant joining
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
logger.info(f"Participant joined: stream {data['streamId']}")
# Update participant list, send greeting, etc.
# Handle participant leaving
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, data):
logger.info(f"Participant left: stream {data['streamId']}")
# Update participant list, handle cleanup
Eventos de conexión de clientes
Supervisar las conexiones individuales de los abonados al flujo:
# Handle when a subscriber successfully connects to a stream
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
logger.info(f"Client connected to stream {data['subscriberId']}")
# Handle when a subscriber disconnects from a stream
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, data):
logger.info(f"Client disconnected from stream {data['subscriberId']}")
Integración de tuberías
Ejemplo de canalización completa
Aquí te mostramos cómo integrar el transporte de Vonage Video Connector con una canalización de IA completa:
import asyncio
import os
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
SubscribeSettings,
)
async def main():
# Configure transport
transport = VonageVideoConnectorTransport(
application_id=os.getenv("VONAGE_APPLICATION_ID"),
session_id=os.getenv("VONAGE_SESSION_ID"),
token=os.getenv("VONAGE_TOKEN"),
params=VonageVideoConnectorTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
publisher_name="AI Assistant",
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
vad_analyzer=SileroVADAnalyzer(),
audio_in_auto_subscribe=True,
)
)
# Set up AI service
llm = AWSNovaSonicLLMService(
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
region=os.getenv("AWS_REGION", ""),
session_token=os.getenv("AWS_SESSION_TOKEN", ""),
)
# Create context for conversation
context = OpenAILLMContext(
messages=[
{"role": "system", "content": "You are a helpful AI assistant."},
]
)
context_aggregator = llm.create_context_aggregator(context)
# Build pipeline
pipeline = Pipeline([
transport.input(), # Audio from Vonage session
context_aggregator.user(), # Process user input
llm, # AI processing
transport.output(), # Audio back to session
])
task = PipelineTask(pipeline)
# Register event handlers
@transport.event_handler("on_joined")
async def on_joined(transport, data):
print(f"Joined session: {data['sessionId']}")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
print(f"First participant joined: {data['streamId']}")
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
await task.queue_frames([LLMRunFrame()])
await llm.trigger_assistant_response()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
Buenas prácticas
Optimización del rendimiento
Elija las frecuencias de muestreo adecuadas:
- Utiliza 16 kHz para el reconocimiento de voz y la mayoría de los servicios de IA
- Utiliza 24 kHz o más para mejorar la calidad de la conversión de texto a voz.
- Evitar frecuencias de muestreo innecesariamente altas que aumentan la carga de procesamiento.
Optimizar el procesamiento de canalizaciones:
- Mantener la eficiencia de los canales de procesamiento de IA para minimizar la latencia
- Utilizar tamaños de trama y gestión de búfer adecuados
- Considere la posibilidad de utilizar el DVA para reducir los trámites innecesarios
Depuración y supervisión
Activar el registro:
from loguru import logger
# Pipecat uses loguru for logging
logger.enable("pipecat")
# Set Vonage Video Connector log level
transport_params = VonageVideoConnectorTransportParams(
video_connector_log_level="DEBUG", # DEBUG, INFO, WARNING, ERROR
# ... other parameters
)