Connecteur vidéo Intégration Pipecat
Le transport Vonage Video Connector pour Pipecat vous permet de créer des Applications qui qui participent de façon transparente aux sessions de l'API Video de Vonage. Ce transport vous permet de recevoir audio et vidéo des participants à la session et d'envoyer l'audio et la vidéo traités à la session en temps réel. session en temps réel.
Pipecat est un cadre de travail pour la construction d'applications d'IA conversationnelle vocale et multimodale. Le transport Vonage Video Connector fait le lien entre le pipeline de traitement des médias de Pipecat et les sessions de l'API vidéo de Vonage, ce qui permet un large éventail de cas d'utilisation. sessions de l'API Video de Vonage, ce qui permet un large éventail de cas d'utilisation :
- Assistants IA vocaux et vidéo en temps réel
- Services de transcription et de traduction en direct
- Enregistrement et analyse des appels
- Traitement des effets audio et vidéo
- Modération automatisée et filtrage du contenu
- Traitement et manipulation personnalisés des médias
Le transport prend en charge la conversion des formats audio et vidéo, la gestion des sessions et la connectivité WebRTC ce qui vous permet de vous concentrer sur la construction de votre logique d'application.
Cette page comprend les sections suivantes :
- Bêta publique
- Exigences
- Configuration de base
- Configuration du transport
- Traitement de l'audio et de la vidéo
- Abonnement au streaming
- Gestion des sessions
- Intégration des pipelines
- Meilleures pratiques
Bêta publique
L'intégration de Vonage Video Connector Pipecat est en phase bêta. Le code est disponible à l'adresse suivante https://github.com/Vonage/pipecat.
Exigences
Pour utiliser ce transport, vous aurez besoin de la bibliothèque python Vonage Video Connector, qui fonctionne sur les plateformes Linux AMD64 et ARM64. fonctionne sur les plateformes Linux AMD64 et ARM64.
Configuration de base
Paramètres d'authentification et de session
Pour utiliser le transport Vonage Video Connector, vous devez :
- ID de l'application - Votre identifiant d'application Video API de Vonage
- ID de la session - L'ID de la session Video API que vous souhaitez rejoindre
- Jeton - Un jeton de participant valide pour la session
Ces paramètres peuvent être obtenus à partir de votre tableau de bord Video API de Vonage ou générés à l'aide des SDK du serveur Video API de Vonage. à l'aide des SDK du serveur Video API de Vonage.
Initialiser le transport
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
)
# Initialize the transport
transport = VonageVideoConnectorTransport(
application_id="your_application_id",
session_id="your_session_id",
token="your_participant_token",
params=VonageVideoConnectorTransportParams()
)
Configuration du transport
Paramètres audio et vidéo de base
Configurez le transport en fonction de vos besoins spécifiques en matière d'audio et de vidéo :
from pipecat.audio.vad.silero import SileroVADAnalyzer
transport_params = VonageVideoConnectorTransportParams(
audio_in_enabled=True, # Enable receiving audio
audio_out_enabled=True, # Enable sending audio
video_in_enabled=True, # Enable receiving video
video_out_enabled=True, # Enable sending video
publisher_name="AI Assistant", # Name for the published stream
audio_in_sample_rate=16000, # Input sample rate (Hz)
audio_in_channels=1, # Input channels
audio_out_sample_rate=24000, # Output sample rate (Hz)
audio_out_channels=1, # Output channels
video_out_width=1280, # Output video width
video_out_height=720, # Output video height
video_out_framerate=30, # Output video framerate
video_out_color_format="RGB", # Output video color format
vad_analyzer=SileroVADAnalyzer(), # Voice activity detection
audio_in_auto_subscribe=True, # Auto-subscribe to audio streams
video_in_auto_subscribe=False, # Auto-subscribe to video streams
video_in_preferred_resolution=(640, 480), # Preferred input resolution
video_in_preferred_framerate=15, # Preferred input framerate
publisher_enable_opus_dtx=False, # Enable Opus DTX
session_enable_migration=False, # Enable session migration
video_connector_log_level="INFO", # Log level
clear_buffers_on_interruption=True, # Clear buffers on interruption
)
transport = VonageVideoConnectorTransport(
application_id,
session_id,
token,
params=transport_params
)
Détection de l'activité vocale (VAD)
Il est conseillé d'utiliser ce transport avec la détection de l'activité vocale pour optimiser le traitement audio :
from pipecat.audio.vad.silero import SileroVADAnalyzer
# Configure VAD for better audio processing
vad = SileroVADAnalyzer()
transport_params = VonageVideoConnectorTransportParams(
vad_analyzer=vad,
# ... other parameters
)
La VAD permet de réduire les traitements inutiles en détectant la présence de la parole dans le flux audio. dans le flux audio.
Nettoyage de la mémoire tampon en cas d'interruption
Les clear_buffers_on_interruption détermine si les tampons de média sont
automatiquement effacés lorsqu'une trame d'interruption est reçue dans le pipeline.
transport_params = VonageVideoConnectorTransportParams(
clear_buffers_on_interruption=True, # Default: True
# ... other parameters
)
Quand activer (True(par défaut) :
- Applications d'IA conversationnelle où vous souhaitez arrêter immédiatement la lecture lorsque l'utilisateur l'interrompt.
- Les assistants vocaux interactifs qui doivent répondre rapidement aux demandes de l'utilisateur
- Applications dans lesquelles les données audio/vidéo obsolètes doivent être supprimées pour maintenir l'interaction en temps réel
- Scénarios dans lesquels la réduction de la latence est plus importante que l'achèvement de la lecture des médias
Quand désactiver (False) :
- Applications d'enregistrement ou de diffusion en continu dans lesquelles vous souhaitez conserver tous les médias.
- Applications qui doivent achever la lecture d'informations importantes même si elles sont interrompues.
- Scénarios de traitement par lots dans lesquels les supports doivent être traités de manière séquentielle sans interruption
- Cas d'utilisation où vous mettez en œuvre une logique de gestion des interruptions personnalisée
Traitement de l'audio et de la vidéo
Traitement des entrées audio et vidéo
Le transport convertit automatiquement l'audio et la vidéo entrants de la session Vonage Video dans les formats internes de Pipecat :
# Audio and video input is handled automatically by the transport
# Incoming audio frames are converted to AudioRawFrame format
# Incoming video frames are converted to ImageRawFrame format
pipeline = Pipeline([
transport.input(), # Receives audio and video from Vonage session
# ... your AI processing pipeline
])
Génération de sorties audio et vidéo
Renvoyer l'audio et la vidéo à la session Vonage Video :
# Audio and video output is sent automatically through the pipeline
pipeline = Pipeline([
# ... your AI processing pipeline
transport.output(), # Sends audio and video to Vonage session
])
Abonnement au streaming
Lorsque le transport s'abonne aux flux des participants à la session, il génère des trames Pipecat que votre pipeline peut traiter. Le comportement diffère selon qu'il s'agit d'audio ou de vidéo :
Flux vidéo: Le transport génère des images vidéo individuelles pour chaque flux souscrit, identifié par l'ID du flux. Cela vous permet de traiter la vidéo de différents participants séparément dans votre pipeline.
Flux audio: Le transport génère actuellement des trames audio avec tous les flux audio souscrits mélangés. souscrits, mélangés ensemble. Tous les flux audio des participants sont combinés en un seul flux audio que votre votre pipeline reçoit.
Par défaut, le transport s'abonne automatiquement aux flux sur la base de l'option audio_in_auto_subscribe
et video_in_auto_subscribe paramètres. Vous pouvez également contrôler manuellement les flux auxquels vous devez vous abonner pour un contrôle plus fin.
pour un contrôle plus fin.
Abonnement manuel à un flux de données
Si vous souhaitez mieux contrôler les flux auxquels vous vous abonnez, vous pouvez désactiver l'abonnement automatique et vous abonner manuellement à des flux spécifiques :
from pipecat.transports.vonage.video_connector import SubscribeSettings
# Disable auto-subscription
transport_params = VonageVideoConnectorTransportParams(
audio_in_auto_subscribe=False,
video_in_auto_subscribe=False,
# ... other parameters
)
# Manually subscribe when a participant joins
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
stream_id = data['streamId']
logger.info(f"Participant joined with stream {stream_id}, subscribing...")
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_audio=True,
subscribe_to_video=True,
preferred_resolution=(640, 480),
preferred_framerate=15
)
)
Quand utiliser l'abonnement automatique (audio_in_auto_subscribe=True, video_in_auto_subscribe=True(par défaut) :
- Applications simples dans lesquelles vous souhaitez recevoir tous les flux de tous les participants
- Assistants vocaux ou vidéo qui doivent interagir avec toutes les personnes participant à la session
- Applications d'enregistrement ou de suivi qui doivent permettre de capter tous les participants
- Cas d'utilisation où la réduction de la complexité du code est plus importante que la souscription sélective
- Applications dans lesquelles tous les participants doivent être traités sur un pied d'égalité
Quand utiliser l'abonnement manuel (audio_in_auto_subscribe=False, video_in_auto_subscribe=False) :
- Applications qui ont besoin de s'abonner de manière sélective en fonction des métadonnées du participant ou de la logique de la session.
- Scénarios dans lesquels vous souhaitez optimiser la bande passante en vous abonnant uniquement à des flux spécifiques
- Cas d'utilisation nécessitant des paramètres d'abonnement personnalisés par participant (différents niveaux de qualité)
- Les applications qui ont besoin de valider ou d'authentifier les participants avant de s'abonner.
- Scénarios multipartites complexes dans lesquels vous souhaitez contrôler finement les flux à recevoir
Contrôle de la qualité vidéo avec la diffusion simultanée
Lorsque vous vous abonnez à des flux vidéo, vous pouvez contrôler la qualité vidéo que vous recevez à l'aide de la fonction
preferred_resolution et preferred_framerate paramètres. Ces paramètres sont particulièrement
particulièrement utiles lorsque l'éditeur envoie des flux simulcast (plusieurs couches de qualité).
Flux de diffusion simultanée contiennent plusieurs couches spatiales et temporelles :
- Couches spatiales: Différentes résolutions (par exemple, 1280x720, 640x480, 320x240)
- Couches temporelles: Différentes fréquences d'images (par exemple, 30 images/seconde, 15 images/seconde, 7,5 images/seconde)
En spécifiant votre résolution et votre fréquence d'images préférées, vous pouvez optimiser l'utilisation de la bande passante et les besoins de traitement. de traitement :
# Subscribe to a lower quality stream for bandwidth efficiency
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(320, 240), # Request low spatial layer
preferred_framerate=15 # Request lower temporal layer
)
)
# Subscribe to a high quality stream for better visual fidelity
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(1280, 720), # Request high spatial layer
preferred_framerate=30 # Request higher temporal layer
)
)
Remarques importantes :
- Si l'éditeur ne prend pas en charge la diffusion simultanée ou si la couche demandée n'est pas disponible, le serveur fournira la qualité disponible la plus proche
- Des résolutions et des fréquences d'images plus faibles réduisent la consommation de la bande passante et les frais généraux de traitement.
- Ces paramètres n'affectent que l'abonnement vidéo ; ils ne contrôlent pas la qualité de sortie de l'éditeur.
- Vous pouvez également définir des préférences globales en utilisant
video_in_preferred_resolutionetvideo_in_preferred_frameratedans les paramètres de transport pour les flux auto-souscrits
Gestion des sessions
Événements du cycle de vie de la session
Gérer les événements liés à l'entrée et à la sortie d'une session :
# Handle when the transport joins the session
@transport.event_handler("on_joined")
async def on_joined(transport, data):
logger.info(f"Joined session {data['sessionId']}")
# Initialize your application state
await task.queue_frames([LLMMessagesFrame()])
# Handle when the transport leaves the session
@transport.event_handler("on_left")
async def on_left(transport):
logger.info("Left session")
# Clean up resources or save session data
# Handle session errors
@transport.event_handler("on_error")
async def on_error(transport, error):
logger.error(f"Session error: {error}")
Manifestations des participants
Contrôler les participants qui rejoignent et quittent la session :
# Handle first participant joining (useful for triggering initial interactions)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
logger.info(f"First participant joined: stream {data['streamId']}")
# Start recording, initialize conversation, etc.
# Handle any participant joining
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
logger.info(f"Participant joined: stream {data['streamId']}")
# Update participant list, send greeting, etc.
# Handle participant leaving
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, data):
logger.info(f"Participant left: stream {data['streamId']}")
# Update participant list, handle cleanup
Événements de connexion du client
Surveiller les connexions individuelles des abonnés aux flux :
# Handle when a subscriber successfully connects to a stream
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
logger.info(f"Client connected to stream {data['subscriberId']}")
# Handle when a subscriber disconnects from a stream
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, data):
logger.info(f"Client disconnected from stream {data['subscriberId']}")
Intégration des pipelines
Exemple de pipeline complet
Voici comment intégrer le transport Vonage Video Connector à un pipeline d'IA complet :
import asyncio
import os
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
SubscribeSettings,
)
async def main():
# Configure transport
transport = VonageVideoConnectorTransport(
application_id=os.getenv("VONAGE_APPLICATION_ID"),
session_id=os.getenv("VONAGE_SESSION_ID"),
token=os.getenv("VONAGE_TOKEN"),
params=VonageVideoConnectorTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
publisher_name="AI Assistant",
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
vad_analyzer=SileroVADAnalyzer(),
audio_in_auto_subscribe=True,
)
)
# Set up AI service
llm = AWSNovaSonicLLMService(
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
region=os.getenv("AWS_REGION", ""),
session_token=os.getenv("AWS_SESSION_TOKEN", ""),
)
# Create context for conversation
context = OpenAILLMContext(
messages=[
{"role": "system", "content": "You are a helpful AI assistant."},
]
)
context_aggregator = llm.create_context_aggregator(context)
# Build pipeline
pipeline = Pipeline([
transport.input(), # Audio from Vonage session
context_aggregator.user(), # Process user input
llm, # AI processing
transport.output(), # Audio back to session
])
task = PipelineTask(pipeline)
# Register event handlers
@transport.event_handler("on_joined")
async def on_joined(transport, data):
print(f"Joined session: {data['sessionId']}")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
print(f"First participant joined: {data['streamId']}")
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
await task.queue_frames([LLMRunFrame()])
await llm.trigger_assistant_response()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
Meilleures pratiques
Optimisation des performances
Choisir des taux d'échantillonnage appropriés:
- Utilisez 16 kHz pour la reconnaissance vocale et la plupart des services d'intelligence artificielle.
- Utilisez 24kHz ou plus pour une meilleure qualité de la synthèse vocale.
- Éviter les taux d'échantillonnage élevés inutiles qui augmentent la charge de traitement
Optimiser le traitement du pipeline:
- Maintenir l'efficacité des pipelines de traitement de l'IA pour minimiser la latence.
- Utiliser des tailles de cadre et une gestion de la mémoire tampon appropriées
- Envisager l'utilisation de la VAD pour réduire les traitements inutiles
Débogage et surveillance
Activer la journalisation:
from loguru import logger
# Pipecat uses loguru for logging
logger.enable("pipecat")
# Set Vonage Video Connector log level
transport_params = VonageVideoConnectorTransportParams(
video_connector_log_level="DEBUG", # DEBUG, INFO, WARNING, ERROR
# ... other parameters
)