Videoanschluss Pipecat Integration

Mit dem Vonage Video Connector für Pipecat können Sie Anwendungen erstellen, die nahtlos an Vonage Video API Sitzungen teilnehmen. Dieser Transport ermöglicht Ihnen den Empfang von Audio- und Videodaten von Sitzungsteilnehmern empfangen und verarbeitete Audio- und Videodaten in Echtzeit an die Sitzung in Echtzeit.

Pipecat ist ein Framework für die Entwicklung von sprachgesteuerten und multimodalen KI-Applikationen. Der Vonage Video Connector überbrückt die Medienverarbeitungspipeline von Pipecat mit Vonage Video API-Sitzungen und ermöglicht so eine breite Palette von Anwendungsfällen:

  • KI-Assistenten für Sprache und Video in Echtzeit
  • Live-Transkriptions- und Übersetzungsdienste
  • Aufzeichnung und Analyse von Anrufen
  • Bearbeitung von Audio- und Videoeffekten
  • Automatisierte Moderation und Inhaltsfilterung
  • Benutzerdefinierte Medienverarbeitung und -manipulation

Der Transport übernimmt die Konvertierung von Audio- und Videoformaten, die Sitzungsverwaltung und die WebRTC Konnektivität, so dass Sie sich auf die Entwicklung Ihrer Anwendungslogik konzentrieren können.

Diese Seite enthält die folgenden Abschnitte:

Private Beta

Die Vonage Video Connector Pipecat Integration befindet sich in der Beta-Phase. Kontaktieren Sie uns, um frühen Zugang zu erhalten.

Anforderungen

Um diesen Transport zu nutzen, benötigen Sie die Vonage Video Connector Python-Bibliothek, die auf Linux AMD64 und ARM64 Plattformen läuft.

Grundlegende Einrichtung

Authentifizierung und Sitzungsparameter

Um den Vonage Video Connector Transport zu verwenden, benötigen Sie:

  • Applikation ID - Die Kennung Ihrer Vonage Video API-Anwendung
  • Sitzungs-ID - Die ID der Video API-Sitzung, der Sie beitreten möchten
  • Token - Ein gültiges Teilnehmer-Token für die Sitzung

Diese Parameter können von Ihrem Vonage Video API Dashboard abgerufen oder mithilfe der Vonage Video API Server SDKs generiert werden.

Initialisieren Sie den Transport

from pipecat.transports.vonage.video_connector import (
    VonageVideoConnectorTransport,
    VonageVideoConnectorTransportParams,
)

# Initialize the transport
transport = VonageVideoConnectorTransport(
    application_id="your_application_id",
    session_id="your_session_id",
    token="your_participant_token",
    params=VonageVideoConnectorTransportParams()
)

Transport-Konfiguration

Grundlegende Audio- und Videoparameter

Konfigurieren Sie den Transport für Ihre spezifischen Audio- und Videoanforderungen:

from pipecat.audio.vad.silero import SileroVADAnalyzer

transport_params = VonageVideoConnectorTransportParams(
    audio_in_enabled=True,                          # Enable receiving audio
    audio_out_enabled=True,                         # Enable sending audio
    video_in_enabled=True,                          # Enable receiving video
    video_out_enabled=True,                         # Enable sending video
    publisher_name="AI Assistant",                  # Name for the published stream
    audio_in_sample_rate=16000,                     # Input sample rate (Hz)
    audio_in_channels=1,                            # Input channels
    audio_out_sample_rate=24000,                    # Output sample rate (Hz)
    audio_out_channels=1,                           # Output channels
    video_out_width=1280,                           # Output video width
    video_out_height=720,                           # Output video height
    video_out_framerate=30,                         # Output video framerate
    video_out_color_format="RGB",                   # Output video color format
    vad_analyzer=SileroVADAnalyzer(),               # Voice activity detection
    audio_in_auto_subscribe=True,                   # Auto-subscribe to audio streams
    video_in_auto_subscribe=False,                  # Auto-subscribe to video streams
    video_in_preferred_resolution=(640, 480),       # Preferred input resolution
    video_in_preferred_framerate=15,                # Preferred input framerate
    publisher_enable_opus_dtx=False,                # Enable Opus DTX
    session_enable_migration=False,                 # Enable session migration
    video_connector_log_level="INFO",               # Log level
    clear_buffers_on_interruption=True,             # Clear buffers on interruption
)

transport = VonageVideoConnectorTransport(
    application_id,
    session_id,
    token,
    params=transport_params
)

