Videoanschluss Pipecat Integration
Mit dem Vonage Video Connector Transport für Pipecat können Sie KI-gestützte Applikationen erstellen, die nahtlos an Vonage Video API Sitzungen teilnehmen. Dieser Transport ermöglicht Ihnen den Empfang von Audio- und Videodaten von Sitzungsteilnehmern empfangen und verarbeitete Audio- und Videodaten in Echtzeit an die Sitzung in Echtzeit.
Pipecat ist ein Framework für die Entwicklung von sprachgesteuerten und multimodalen KI-Applikationen. Der Vonage Video Connector überbrückt die Medienverarbeitungspipeline von Pipecat mit Vonage Video API-Sitzungen und ermöglicht so eine breite Palette von Anwendungsfällen:
- KI-Assistenten für Sprache und Video in Echtzeit
- Live-Transkriptions- und Übersetzungsdienste
- Aufzeichnung und Analyse von Anrufen
- Bearbeitung von Audio- und Videoeffekten
- Automatisierte Moderation und Inhaltsfilterung
- Benutzerdefinierte Medienverarbeitung und -manipulation
Der Transport übernimmt die Konvertierung von Audio- und Videoformaten, die Sitzungsverwaltung und die WebRTC Konnektivität, so dass Sie sich auf die Entwicklung Ihrer Anwendungslogik konzentrieren können.
Diese Seite enthält die folgenden Abschnitte:
- Erste Schritte
- Anforderungen
- Grundlegende Einrichtung
- Transport-Konfiguration
- Audio- und Videobearbeitung
- Stream-Abonnement
- Einzelne Audio-Abonnenten
- Untertitel
- Verwaltung der Sitzungen
- Integration von Rohrleitungen
- Bewährte Praktiken
- Bekannte Probleme
Erste Schritte
Mit Vonage Video Transport for Pipecat können Sie KI-gesteuerte Applikationen erstellen, die nahtlos an Vonage Video API-Sitzungen teilnehmen und sowohl Audio- als auch Videoanwendungen unterstützen. Der Code ist verfügbar unter https://github.com/Vonage/pipecat.
Anforderungen
Um diesen Transport zu nutzen, benötigen Sie die Vonage Video Connector Python-Bibliothek, die auf Linux AMD64 und ARM64 Plattformen läuft.
Grundlegende Einrichtung
Authentifizierung und Sitzungsparameter
Um den Vonage Video Connector Transport zu verwenden, benötigen Sie:
- Applikation ID - Die Kennung Ihrer Vonage Video API-Anwendung
- Sitzungs-ID - Die ID der Video API-Sitzung, der Sie beitreten möchten
- Token - Ein gültiges Teilnehmer-Token für die Sitzung
Diese Parameter können von Ihrem Vonage Video API Dashboard abgerufen oder mithilfe der Vonage Video API Server SDKs generiert werden.
Initialisieren Sie den Transport
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
)
# Initialize the transport
transport = VonageVideoConnectorTransport(
application_id="your_application_id",
session_id="your_session_id",
token="your_participant_token",
params=VonageVideoConnectorTransportParams()
)
Transport-Konfiguration
Grundlegende Audio- und Videoparameter
Konfigurieren Sie den Transport für Ihre spezifischen Audio- und Videoanforderungen:
from pipecat.audio.vad.silero import SileroVADAnalyzer
transport_params = VonageVideoConnectorTransportParams(
audio_in_enabled=True, # Enable receiving audio
audio_out_enabled=True, # Enable sending audio
video_in_enabled=True, # Enable receiving video
video_out_enabled=True, # Enable sending video
publisher_name="AI Assistant", # Name for the published stream
audio_in_sample_rate=16000, # Input sample rate (Hz)
audio_in_channels=1, # Input channels
audio_out_sample_rate=24000, # Output sample rate (Hz)
audio_out_channels=1, # Output channels
video_out_width=1280, # Output video width
video_out_height=720, # Output video height
video_out_framerate=30, # Output video framerate
video_out_color_format="RGB", # Output video color format
vad_analyzer=SileroVADAnalyzer(), # Voice activity detection
audio_in_auto_subscribe=True, # Auto-subscribe to audio streams
video_in_auto_subscribe=False, # Auto-subscribe to video streams
captions_in_enabled=False, # Enable receiving captions
captions_in_auto_subscribe=False, # Auto-subscribe to caption streams
video_in_preferred_resolution=(640, 480), # Preferred input resolution
video_in_preferred_framerate=15, # Preferred input framerate
publisher_enable_opus_dtx=False, # Enable Opus DTX
session_enable_migration=False, # Enable session migration
video_connector_log_level="INFO", # Log level
clear_buffers_on_interruption=True, # Clear buffers on interruption
)
transport = VonageVideoConnectorTransport(
application_id,
session_id,
token,
params=transport_params
)
Erkennung von Sprachaktivität (VAD)
Es ist ratsam, diesen Transport mit der Sprachaktivitätserkennung zu verwenden, um die Audioverarbeitung zu optimieren:
from pipecat.audio.vad.silero import SileroVADAnalyzer
# Configure VAD for better audio processing
vad = SileroVADAnalyzer()
transport_params = VonageVideoConnectorTransportParams(
vad_analyzer=vad,
# ... other parameters
)
VAD hilft, unnötige Verarbeitung zu vermeiden, indem es erkennt, wenn Sprache im Audiostrom vorhanden ist. im Audiostrom.
Pufferlöschung bei Unterbrechungen
Die clear_buffers_on_interruption Parameter bestimmt, ob Medienpuffer
automatisch geleert werden, wenn ein Unterbrechungsrahmen in der Pipeline empfangen wird.
transport_params = VonageVideoConnectorTransportParams(
clear_buffers_on_interruption=True, # Default: True
# ... other parameters
)
Wann wird aktiviert (True, Standard):
- Konversations-KI-Applikationen, bei denen die Wiedergabe sofort gestoppt werden soll, wenn der Benutzer sie unterbricht
- Interaktive Sprachassistenten, die schnell auf Benutzereingaben reagieren müssen
- Applications, bei denen veraltete Audio-/Videodaten verworfen werden sollten, um die Interaktion in Echtzeit aufrechtzuerhalten
- Szenarien, in denen die Minimierung der Latenzzeit wichtiger ist als die Fertigstellung der Medienwiedergabe
Wann ist die Funktion zu deaktivieren? (False):
- Aufzeichnungs- oder Streaming-Anwendungen, bei denen Sie alle Medien aufbewahren möchten
- Applications, die die Wiedergabe wichtiger Informationen auch bei Unterbrechung fortsetzen müssen
- Stapelverarbeitungsszenarien, bei denen Medien ohne Unterbrechung sequentiell verarbeitet werden sollen
- Anwendungsfälle, in denen Sie eine benutzerdefinierte Logik für die Unterbrechungsbehandlung implementieren
Audio- und Videobearbeitung
Audio- und Videoeingangsverarbeitung
Der Transport konvertiert automatisch eingehende Audio- und Videodaten aus der Vonage Video-Sitzung in die internen Medienformate von Pipecat um:
# Audio and video input is handled automatically by the transport
# Incoming audio frames are converted to AudioRawFrame format
# Incoming video frames are converted to ImageRawFrame format
pipeline = Pipeline([
transport.input(), # Receives audio and video from Vonage session
# ... your AI processing pipeline
])
Erzeugung von Audio- und Videoausgaben
Senden Sie Audio- und Videodaten zurück an die Vonage Video-Sitzung:
# Audio and video output is sent automatically through the pipeline
pipeline = Pipeline([
# ... your AI processing pipeline
transport.output(), # Sends audio and video to Vonage session
])
Stream-Abonnement
Wenn der Transport Ströme von Sitzungsteilnehmern abonniert, erzeugt er Pipecat-Frames die Ihre Pipeline verarbeiten kann. Das Verhalten unterscheidet sich zwischen Audio und Video:
Video-Streams: Der Transport generiert einzelne Videobilder für jeden abonnierten Stream, identifiziert durch die Stream-ID. Auf diese Weise können Sie Videos von verschiedenen Teilnehmern getrennt in Ihrer Pipeline verarbeiten.
Audio-Streams: Standardmäßig erzeugt der Transport Audio-Frames mit allen abonnierten Audio-Streams Streams zusammengemischt. Alle Teilnehmer-Audiosignale werden zu einem einzigen Audiostrom kombiniert, den Ihre Pipeline empfängt. Für Anwendungsfälle, die Audio für einzelne Teilnehmer erfordern, siehe Einzelne Audio-Abonnenten.
Standardmäßig abonniert der Transport automatisch Ströme auf der Grundlage der audio_in_auto_subscribe
und video_in_auto_subscribe Parameter. Sie können auch manuell festlegen, welche Streams abonniert werden sollen
Sie können auch manuell festlegen, welche Streams abonniert werden sollen, um eine genauere Kontrolle zu erhalten.
Manuelles Stream-Abonnement
Wenn Sie mehr Kontrolle darüber haben möchten, welche Streams Sie abonnieren möchten, können Sie die automatische Anmeldung deaktivieren deaktivieren und bestimmte Streams manuell abonnieren:
from pipecat.transports.vonage.video_connector import SubscribeSettings
# Disable auto-subscription
transport_params = VonageVideoConnectorTransportParams(
audio_in_auto_subscribe=False,
video_in_auto_subscribe=False,
# ... other parameters
)
# Manually subscribe when a participant joins
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
stream_id = data['streamId']
logger.info(f"Participant joined with stream {stream_id}, subscribing...")
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_audio=True,
subscribe_to_video=True,
preferred_resolution=(640, 480),
preferred_framerate=15
)
)
Wann ist ein automatisches Abonnement sinnvoll? (audio_in_auto_subscribe=True, video_in_auto_subscribe=True, Standard):
- Einfache Applications, bei denen Sie alle Streams von allen Teilnehmern empfangen möchten
- Sprach- oder Videoassistenten, die mit allen Sitzungsteilnehmern interagieren müssen
- Aufzeichnungs- oder Überwachungsanwendungen, die alle Teilnehmer erfassen sollen
- Anwendungsfälle, bei denen die Minimierung der Codekomplexität wichtiger ist als ein selektives Abonnement
- Applications, bei denen alle Teilnehmer gleich behandelt werden sollten
Wann ist ein manuelles Abonnement sinnvoll? (audio_in_auto_subscribe=False, video_in_auto_subscribe=False):
- Applikationen, die auf der Grundlage von Teilnehmer-Metadaten oder Sitzungslogik selektiv abonnieren müssen
- Szenarien, in denen Sie die Bandbreite optimieren möchten, indem Sie nur bestimmte Streams abonnieren
- Anwendungsfälle, die benutzerdefinierte Abonnementeinstellungen pro Teilnehmer erfordern (verschiedene Qualitätsstufen)
- Applikationen, die Teilnehmer vor der Anmeldung validieren oder authentifizieren müssen
- Komplexe Szenarien mit mehreren Teilnehmern, bei denen Sie fein abgestufte Kontrolle darüber wünschen, welche Streams Sie empfangen möchten
Kontrolle der Videoqualität mit Simulcast
Wenn Sie einen Videostream abonnieren, können Sie die Videoqualität, die Sie erhalten, mit der Option
preferred_resolution und preferred_framerate Parameter. Diese Parameter sind besonders
nützlich, wenn der Herausgeber Simulcast-Streams (mehrere Qualitätsebenen) sendet.
Simulcast-Streams mehrere räumliche und zeitliche Ebenen enthalten:
- Räumliche Schichten: Verschiedene Auflösungen (z. B. 1280x720, 640x480, 320x240)
- Zeitliche Schichten: Verschiedene Bildwiederholraten (z. B. 30fps, 15fps, 7,5fps)
Indem Sie Ihre bevorzugte Auflösung und Framerate angeben, können Sie die Bandbreitennutzung und Verarbeitungsanforderungen optimieren:
# Subscribe to a lower quality stream for bandwidth efficiency
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(320, 240), # Request low spatial layer
preferred_framerate=15 # Request lower temporal layer
)
)
# Subscribe to a high quality stream for better visual fidelity
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_video=True,
preferred_resolution=(1280, 720), # Request high spatial layer
preferred_framerate=30 # Request higher temporal layer
)
)
Wichtige Hinweise:
- Wenn der Verlag Simulcast nicht unterstützt oder die angeforderte Ebene nicht verfügbar ist, bietet der Server die nächstmögliche Qualität zur Verfügung
- Niedrigere Auflösungen und Frameraten verringern den Bandbreitenverbrauch und den Verarbeitungsaufwand
- Diese Einstellungen wirken sich nur auf das Videoabonnement aus; sie steuern nicht die Ausgabequalität des Herausgebers.
- Sie können auch globale Einstellungen vornehmen, indem Sie
video_in_preferred_resolutionundvideo_in_preferred_frameratein den Transportparametern für automatisch abonnierte Ströme
Einzelne Audio-Abonnenten
Beta: Diese Funktion ist derzeit als Beta-Funktion verfügbar.
Zusätzlich zu dem standardmäßig gemischten Audiostrom liefert der Transport individuelle
UserAudioRawFrame Rahmen für jeden abonnierten Teilnehmer. Jeder Rahmen ist user_id Feld
wird auf die Stream-ID des Teilnehmers gesetzt, so dass Sie die Audioquellen in Ihrer
Pipeline zu unterscheiden.
Sowohl der gemischte als auch der teilnehmerbezogene Audiopfad sind gleichzeitig aktiv - es ist keine zusätzliche ist keine zusätzliche Konfiguration erforderlich, außer der Aktivierung des Audioeingangs.
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import Frame, UserAudioRawFrame
class PerSubscriberAudioProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, UserAudioRawFrame):
logger.info(f"Audio from subscriber {frame.user_id}")
await self.push_frame(frame, direction)
Untertitel
Beta: Diese Funktion ist derzeit als Beta-Funktion verfügbar.
Der Transport unterstützt den Empfang von Untertiteln in Echtzeit von Sitzungsteilnehmern. Untertitel werden
geliefert als TranscriptionFrame (endgültig) oder InterimTranscriptionFrame (in Arbeit) und
durch die Pipeline geschoben.
Hinweis: Untertitel müssen auf Sitzungsebene über die Vonage Video API aktiviert werden, bevor sie vom Transportsystem empfangen werden können. Siehe die Live-Unterschriften Leitfaden für Einzelheiten.
Aktivieren von Untertiteln
Um Untertitel zu empfangen, stellen Sie captions_in_enabled zu True in den Transportparametern. Sie
müssen auch die Untertitel abonnieren, entweder automatisch oder manuell.
Abonnement für automatische Untertitel
transport_params = VonageVideoConnectorTransportParams(
captions_in_enabled=True,
captions_in_auto_subscribe=True,
# ... other parameters
)
Abonnement für manuelle Untertitel
Wenn Sie eine feinere Steuerung wünschen, deaktivieren Sie das automatische Abonnement und abonnieren Sie die Untertitel pro Stream:
from pipecat.transports.vonage.video_connector import SubscribeSettings
transport_params = VonageVideoConnectorTransportParams(
captions_in_enabled=True,
captions_in_auto_subscribe=False,
# ... other parameters
)
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
stream_id = data['streamId']
await transport.subscribe_to_stream(
stream_id,
SubscribeSettings(
subscribe_to_audio=True,
subscribe_to_captions=True,
)
)
Verarbeitung von Beschriftungen in der Pipeline
Untertitelrahmen enthalten den transkribierten Text, die Stream-ID des Teilnehmers als user_idund
einen Zeitstempel. TranscriptionFrame eine endgültige Transkription darstellt, während
InterimTranscriptionFrame stellt eine laufende Transkription dar, die sich noch ändern kann.
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import Frame, TranscriptionFrame, InterimTranscriptionFrame
class CaptionProcessor(FrameProcessor):
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame):
logger.info(f"[{frame.user_id}] Final: {frame.text}")
elif isinstance(frame, InterimTranscriptionFrame):
logger.info(f"[{frame.user_id}] Interim: {frame.text}")
await self.push_frame(frame, direction)
Sitzungsmanagement
Ereignisse im Lebenszyklus einer Sitzung
Behandlung von Ereignissen beim Beitritt und Verlassen von Sitzungen:
# Handle when the transport joins the session
@transport.event_handler("on_joined")
async def on_joined(transport, data):
logger.info(f"Joined session {data['sessionId']}")
# Initialize your application state
await task.queue_frames([LLMMessagesFrame()])
# Handle when the transport leaves the session
@transport.event_handler("on_left")
async def on_left(transport):
logger.info("Left session")
# Clean up resources or save session data
# Handle session errors
@transport.event_handler("on_error")
async def on_error(transport, error):
logger.error(f"Session error: {error}")
Veranstaltungen für Teilnehmer
Überwachen Sie die Teilnehmer beim Betreten und Verlassen der Sitzung:
# Handle first participant joining (useful for triggering initial interactions)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
logger.info(f"First participant joined: stream {data['streamId']}")
# Start recording, initialize conversation, etc.
# Handle any participant joining
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
logger.info(f"Participant joined: stream {data['streamId']}")
# Update participant list, send greeting, etc.
# Handle participant leaving
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, data):
logger.info(f"Participant left: stream {data['streamId']}")
# Update participant list, handle cleanup
Client-Verbindungsereignisse
Überwachung einzelner Stream-Teilnehmerverbindungen:
# Handle when a subscriber successfully connects to a stream
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
logger.info(f"Client connected to stream {data['subscriberId']}")
# Handle when a subscriber disconnects from a stream
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, data):
logger.info(f"Client disconnected from stream {data['subscriberId']}")
Pipeline-Integration
Beispiel für eine vollständige Pipeline
Hier erfahren Sie, wie Sie den Vonage Video Connector Transport in eine komplette KI-Pipeline integrieren:
import asyncio
import os
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.transports.vonage.video_connector import (
VonageVideoConnectorTransport,
VonageVideoConnectorTransportParams,
SubscribeSettings,
)
async def main():
# Configure transport
transport = VonageVideoConnectorTransport(
application_id=os.getenv("VONAGE_APPLICATION_ID"),
session_id=os.getenv("VONAGE_SESSION_ID"),
token=os.getenv("VONAGE_TOKEN"),
params=VonageVideoConnectorTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
publisher_name="AI Assistant",
audio_in_sample_rate=16000,
audio_out_sample_rate=24000,
vad_analyzer=SileroVADAnalyzer(),
audio_in_auto_subscribe=True,
)
)
# Set up AI service
llm = AWSNovaSonicLLMService(
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
region=os.getenv("AWS_REGION", ""),
session_token=os.getenv("AWS_SESSION_TOKEN", ""),
)
# Create context for conversation
context = OpenAILLMContext(
messages=[
{"role": "system", "content": "You are a helpful AI assistant."},
]
)
context_aggregator = llm.create_context_aggregator(context)
# Build pipeline
pipeline = Pipeline([
transport.input(), # Audio from Vonage session
context_aggregator.user(), # Process user input
llm, # AI processing
transport.output(), # Audio back to session
])
task = PipelineTask(pipeline)
# Register event handlers
@transport.event_handler("on_joined")
async def on_joined(transport, data):
print(f"Joined session: {data['sessionId']}")
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
print(f"First participant joined: {data['streamId']}")
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
await task.queue_frames([LLMRunFrame()])
await llm.trigger_assistant_response()
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())
Bewährte Praktiken
Optimierung der Leistung
Wählen Sie geeignete Abtastraten:
- 16kHz für Spracherkennung und die meisten KI-Dienste verwenden
- Verwenden Sie 24kHz oder höher für eine bessere Text-zu-Sprache-Qualität
- Vermeiden Sie unnötig hohe Abtastraten, die die Verarbeitungslast erhöhen.
Optimierung der Pipeline-Verarbeitung:
- Effiziente KI-Verarbeitungspipelines zur Minimierung von Latenzzeiten
- Geeignete Rahmengrößen und Pufferverwaltung verwenden
- Erwägen Sie den Einsatz von VAD, um unnötige Verarbeitung zu vermeiden
Fehlersuche und Überwachung
Protokollierung einschalten:
from loguru import logger
# Pipecat uses loguru for logging
logger.enable("pipecat")
# Set Vonage Video Connector log level
transport_params = VonageVideoConnectorTransportParams(
video_connector_log_level="DEBUG", # DEBUG, INFO, WARNING, ERROR
# ... other parameters
)
Bekannte Probleme
- In seltenen Fällen kann es vorkommen, dass die Anwendung während des Abschaltvorgangs kurzzeitig nicht mehr reagiert, insbesondere, wenn eine extrem große Anzahl von Streams gleichzeitig unveröffentlicht wird, abgemeldet und die Verbindung zwangsweise unterbrochen wird. Wir arbeiten aktiv an einer Lösung für dieses Problem.