ビデオ・コネクタ Pipecatとの統合

Pipecat 用 Vonage Video Connector トランスポートは、Vonage Video API セッションにシームレスに参加するアプリケーションの構築を可能にします。 アプリケーションを構築できます。このトランスポートを使用すると セッション参加者から音声と映像を受信し、処理された音声と映像をセッションに送り返すことができます。 セッションにリアルタイムで送り返すことができます。

Pipecatは、音声およびマルチモーダル会話AIアプリケーションを構築するためのフレームワークです。 Vonage Video Connectorトランスポートは、Pipecatのメディア処理パイプラインを次のようにブリッジします。 Vonage Video APIセッションを橋渡しし、幅広いユースケースを可能にします:

  • リアルタイムの音声およびビデオAIアシスタント
  • ライブテープ起こし・翻訳サービス
  • 通話録音と分析
  • オーディオとビデオのエフェクト処理
  • 自動モデレーションとコンテンツフィルタリング
  • カスタムメディアの処理と操作

トランスポートは、オーディオとビデオのフォーマット変換、セッション管理、WebRTC接続を行います。 接続を行うため、アプリケーション・ロジックの構築に集中できます。

このページには以下のセクションがあります:

パブリック・ベータ

Vonage Video Connector Pipecat統合はベータ段階です。コードは以下から入手可能です。 https://github.com/Vonage/pipecat.

必要条件

このトランスポートを使用するには、Vonage Video Connector pythonライブラリが必要です。 Linux AMD64 および ARM64 プラットフォームで動作します。

基本設定

認証とセッション・パラメーター

Vonage Video Connectorトランスポートを使用するには、以下のものが必要です:

  • アプリケーションID - Vonage Video API アプリケーション識別子
  • セッションID - 参加したい Video API セッションの ID。
  • トークン - セッションの有効な参加者トークン

これらのパラメータは、Vonage Video API ダッシュボードから取得するか、Vonage Video API サーバ SDK を使用して生成することができます。 を使用して生成できます。

トランスポートの初期化

from pipecat.transports.vonage.video_connector import (
    VonageVideoConnectorTransport,
    VonageVideoConnectorTransportParams,
)

# Initialize the transport
transport = VonageVideoConnectorTransport(
    application_id="your_application_id",
    session_id="your_session_id",
    token="your_participant_token",
    params=VonageVideoConnectorTransportParams()
)

輸送構成

基本的なオーディオとビデオのパラメーター

特定のオーディオおよびビデオ要件に合わせてトランスポートを設定します:

from pipecat.audio.vad.silero import SileroVADAnalyzer

transport_params = VonageVideoConnectorTransportParams(
    audio_in_enabled=True,                          # Enable receiving audio
    audio_out_enabled=True,                         # Enable sending audio
    video_in_enabled=True,                          # Enable receiving video
    video_out_enabled=True,                         # Enable sending video
    publisher_name="AI Assistant",                  # Name for the published stream
    audio_in_sample_rate=16000,                     # Input sample rate (Hz)
    audio_in_channels=1,                            # Input channels
    audio_out_sample_rate=24000,                    # Output sample rate (Hz)
    audio_out_channels=1,                           # Output channels
    video_out_width=1280,                           # Output video width
    video_out_height=720,                           # Output video height
    video_out_framerate=30,                         # Output video framerate
    video_out_color_format="RGB",                   # Output video color format
    vad_analyzer=SileroVADAnalyzer(),               # Voice activity detection
    audio_in_auto_subscribe=True,                   # Auto-subscribe to audio streams
    video_in_auto_subscribe=False,                  # Auto-subscribe to video streams
    video_in_preferred_resolution=(640, 480),       # Preferred input resolution
    video_in_preferred_framerate=15,                # Preferred input framerate
    publisher_enable_opus_dtx=False,                # Enable Opus DTX
    session_enable_migration=False,                 # Enable session migration
    video_connector_log_level="INFO",               # Log level
    clear_buffers_on_interruption=True,             # Clear buffers on interruption
)