Erkennung von Sprachaktivität (VAD)

Es ist ratsam, diesen Transport mit der Sprachaktivitätserkennung zu verwenden, um die Audioverarbeitung zu optimieren:

from pipecat.audio.vad.silero import SileroVADAnalyzer

# Configure VAD for better audio processing
vad = SileroVADAnalyzer()
transport_params = VonageVideoConnectorTransportParams(
    vad_analyzer=vad,
    # ... other parameters
)

VAD hilft, unnötige Verarbeitung zu vermeiden, indem es erkennt, wenn Sprache im Audiostrom vorhanden ist. im Audiostrom.

Pufferlöschung bei Unterbrechungen

Die clear_buffers_on_interruption Parameter bestimmt, ob Medienpuffer automatisch geleert werden, wenn ein Unterbrechungsrahmen in der Pipeline empfangen wird.

transport_params = VonageVideoConnectorTransportParams(
    clear_buffers_on_interruption=True,  # Default: True
    # ... other parameters
)

Wann wird aktiviert (True, Standard):

  • Konversations-KI-Applikationen, bei denen die Wiedergabe sofort gestoppt werden soll, wenn der Benutzer sie unterbricht
  • Interaktive Sprachassistenten, die schnell auf Benutzereingaben reagieren müssen
  • Applications, bei denen veraltete Audio-/Videodaten verworfen werden sollten, um die Interaktion in Echtzeit aufrechtzuerhalten
  • Szenarien, in denen die Minimierung der Latenzzeit wichtiger ist als die Fertigstellung der Medienwiedergabe

Wann ist die Funktion zu deaktivieren? (False):

  • Aufzeichnungs- oder Streaming-Anwendungen, bei denen Sie alle Medien aufbewahren möchten
  • Applications, die die Wiedergabe wichtiger Informationen auch bei Unterbrechung fortsetzen müssen
  • Stapelverarbeitungsszenarien, bei denen Medien ohne Unterbrechung sequentiell verarbeitet werden sollen
  • Anwendungsfälle, in denen Sie eine benutzerdefinierte Logik für die Unterbrechungsbehandlung implementieren

Audio- und Videobearbeitung

Audio- und Videoeingangsverarbeitung

Der Transport konvertiert automatisch eingehende Audio- und Videodaten aus der Vonage Video-Sitzung in die internen Medienformate von Pipecat um:

# Audio and video input is handled automatically by the transport
# Incoming audio frames are converted to AudioRawFrame format
# Incoming video frames are converted to ImageRawFrame format
pipeline = Pipeline([
    transport.input(),     # Receives audio and video from Vonage session
    # ... your AI processing pipeline
])

Erzeugung von Audio- und Videoausgaben

Senden Sie Audio- und Videodaten zurück an die Vonage Video-Sitzung:

# Audio and video output is sent automatically through the pipeline
pipeline = Pipeline([
    # ... your AI processing pipeline
    transport.output(),    # Sends audio and video to Vonage session
])

Stream-Abonnement

Wenn der Transport Ströme von Sitzungsteilnehmern abonniert, erzeugt er Pipecat-Frames die Ihre Pipeline verarbeiten kann. Das Verhalten unterscheidet sich zwischen Audio und Video:

Video-Streams: Der Transport generiert einzelne Videobilder für jeden abonnierten Stream, identifiziert durch die Stream-ID. Auf diese Weise können Sie Videos von verschiedenen Teilnehmern getrennt in Ihrer Pipeline verarbeiten.

Audio-Streams: Der Transport erzeugt derzeit Audio-Frames mit allen abonnierten Audio Streams zusammengemischt. Alle Teilnehmer-Audiosignale werden zu einem einzigen Audiostrom kombiniert, den Ihre Pipeline empfängt.

Standardmäßig abonniert der Transport automatisch Ströme auf der Grundlage der audio_in_auto_subscribe und video_in_auto_subscribe Parameter. Sie können auch manuell festlegen, welche Streams abonniert werden sollen Sie können auch manuell festlegen, welche Streams abonniert werden sollen, um eine genauere Kontrolle zu erhalten.

Manuelles Stream-Abonnement

Wenn Sie mehr Kontrolle darüber haben möchten, welche Streams Sie abonnieren möchten, können Sie die automatische Anmeldung deaktivieren deaktivieren und bestimmte Streams manuell abonnieren:

from pipecat.transports.vonage.video_connector import SubscribeSettings

