Videoanschluss Pipecat Integration
Mit dem Vonage Video Connector für Pipecat können Sie Anwendungen erstellen, die nahtlos an Vonage Video API Sitzungen teilnehmen. Dieser Transport ermöglicht Ihnen den Empfang von Audio- und Videodaten von Sitzungsteilnehmern empfangen und verarbeitete Audio- und Videodaten in Echtzeit an die Sitzung in Echtzeit.
Pipecat ist ein Framework für die Entwicklung von sprachgesteuerten und multimodalen KI-Applikationen. Der Vonage Video Connector überbrückt die Medienverarbeitungspipeline von Pipecat mit Vonage Video API-Sitzungen und ermöglicht so eine breite Palette von Anwendungsfällen:
- KI-Assistenten für Sprache und Video in Echtzeit
- Live-Transkriptions- und Übersetzungsdienste
- Aufzeichnung und Analyse von Anrufen
- Bearbeitung von Audio- und Videoeffekten
- Automatisierte Moderation und Inhaltsfilterung
- Benutzerdefinierte Medienverarbeitung und -manipulation
Der Transport übernimmt die Konvertierung von Audio- und Videoformaten, die Sitzungsverwaltung und die WebRTC Konnektivität, so dass Sie sich auf die Entwicklung Ihrer Anwendungslogik konzentrieren können.
Diese Seite enthält die folgenden Abschnitte:
- Private Beta
- Anforderungen
- Grundlegende Einrichtung
- Transport-Konfiguration
- Audio- und Videobearbeitung
- Stream-Abonnement
- Verwaltung der Sitzungen
- Integration von Rohrleitungen
- Bewährte Praktiken
Private Beta
Die Vonage Video Connector Pipecat Integration befindet sich in der Beta-Phase. Kontaktieren Sie uns, um frühen Zugang zu erhalten.
Anforderungen
Um diesen Transport zu nutzen, benötigen Sie die Vonage Video Connector Python-Bibliothek, die auf Linux AMD64 und ARM64 Plattformen läuft.
Grundlegende Einrichtung
Authentifizierung und Sitzungsparameter
Um den Vonage Video Connector Transport zu verwenden, benötigen Sie:
- Applikation ID - Die Kennung Ihrer Vonage Video API-Anwendung
- Sitzungs-ID - Die ID der Video API-Sitzung, der Sie beitreten möchten
- Token - Ein gültiges Teilnehmer-Token für die Sitzung
Diese Parameter können von Ihrem Vonage Video API Dashboard abgerufen oder mithilfe der Vonage Video API Server SDKs generiert werden.
Initialisieren Sie den Transport
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
)
# Initialize the transport
transport = VonageVideoConnectorTransport(
application_id="your_application_id",
session_id="your_session_id",
token="your_participant_token",
params=VonageVideoConnectorTransportParams()
)
Transport-Konfiguration
Grundlegende Audio- und Videoparameter
Konfigurieren Sie den Transport für Ihre spezifischen Audio- und Videoanforderungen:
from pipecat.audio.vad.silero import SileroVADAnalyzer
transport_params = VonageVideoConnectorTransportParams(
audio_in_enabled=True, # Enable receiving audio
audio_out_enabled=True, # Enable sending audio
video_in_enabled=True, # Enable receiving video
video_out_enabled=True, # Enable sending video
publisher_name="AI Assistant", # Name for the published stream
audio_in_sample_rate=16000, # Input sample rate (Hz)
audio_in_channels=1, # Input channels
audio_out_sample_rate=24000, # Output sample rate (Hz)
audio_out_channels=1, # Output channels
video_out_width=1280, # Output video width
video_out_height=720, # Output video height
video_out_framerate=30, # Output video framerate
video_out_color_format="RGB", # Output video color format
vad_analyzer=SileroVADAnalyzer(), # Voice activity detection
audio_in_auto_subscribe=True, # Auto-subscribe to audio streams
video_in_auto_subscribe=False, # Auto-subscribe to video streams
video_in_preferred_resolution=(640, 480), # Preferred input resolution
video_in_preferred_framerate=15, # Preferred input framerate
publisher_enable_opus_dtx=False, # Enable Opus DTX
session_enable_migration=False, # Enable session migration
video_connector_log_level="INFO", # Log level
clear_buffers_on_interruption=True, # Clear buffers on interruption
)
transport = VonageVideoConnectorTransport(
application_id,
session_id,
token,
params=transport_params
)
Erkennung von Sprachaktivität (VAD)
Es ist ratsam, diesen Transport mit der Sprachaktivitätserkennung zu verwenden, um die Audioverarbeitung zu optimieren:
from pipecat.audio.vad.silero import SileroVADAnalyzer
# Configure VAD for better audio processing
vad = SileroVADAnalyzer()
transport_params = VonageVideoConnectorTransportParams(
vad_analyzer=vad,
# ... other parameters
)
VAD hilft, unnötige Verarbeitung zu vermeiden, indem es erkennt, wenn Sprache im Audiostrom vorhanden ist. im Audiostrom.
Pufferlöschung bei Unterbrechungen
Die clear_buffers_on_interruption Parameter bestimmt, ob Medienpuffer
automatisch geleert werden, wenn ein Unterbrechungsrahmen in der Pipeline empfangen wird.
transport_params = VonageVideoConnectorTransportParams(
clear_buffers_on_interruption=True, # Default: True
# ... other parameters
)
Wann wird aktiviert (True, Standard):
- Konversations-KI-Applikationen, bei denen die Wiedergabe sofort gestoppt werden soll, wenn der Benutzer sie unterbricht
- Interaktive Sprachassistenten, die schnell auf Benutzereingaben reagieren müssen
- Applications, bei denen veraltete Audio-/Videodaten verworfen werden sollten, um die Interaktion in Echtzeit aufrechtzuerhalten
- Szenarien, in denen die Minimierung der Latenzzeit wichtiger ist als die Fertigstellung der Medienwiedergabe
Wann ist die Funktion zu deaktivieren? (False):
- Aufzeichnungs- oder Streaming-Anwendungen, bei denen Sie alle Medien aufbewahren möchten
- Applications, die die Wiedergabe wichtiger Informationen auch bei Unterbrechung fortsetzen müssen
- Stapelverarbeitungsszenarien, bei denen Medien ohne Unterbrechung sequentiell verarbeitet werden sollen
- Anwendungsfälle, in denen Sie eine benutzerdefinierte Logik für die Unterbrechungsbehandlung implementieren
Audio- und Videobearbeitung
Audio- und Videoeingangsverarbeitung
Der Transport konvertiert automatisch eingehende Audio- und Videodaten aus der Vonage Video-Sitzung in die internen Medienformate von Pipecat um:
# Audio and video input is handled automatically by the transport
# Incoming audio frames are converted to AudioRawFrame format
# Incoming video frames are converted to ImageRawFrame format
pipeline = Pipeline([
transport.input(), # Receives audio and video from Vonage session
# ... your AI processing pipeline
])
Erzeugung von Audio- und Videoausgaben
Senden Sie Audio- und Videodaten zurück an die Vonage Video-Sitzung:
# Audio and video output is sent automatically through the pipeline
pipeline = Pipeline([
# ... your AI processing pipeline
transport.output(), # Sends audio and video to Vonage session
])
Stream-Abonnement
Wenn der Transport Ströme von Sitzungsteilnehmern abonniert, erzeugt er Pipecat-Frames die Ihre Pipeline verarbeiten kann. Das Verhalten unterscheidet sich zwischen Audio und Video:
Video-Streams: Der Transport generiert einzelne Videobilder für jeden abonnierten Stream, identifiziert durch die Stream-ID. Auf diese Weise können Sie Videos von verschiedenen Teilnehmern getrennt in Ihrer Pipeline verarbeiten.
Audio-Streams: Der Transport erzeugt derzeit Audio-Frames mit allen abonnierten Audio Streams zusammengemischt. Alle Teilnehmer-Audiosignale werden zu einem einzigen Audiostrom kombiniert, den Ihre Pipeline empfängt.
Standardmäßig abonniert der Transport automatisch Ströme auf der Grundlage der audio_in_auto_subscribe
und video_in_auto_subscribe Parameter. Sie können auch manuell festlegen, welche Streams abonniert werden sollen
Sie können auch manuell festlegen, welche Streams abonniert werden sollen, um eine genauere Kontrolle zu erhalten.
Manuelles Stream-Abonnement
Wenn Sie mehr Kontrolle darüber haben möchten, welche Streams Sie abonnieren möchten, können Sie die automatische Anmeldung deaktivieren deaktivieren und bestimmte Streams manuell abonnieren:
from pipecat.transports.vonage.video_connector import SubscribeSettings
# Disable auto-subscription
transport_params = VonageVideoConnectorTransportParams(
audio_in_auto_subscribe=False,
video_in_auto_subscribe=False,
# ... other parameters
)
# Manually subscribe when a participant joins
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
stream_id = data['streamId']
logger.info(f"Participant joined with stream {stream_id}, subscribing...")
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_audio=True,
subscribe_to_video=True,
preferred_resolution=(640, 480),
preferred_framerate=15
)
)
Wann ist ein automatisches Abonnement sinnvoll? (audio_in_auto_subscribe=True, video_in_auto_subscribe=True, Standard):
- Einfache Applications, bei denen Sie alle Streams von allen Teilnehmern empfangen möchten
- Sprach- oder Videoassistenten, die mit allen Sitzungsteilnehmern interagieren müssen
- Aufzeichnungs- oder Überwachungsanwendungen, die alle Teilnehmer erfassen sollen
- Anwendungsfälle, bei denen die Minimierung der Codekomplexität wichtiger ist als ein selektives Abonnement
- Applications, bei denen alle Teilnehmer gleich behandelt werden sollten
Wann ist ein manuelles Abonnement sinnvoll? (audio_in_auto_subscribe=False, video_in_auto_subscribe=False):
- Applikationen, die auf der Grundlage von Teilnehmer-Metadaten oder Sitzungslogik selektiv abonnieren müssen
- Szenarien, in denen Sie die Bandbreite optimieren möchten, indem Sie nur bestimmte Streams abonnieren
- Anwendungsfälle, die benutzerdefinierte Abonnementeinstellungen pro Teilnehmer erfordern (verschiedene Qualitätsstufen)
- Applikationen, die Teilnehmer vor der Anmeldung validieren oder authentifizieren müssen
- Komplexe Szenarien mit mehreren Teilnehmern, bei denen Sie fein abgestufte Kontrolle darüber wünschen, welche Streams Sie empfangen möchten
Kontrolle der Videoqualität mit Simulcast
Wenn Sie einen Videostream abonnieren, können Sie die Videoqualität, die Sie erhalten, mit der Option
preferred_resolution und preferred_framerate Parameter. Diese Parameter sind besonders
nützlich, wenn der Herausgeber Simulcast-Streams (mehrere Qualitätsebenen) sendet.
Simulcast-Streams mehrere räumliche und zeitliche Ebenen enthalten:
- Räumliche Schichten: Verschiedene Auflösungen (z. B. 1280x720, 640x480, 320x240)
- Zeitliche Schichten: Verschiedene Bildwiederholraten (z. B. 30fps, 15fps, 7,5fps)
Indem Sie Ihre bevorzugte Auflösung und Framerate angeben, können Sie die Bandbreitennutzung und Verarbeitungsanforderungen optimieren:
# Subscribe to a lower quality stream for bandwidth efficiency
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(320, 240), # Request low spatial layer
preferred_framerate=15 # Request lower temporal layer
)
)
# Subscribe to a high quality stream for better visual fidelity
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(1280, 720), # Request high spatial layer
preferred_framerate=30 # Request higher temporal layer
)
)
Wichtige Hinweise:
- Wenn der Verlag Simulcast nicht unterstützt oder die angeforderte Ebene nicht verfügbar ist, bietet der Server die nächstmögliche Qualität zur Verfügung
- Niedrigere Auflösungen und Frameraten verringern den Bandbreitenverbrauch und den Verarbeitungsaufwand
- Diese Einstellungen wirken sich nur auf das Videoabonnement aus; sie steuern nicht die Ausgabequalität des Herausgebers.
- Sie können auch globale Einstellungen vornehmen, indem Sie
video_in_preferred_resolutionundvideo_in_preferred_frameratein den Transportparametern für automatisch abonnierte Ströme
Sitzungsmanagement
Ereignisse im Lebenszyklus einer Sitzung
Behandlung von Ereignissen beim Beitritt und Verlassen von Sitzungen:
# Handle when the transport joins the session
@transport.event_handler("on_joined")
async def on_joined(transport, data):
logger.info(f"Joined session {data['sessionId']}")
# Initialize your application state
await task.queue_frames([LLMMessagesFrame()])
# Handle when the transport leaves the session
@transport.event_handler("on_left")
async def on_left(transport):
logger.info("Left session")
# Clean up resources or save session data
# Handle session errors
@transport.event_handler("on_error")
async def on_error(transport, error):
logger.error(f"Session error: {error}")
Veranstaltungen für Teilnehmer
Überwachen Sie die Teilnehmer beim Betreten und Verlassen der Sitzung:
# Handle first participant joining (useful for triggering initial interactions)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
logger.info(f"First participant joined: stream {data['streamId']}")
# Start recording, initialize conversation, etc.
# Handle any participant joining
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
logger.info(f"Participant joined: stream {data['streamId']}")
# Update participant list, send greeting, etc.
# Handle participant leaving
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, data):
logger.info(f"Participant left: stream {data['streamId']}")
# Update participant list, handle cleanup
Client-Verbindungsereignisse
Überwachung einzelner Stream-Teilnehmerverbindungen:
# Handle when a subscriber successfully connects to a stream
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
logger.info(f"Client connected to stream {data['subscriberId']}")
# Handle when a subscriber disconnects from a stream
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, data):
logger.info(f"Client disconnected from stream {data['subscriberId']}")
Pipeline-Integration
Beispiel für eine vollständige Pipeline
Hier erfahren Sie, wie Sie den Vonage Video Connector Transport in eine komplette KI-Pipeline integrieren:
import asyncio
import os
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
SubscribeSettings,
)
async def main():
# Configure transport
transport = VonageVideoConnectorTransport(
application_id=os.getenv("VONAGE_APPLICATION_ID"),
session_id=os.getenv("VONAGE_SESSION_ID"),
token=os.getenv("VONAGE_TOKEN"),
params=VonageVideoConnectorTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
publisher_name="AI Assistant",
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
vad_analyzer=SileroVADAnalyzer(),
audio_in_auto_subscribe=True,
)
)
# Set up AI service
llm = AWSNovaSonicLLMService(
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
region=os.getenv("AWS_REGION", ""),
session_token=os.getenv("AWS_SESSION_TOKEN", ""),
)
# Create context for conversation
context = OpenAILLMContext(
messages=[
{"role": "system", "content": "You are a helpful AI assistant."},
]
)
context_aggregator = llm.create_context_aggregator(context)
# Build pipeline
pipeline = Pipeline([
transport.input(), # Audio from Vonage session
context_aggregator.user(), # Process user input
llm, # AI processing
transport.output(), # Audio back to session
])
task = PipelineTask(pipeline)
# Register event handlers
@transport.event_handler("on_joined")
async def on_joined(transport, data):
print(f"Joined session: {data['sessionId']}")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
print(f"First participant joined: {data['streamId']}")
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
await task.queue_frames([LLMRunFrame()])
await llm.trigger_assistant_response()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
Bewährte Praktiken
Optimierung der Leistung
Wählen Sie geeignete Abtastraten:
- 16kHz für Spracherkennung und die meisten KI-Dienste verwenden
- Verwenden Sie 24kHz oder höher für eine bessere Text-zu-Sprache-Qualität
- Vermeiden Sie unnötig hohe Abtastraten, die die Verarbeitungslast erhöhen.
Optimierung der Pipeline-Verarbeitung:
- Effiziente KI-Verarbeitungspipelines zur Minimierung von Latenzzeiten
- Geeignete Rahmengrößen und Pufferverwaltung verwenden
- Erwägen Sie den Einsatz von VAD, um unnötige Verarbeitung zu vermeiden
Fehlersuche und Überwachung
Protokollierung einschalten:
from loguru import logger
# Pipecat uses loguru for logging
logger.enable("pipecat")
# Set Vonage Video Connector log level
transport_params = VonageVideoConnectorTransportParams(
video_connector_log_level="DEBUG", # DEBUG, INFO, WARNING, ERROR
# ... other parameters
)