transport = VonageVideoConnectorTransport(
    application_id,
    session_id,
    token,
    params=transport_params
)

音声アクティビティ検出(VAD)

音声処理を最適化するために、このトランスポートをVoice Activity Detectionと併用することをお勧めします:

from pipecat.audio.vad.silero import SileroVADAnalyzer

# Configure VAD for better audio processing
vad = SileroVADAnalyzer()
transport_params = VonageVideoConnectorTransportParams(
    vad_analyzer=vad,
    # ... other parameters
)

VADは、音声ストリームに音声が存在する場合にそれを検出することで、不要な処理を削減するのに役立ちます。 を検出することで、不要な処理を削減します。

中断時のバッファクリア

について clear_buffers_on_interruption パラメータは、パイプラインで割り込みフレームを受信したときに パイプラインで割り込みフレームを受信したときに、メディアバッファが自動的にクリアされるかどうかを決定します。

transport_params = VonageVideoConnectorTransportParams(
    clear_buffers_on_interruption=True,  # Default: True
    # ... other parameters
)

いつ有効にするか (Trueデフォルト):

  • 会話型AIアプリケーションで、ユーザーが割り込んできたときに再生を即座に止めたい場合。
  • ユーザーの入力に素早く反応する必要がある対話型音声アシスタント
  • リアルタイムのインタラクションを維持するために、古いオーディオ/ビデオを破棄する必要があるアプリケーション
  • メディア再生の完了よりも待ち時間の最小化が重要なシナリオ

無効にするタイミング (False):

  • すべてのメディアを保存したい録画またはストリーミングアプリケーション
  • 中断されても重要な情報の再生を完了する必要があるアプリケーション
  • メディアを中断することなく順次処理する必要があるバッチ処理シナリオ
  • カスタム割り込み処理ロジックを実装するユースケース

オーディオとビデオのハンドリング

オーディオおよびビデオ入力処理

トランスポートは、Vonage Videoセッションから入力されたオーディオとビデオを自動的にPipecatの内部メディアフォーマットに変換します。 をPipecatの内部メディアフォーマットに自動的に変換します:

# Audio and video input is handled automatically by the transport
# Incoming audio frames are converted to AudioRawFrame format
# Incoming video frames are converted to ImageRawFrame format
pipeline = Pipeline([
    transport.input(),     # Receives audio and video from Vonage session
    # ... your AI processing pipeline
])

オーディオおよびビデオ出力生成

音声とビデオをVonage Videoセッションに送り返します:

# Audio and video output is sent automatically through the pipeline
pipeline = Pipeline([
    # ... your AI processing pipeline
    transport.output(),    # Sends audio and video to Vonage session
])

ストリーム購読

トランスポートがセッション参加者のストリームを購読すると、パイプラインが処理できる Pipecat フレームが生成されます。 を生成します。オーディオとビデオでは動作が異なります:

ビデオストリーム:トランスポートは、加入ストリームごとに個別のビデオフレームを生成する、 を生成する。これにより、異なる参加者からのビデオをパイプライン をパイプラインで個別に処理できます。

オーディオ・ストリーム:トランスポートは現在、すべてのサブスクライブされたオーディオストリームをミックスしたオーディオフレームを生成している。 をミックスしたオーディオフレームを生成している。すべての参加者オーディオは、パイプラインが受信する1つのオーディオストリームに結合されます。 パイプラインが受信します。

デフォルトでは、トランスポートは自動的に audio_in_auto_subscribe そして video_in_auto_subscribe パラメータを使用します。どのストリームをサブスクライブするかを手動で制御することもできる。 を手動で制御することもできます。

手動ストリーム購読

購読するストリームをより詳細に制御する必要がある場合は、自動購読を無効にして、特定のストリームを手動で購読することができます。 を無効にし、特定のストリームを手動で購読することができます:

from pipecat.transports.vonage.video_connector import SubscribeSettings