# Disable auto-subscription
transport_params = VonageVideoConnectorTransportParams(
    audio_in_auto_subscribe=False,
    video_in_auto_subscribe=False,
    # ... other parameters
)

# Manually subscribe when a participant joins
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
    stream_id = data['streamId']
    logger.info(f"Participant joined with stream {stream_id}, subscribing...")
    await transport.subscribe_to_stream(
        stream_id,
        SubscribeSettings(
            subscribe_to_audio=True,
            subscribe_to_video=True,
            preferred_resolution=(640, 480),
            preferred_framerate=15
        )
    )

Wann ist ein automatisches Abonnement sinnvoll? (audio_in_auto_subscribe=True, video_in_auto_subscribe=True, Standard):

  • Einfache Applications, bei denen Sie alle Streams von allen Teilnehmern empfangen möchten
  • Sprach- oder Videoassistenten, die mit allen Sitzungsteilnehmern interagieren müssen
  • Aufzeichnungs- oder Überwachungsanwendungen, die alle Teilnehmer erfassen sollen
  • Anwendungsfälle, bei denen die Minimierung der Codekomplexität wichtiger ist als ein selektives Abonnement
  • Applications, bei denen alle Teilnehmer gleich behandelt werden sollten

Wann ist ein manuelles Abonnement sinnvoll? (audio_in_auto_subscribe=False, video_in_auto_subscribe=False):

  • Applikationen, die auf der Grundlage von Teilnehmer-Metadaten oder Sitzungslogik selektiv abonnieren müssen
  • Szenarien, in denen Sie die Bandbreite optimieren möchten, indem Sie nur bestimmte Streams abonnieren
  • Anwendungsfälle, die benutzerdefinierte Abonnementeinstellungen pro Teilnehmer erfordern (verschiedene Qualitätsstufen)
  • Applikationen, die Teilnehmer vor der Anmeldung validieren oder authentifizieren müssen
  • Komplexe Szenarien mit mehreren Teilnehmern, bei denen Sie fein abgestufte Kontrolle darüber wünschen, welche Streams Sie empfangen möchten

Kontrolle der Videoqualität mit Simulcast

Wenn Sie einen Videostream abonnieren, können Sie die Videoqualität, die Sie erhalten, mit der Option preferred_resolution und preferred_framerate Parameter. Diese Parameter sind besonders nützlich, wenn der Herausgeber Simulcast-Streams (mehrere Qualitätsebenen) sendet.

Simulcast-Streams mehrere räumliche und zeitliche Ebenen enthalten:

  • Räumliche Schichten: Verschiedene Auflösungen (z. B. 1280x720, 640x480, 320x240)
  • Zeitliche Schichten: Verschiedene Bildwiederholraten (z. B. 30fps, 15fps, 7,5fps)

Indem Sie Ihre bevorzugte Auflösung und Framerate angeben, können Sie die Bandbreitennutzung und Verarbeitungsanforderungen optimieren:

# Subscribe to a lower quality stream for bandwidth efficiency
await transport.subscribe_to_stream(
    stream_id,
    SubscribeSettings(
        subscribe_to_video=True,
        preferred_resolution=(320, 240),  # Request low spatial layer
        preferred_framerate=15            # Request lower temporal layer
    )
)

# Subscribe to a high quality stream for better visual fidelity
await transport.subscribe_to_stream(
    stream_id,
    SubscribeSettings(
        subscribe_to_video=True,
        preferred_resolution=(1280, 720),  # Request high spatial layer
        preferred_framerate=30             # Request higher temporal layer
    )
)

Wichtige Hinweise:

  • Wenn der Verlag Simulcast nicht unterstützt oder die angeforderte Ebene nicht verfügbar ist, bietet der Server die nächstmögliche Qualität zur Verfügung
  • Niedrigere Auflösungen und Frameraten verringern den Bandbreitenverbrauch und den Verarbeitungsaufwand
  • Diese Einstellungen wirken sich nur auf das Videoabonnement aus; sie steuern nicht die Ausgabequalität des Herausgebers.
  • Sie können auch globale Einstellungen vornehmen, indem Sie video_in_preferred_resolution und video_in_preferred_framerate in den Transportparametern für automatisch abonnierte Ströme

Sitzungsmanagement

Ereignisse im Lebenszyklus einer Sitzung

Behandlung von Ereignissen beim Beitritt und Verlassen von Sitzungen:

# Handle when the transport joins the session
@transport.event_handler("on_joined")
async def on_joined(transport, data):
    logger.info(f"Joined session {data['sessionId']}")
    # Initialize your application state
    await task.queue_frames([LLMMessagesFrame()])

# Handle when the transport leaves the session
@transport.event_handler("on_left")
async def on_left(transport):
    logger.info("Left session")
    # Clean up resources or save session data

# Handle session errors
@transport.event_handler("on_error")
async def on_error(transport, error):
    logger.error(f"Session error: {error}")

Veranstaltungen für Teilnehmer

Überwachen Sie die Teilnehmer beim Betreten und Verlassen der Sitzung:

# Handle first participant joining (useful for triggering initial interactions)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
    logger.info(f"First participant joined: stream {data['streamId']}")
    # Start recording, initialize conversation, etc.

# Handle any participant joining
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
    logger.info(f"Participant joined: stream {data['streamId']}")
    # Update participant list, send greeting, etc.

# Handle participant leaving
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, data):
    logger.info(f"Participant left: stream {data['streamId']}")
    # Update participant list, handle cleanup

Client-Verbindungsereignisse

Überwachung einzelner Stream-Teilnehmerverbindungen:

# Handle when a subscriber successfully connects to a stream
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
    logger.info(f"Client connected to stream {data['subscriberId']}")

# Handle when a subscriber disconnects from a stream
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, data):
    logger.info(f"Client disconnected from stream {data['subscriberId']}")

Pipeline-Integration

Beispiel für eine vollständige Pipeline

Hier erfahren Sie, wie Sie den Vonage Video Connector Transport in eine komplette KI-Pipeline integrieren:

import asyncio
import os

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.transports.vonage.video_connector import (
    VonageVideoConnectorTransport,
    VonageVideoConnectorTransportParams,
    SubscribeSettings,
)

async def main():
    # Configure transport
    transport = VonageVideoConnectorTransport(
        application_id=os.getenv("VONAGE_APPLICATION_ID"),
        session_id=os.getenv("VONAGE_SESSION_ID"),
        token=os.getenv("VONAGE_TOKEN"),
        params=VonageVideoConnectorTransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            publisher_name="AI Assistant",
            audio_in_sample_rate=16000,
            audio_out_sample_rate=24000,
            vad_analyzer=SileroVADAnalyzer(),
            audio_in_auto_subscribe=True,
        )
    )

    # Set up AI service
    llm = AWSNovaSonicLLMService(
        secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
        access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
        region=os.getenv("AWS_REGION", ""),
        session_token=os.getenv("AWS_SESSION_TOKEN", ""),
    )

    # Create context for conversation
    context = OpenAILLMContext(
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},
        ]
    )
    context_aggregator = llm.create_context_aggregator(context)

    # Build pipeline
    pipeline = Pipeline([
        transport.input(),           # Audio from Vonage session
        context_aggregator.user(),   # Process user input
        llm,                        # AI processing
        transport.output(),         # Audio back to session
    ])

    task = PipelineTask(pipeline)

    # Register event handlers
    @transport.event_handler("on_joined")
    async def on_joined(transport, data):
        print(f"Joined session: {data['sessionId']}")

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, data):
        print(f"First participant joined: {data['streamId']}")

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, data):
        await task.queue_frames([LLMRunFrame()])
        await llm.trigger_assistant_response()


    runner = PipelineRunner()
    await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())

Bewährte Praktiken

Optimierung der Leistung

  1. Wählen Sie geeignete Abtastraten:

    • 16kHz für Spracherkennung und die meisten KI-Dienste verwenden
    • Verwenden Sie 24kHz oder höher für eine bessere Text-zu-Sprache-Qualität
    • Vermeiden Sie unnötig hohe Abtastraten, die die Verarbeitungslast erhöhen.
  2. Optimierung der Pipeline-Verarbeitung:

    • Effiziente KI-Verarbeitungspipelines zur Minimierung von Latenzzeiten
    • Geeignete Rahmengrößen und Pufferverwaltung verwenden
    • Erwägen Sie den Einsatz von VAD, um unnötige Verarbeitung zu vermeiden

Fehlersuche und Überwachung

Protokollierung einschalten:

from loguru import logger

# Pipecat uses loguru for logging
logger.enable("pipecat")

# Set Vonage Video Connector log level
transport_params = VonageVideoConnectorTransportParams(
    video_connector_log_level="DEBUG",  # DEBUG, INFO, WARNING, ERROR
    # ... other parameters
)