# Disable auto-subscription
transport_params = VonageVideoConnectorTransportParams(
    audio_in_auto_subscribe=False,
    video_in_auto_subscribe=False,
    # ... other parameters
)

# Manually subscribe when a participant joins
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
    stream_id = data['streamId']
    logger.info(f"Participant joined with stream {stream_id}, subscribing...")
    await transport.subscribe_to_stream(
        stream_id,
        SubscribeSettings(
            subscribe_to_audio=True,
            subscribe_to_video=True,
            preferred_resolution=(640, 480),
            preferred_framerate=15
        )
    )

自動購読を利用する場合 (audio_in_auto_subscribe=True, video_in_auto_subscribe=Trueデフォルト):

  • すべての参加者からすべてのストリームを受信したいシンプルなアプリケーション
  • セッションの参加者全員と対話する必要のある音声アシスタントやビデオアシスタント
  • すべての参加者をキャプチャする必要がある記録または監視アプリケーション
  • コードの複雑さを最小限に抑えることが選択的購読よりも重要な使用例
  • すべての参加者が平等に扱われるべきアプリケーション

手動購読を利用する場合 (audio_in_auto_subscribe=False, video_in_auto_subscribe=False):

  • 参加者のメタデータやセッションロジックに基づいて選択的に購読する必要があるアプリケーション
  • 特定のストリームのみを購読して帯域幅を最適化したいシナリオ
  • 参加者ごとのカスタム購読設定が必要な使用例(異なる品質レベル)
  • 加入前に参加者の確認や認証が必要なアプリケーション
  • 受信するストリームをきめ細かく制御したい複雑なマルチパーティシナリオ

サイマルキャストによるビデオ品質のコントロール

ビデオストリームに加入する際、受信するビデオの画質は preferred_resolution そして preferred_framerate パラメータがある。これらのパラメータは これらのパラメータは、パブリッシャーが サイマルキャスト・ストリーム(複数の品質レイヤー)を送信する場合に特に有 用である。

サイマルキャスト・ストリーム 複数の空間的、時間的レイヤーを含む:

  • 空間レイヤー:異なる解像度(例:1280x720、640x480、320x240)
  • 時間レイヤー:異なるフレームレート(例:30fps、15fps、7.5fps)

お好みの解像度とフレームレートを指定することで、帯域幅の使用量と処理要件を最適化できます。 処理要件を最適化できます:

# Subscribe to a lower quality stream for bandwidth efficiency
await transport.subscribe_to_stream(
    stream_id,
    SubscribeSettings(
        subscribe_to_video=True,
        preferred_resolution=(320, 240),  # Request low spatial layer
        preferred_framerate=15            # Request lower temporal layer
    )
)

# Subscribe to a high quality stream for better visual fidelity
await transport.subscribe_to_stream(
    stream_id,
    SubscribeSettings(
        subscribe_to_video=True,
        preferred_resolution=(1280, 720),  # Request high spatial layer
        preferred_framerate=30             # Request higher temporal layer
    )
)

重要な注意事項

  • パブリッシャーがサイマルキャストをサポートしていないか、要求されたレイヤーが利用できない場合、サーバーは は最も近い品質のものを提供します。
  • 低解像度とフレームレートにより、帯域幅の消費と処理のオーバーヘッドを削減
  • これらの設定はビデオ配信にのみ影響し、パブリッシャーの出力品質をコントロールするものではありません。
  • を使用してグローバルプリファレンスを設定することもできます。 video_in_preferred_resolution そして video_in_preferred_framerate 自動購読ストリームのトランスポートパラメータで

セッション管理

セッション・ライフサイクル・イベント

セッションの参加と離脱のイベントを処理する:

# Handle when the transport joins the session
@transport.event_handler("on_joined")
async def on_joined(transport, data):
    logger.info(f"Joined session {data['sessionId']}")
    # Initialize your application state
    await task.queue_frames([LLMMessagesFrame()])

# Handle when the transport leaves the session
@transport.event_handler("on_left")
async def on_left(transport):
    logger.info("Left session")
    # Clean up resources or save session data

# Handle session errors
@transport.event_handler("on_error")
async def on_error(transport, error):
    logger.error(f"Session error: {error}")

参加者イベント

セッションへの参加者と退出者を監視する:

# Handle first participant joining (useful for triggering initial interactions)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, data):
    logger.info(f"First participant joined: stream {data['streamId']}")
    # Start recording, initialize conversation, etc.

# Handle any participant joining
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, data):
    logger.info(f"Participant joined: stream {data['streamId']}")
    # Update participant list, send greeting, etc.

# Handle participant leaving
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, data):
    logger.info(f"Participant left: stream {data['streamId']}")
    # Update participant list, handle cleanup

クライアント接続イベント

個々のストリーム加入者の接続を監視する:

# Handle when a subscriber successfully connects to a stream
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, data):
    logger.info(f"Client connected to stream {data['subscriberId']}")

# Handle when a subscriber disconnects from a stream
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, data):
    logger.info(f"Client disconnected from stream {data['subscriberId']}")

パイプラインの統合

パイプラインの例

Vonage Video Connectorトランスポートを完全なAIパイプラインと統合する方法をご紹介します:

import asyncio
import os

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.transports.vonage.video_connector import (
    VonageVideoConnectorTransport,
    VonageVideoConnectorTransportParams,
    SubscribeSettings,
)

async def main():
    # Configure transport
    transport = VonageVideoConnectorTransport(
        application_id=os.getenv("VONAGE_APPLICATION_ID"),
        session_id=os.getenv("VONAGE_SESSION_ID"),
        token=os.getenv("VONAGE_TOKEN"),
        params=VonageVideoConnectorTransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            publisher_name="AI Assistant",
            audio_in_sample_rate=16000,
            audio_out_sample_rate=24000,
            vad_analyzer=SileroVADAnalyzer(),
            audio_in_auto_subscribe=True,
        )
    )

    # Set up AI service
    llm = AWSNovaSonicLLMService(
        secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
        access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
        region=os.getenv("AWS_REGION", ""),
        session_token=os.getenv("AWS_SESSION_TOKEN", ""),
    )

    # Create context for conversation
    context = OpenAILLMContext(
        messages=[
            {"role": "system", "content": "You are a helpful AI assistant."},
        ]
    )
    context_aggregator = llm.create_context_aggregator(context)

    # Build pipeline
    pipeline = Pipeline([
        transport.input(),           # Audio from Vonage session
        context_aggregator.user(),   # Process user input
        llm,                        # AI processing
        transport.output(),         # Audio back to session
    ])

    task = PipelineTask(pipeline)

    # Register event handlers
    @transport.event_handler("on_joined")
    async def on_joined(transport, data):
        print(f"Joined session: {data['sessionId']}")

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, data):
        print(f"First participant joined: {data['streamId']}")

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, data):
        await task.queue_frames([LLMRunFrame()])
        await llm.trigger_assistant_response()


    runner = PipelineRunner()
    await runner.run(task)

if __name__ == "__main__":
    asyncio.run(main())

ベストプラクティス

パフォーマンスの最適化

  1. 適切なサンプルレートを選択する:

    • 音声認識とほとんどのAIサービスには16kHzを使用
    • 音声合成の品質を高めるには、24kHz以上を使用する。
    • 処理負荷を高める不必要な高サンプルレートは避ける
  2. パイプライン処理の最適化:

    • AI処理パイプラインを効率的に保ち、待ち時間を最小限に抑える
    • 適切なフレームサイズとバッファ管理
    • 不必要な処理を減らすためにVADの使用を検討する

デバッグとモニタリング

ロギングを有効にする:

from loguru import logger

# Pipecat uses loguru for logging
logger.enable("pipecat")

# Set Vonage Video Connector log level
transport_params = VonageVideoConnectorTransportParams(
    video_connector_log_level="DEBUG",  # DEBUG, INFO, WARNING, ERROR
    # ... other parameters
)