全体アーキテクチャ

Voice AIエージェントの本質は、音声→テキスト→思考→テキスト→音声という変換パイプラインです。 LiveKit、Vapi、Retellといったフレームワークはこのパイプラインを数行で構築できますが、 内部で何が起きているかを理解しないまま使うと、レイテンシチューニングやエラーハンドリングで行き詰まります。

この記事では、フレームワークに頼らず各ステージを自分の手で繋ぎ、 「なぜフレームワークがあの設計になっているのか」を体感するためのハンズオンを行います。

graph LR
  A["🎤 ブラウザ<br/>マイク入力"] -->|PCM音声| B["WebSocket"]
  B -->|音声ストリーム| C["STT<br/>Deepgram/Whisper"]
  C -->|テキスト| D["LLM<br/>GPT-4o/Claude"]
  D -->|テキストチャンク| E["TTS<br/>ElevenLabs/OpenAI"]
  E -->|音声チャンク| F["WebSocket"]
  F -->|音声ストリーム| G["🔊 ブラウザ<br/>音声再生"]
Voice AIエージェントの全体パイプライン

ステップ1: ブラウザでのマイク入力キャプチャ

まずブラウザでマイクの音声を取得します。方法は大きく2つあります。 MediaRecorder APIはエンコード済み音声(webm/opus等)を取得でき実装が簡単ですが、 チャンク境界がコーデックに依存するため、リアルタイムSTTには不向きです。 一方Web Audio APIはPCM生データを直接取得でき、任意のサンプルレートで制御可能です。

方式 出力形式 リアルタイム性 用途
MediaRecorder API エンコード済み(webm, opus) チャンク間隔に依存 録音・ファイル保存向き
Web Audio API + AudioWorklet PCM生データ(Float32Array) フレーム単位で制御可能 リアルタイムSTT向き

Voice AIではWeb Audio API + AudioWorkletを使います。AudioWorkletはオーディオスレッドで動作し、メインスレッドをブロックしません。

// AudioWorkletProcessor: オーディオスレッドで動作
class MicProcessor extends AudioWorkletProcessor {
  process(inputs, outputs, parameters) {
    const input = inputs[0][0]; // モノラルチャンネル
    if (input) {
      // Float32 → Int16 PCM に変換(STTサービスが期待する形式)
      const pcm16 = new Int16Array(input.length);
      for (let i = 0; i < input.length; i++) {
        pcm16[i] = Math.max(-32768, Math.min(32767, input[i] * 32768));
      }
      this.port.postMessage(pcm16.buffer, [pcm16.buffer]);
    }
    return true; // trueを返してプロセッサを継続
  }
}
registerProcessor('mic-processor', MicProcessor);
// メインスレッド: マイクを取得してAudioWorkletに接続
const stream = await navigator.mediaDevices.getUserMedia({
  audio: { sampleRate: 16000, channelCount: 1, echoCancellation: true }
});

const audioCtx = new AudioContext({ sampleRate: 16000 });
await audioCtx.audioWorklet.addModule('/mic-processor.js');

const source = audioCtx.createMediaStreamSource(stream);
const workletNode = new AudioWorkletNode(audioCtx, 'mic-processor');

workletNode.port.onmessage = (event) => {
  // PCMデータをWebSocketに送信(ステップ2で実装)
  sendToWebSocket(event.data);
};

source.connect(workletNode);

ステップ2: WebSocketでのリアルタイム音声ストリーミング

HTTP/RESTでは音声のリアルタイム双方向通信は困難です。 WebSocketを使うことで、ブラウザからサーバーへの音声アップストリームと、 サーバーからブラウザへの音声ダウンストリームを同一コネクション上で実現します。

# Python サーバー側(FastAPI + WebSocket)
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws/voice")
async def voice_endpoint(websocket: WebSocket):
    await websocket.accept()

    # 各ステージを非同期タスクとして起動
    stt_queue = asyncio.Queue()    # 音声 → STT
    llm_queue = asyncio.Queue()    # テキスト → LLM
    tts_queue = asyncio.Queue()    # 応答テキスト → TTS

    async def receive_audio():
        """ブラウザからの音声を受信してSTTキューに流す"""
        while True:
            data = await websocket.receive_bytes()
            await stt_queue.put(data)

    async def run_stt():
        """STTで書き起こしてLLMキューに流す"""
        # ステップ3で実装

    async def run_llm():
        """LLMで応答生成してTTSキューに流す"""
        # ステップ4で実装

    async def run_tts():
        """TTSで音声合成してブラウザに送信"""
        # ステップ5で実装

    await asyncio.gather(
        receive_audio(), run_stt(), run_llm(), run_tts()
    )

ここで重要なのはasyncio.gatherによる並行処理です。 4つのタスクは非同期キューで繋がれ、パイプラインとして同時に動作します。 STTが書き起こしている最中にLLMが前の発話に応答し、TTSが音声を合成する、という並列動作が可能になります。

ステップ3: STT統合(音声→テキスト変換)

STTサービスには大きく分けてバッチ方式ストリーミング方式があります。 Voice AIでは応答のレイテンシが体験を決定づけるため、ストリーミング方式が必須です。

サービス 方式 レイテンシ 特徴
Whisper API(OpenAI) バッチ 数秒(音声全体を送信後に処理) 高精度だがリアルタイム不向き
Deepgram Nova-2 ストリーミング 300ms以下(中間結果あり) WebSocket接続、interim results対応
Google Cloud STT v2 ストリーミング 500ms以下 gRPC接続、多言語対応
Whisper.cpp(ローカル) チャンク処理 処理時間はマシン依存 APIコスト不要、プライバシー重視
# ストリーミングSTTの実装例(Deepgram WebSocket)
import websockets
import json

async def run_stt(stt_queue, llm_queue):
    url = "wss://api.deepgram.com/v1/listen?model=nova-2&language=ja"
    headers = {"Authorization": f"Token {DEEPGRAM_API_KEY}"}

    async with websockets.connect(url, extra_headers=headers) as dg_ws:
        async def send_audio():
            while True:
                audio_chunk = await stt_queue.get()
                await dg_ws.send(audio_chunk)

        async def receive_transcript():
            async for message in dg_ws:
                result = json.loads(message)
                # is_final=True の結果のみLLMに渡す
                if result.get("is_final"):
                    transcript = result["channel"]["alternatives"][0]["transcript"]
                    if transcript.strip():
                        await llm_queue.put(transcript)

        await asyncio.gather(send_audio(), receive_transcript())

ステップ4: LLMによる応答生成

STTからテキストを受け取ったら、LLMで応答を生成します。 ここでの最大のポイントはストリーミング出力をいかに早くTTSに渡すかです。 LLMの全応答を待ってからTTSに渡すと数秒のレイテンシが生じます。 そこで、文単位でチャンク分割し、1文完成するたびにTTSキューに流します。

# LLM応答のストリーミング+文単位チャンク分割
async def run_llm(llm_queue, tts_queue):
    conversation_history = [
        {"role": "system", "content": "あなたは親切な日本語の音声アシスタントです。回答は簡潔に。"}
    ]

    while True:
        user_text = await llm_queue.get()
        conversation_history.append({"role": "user", "content": user_text})

        # ストリーミングでLLM応答を取得
        stream = openai.chat.completions.create(
            model="gpt-4o",
            messages=conversation_history,
            stream=True
        )

        # 文単位でバッファリングしてTTSに送信
        sentence_buffer = ""
        sentence_delimiters = {"。", "!", "?", ".", "!", "?", "\n"}

        for chunk in stream:
            token = chunk.choices[0].delta.content or ""
            sentence_buffer += token

            # 文末記号が見つかったら1文をTTSに送信
            if any(d in token for d in sentence_delimiters):
                await tts_queue.put(sentence_buffer.strip())
                sentence_buffer = ""

        # 残りのバッファも送信
        if sentence_buffer.strip():
            await tts_queue.put(sentence_buffer.strip())

文単位のチャンク分割は単純に見えますが、Voice AIの体感レイテンシを大きく左右します。 チャンクが大きすぎるとTTSの開始が遅れ、小さすぎると不自然な途切れが生じます。 句読点(。!?)で分割するのが日本語では良いバランスです。

ステップ5: TTSによる音声合成

TTSもSTTと同様、ストリーミング対応が重要です。 テキスト全体の合成完了を待たず、合成済みの音声チャンクを逐次送信する方式により、 TTFB(Time To First Byte)を最小化できます。

# ストリーミングTTSの実装例
async def run_tts(tts_queue, websocket):
    while True:
        text = await tts_queue.get()

        # ストリーミングTTS APIを呼び出し
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/audio/speech",
                headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
                json={
                    "model": "tts-1",
                    "voice": "nova",
                    "input": text,
                    "response_format": "pcm"  # 生PCMで受信
                }
            ) as response:
                async for audio_chunk in response.aiter_bytes(chunk_size=4096):
                    # 音声チャンクを即座にブラウザに送信
                    await websocket.send_bytes(audio_chunk)

ステップ6: 音声の再生とバッファリング戦略

サーバーから送られてくる音声チャンクをブラウザ側で途切れなく再生するのは、 実装上最も難しいパートの一つです。 Web Audio APIのAudioBufferSourceNodeはワンショット再生しかできないため、 チャンクごとにノードを生成し、再生タイミングを正確にスケジューリングする必要があります。

// ブラウザ側: チャンクの連続再生
class AudioChunkPlayer {
  constructor(sampleRate = 24000) {
    this.audioCtx = new AudioContext({ sampleRate });
    this.nextStartTime = 0;
  }

  playChunk(pcmData) {
    // Int16 PCM → Float32 に変換
    const float32 = new Float32Array(pcmData.length);
    for (let i = 0; i < pcmData.length; i++) {
      float32[i] = pcmData[i] / 32768;
    }

    // AudioBufferを生成
    const buffer = this.audioCtx.createBuffer(1, float32.length, this.audioCtx.sampleRate);
    buffer.getChannelData(0).set(float32);

    const source = this.audioCtx.createBufferSource();
    source.buffer = buffer;
    source.connect(this.audioCtx.destination);

    // 次のチャンクの再生開始時刻を計算
    const now = this.audioCtx.currentTime;
    const startTime = Math.max(now, this.nextStartTime);
    source.start(startTime);

    // 次のチャンクはこのチャンクの終了直後に開始
    this.nextStartTime = startTime + buffer.duration;
  }

  // 割り込み(ユーザーが話し始めた時に再生を中断)
  interrupt() {
    this.audioCtx.close();
    this.audioCtx = new AudioContext({ sampleRate: 24000 });
    this.nextStartTime = 0;
  }
}

nextStartTimeでチャンク間のギャップを防ぎつつ、 ネットワーク遅延でチャンクの到着が遅れた場合はMath.max(now, this.nextStartTime)で 現在時刻まで飛ばして再生を続行します。 完全にギャップレスな再生を実現するには、数チャンク分のバッファリングを入れるのも有効です。

フレームワークが隠している複雑さ

ここまでの実装を通じて、フレームワークが抽象化してくれている以下の課題が見えてきます。

割り込み処理(Barge-in)

AIが話している最中にユーザーが割り込むと、再生中の音声を即座に停止し、 進行中のTTS・LLM処理をキャンセルし、新しいユーザー発話のSTTを開始する必要があります。 これは全ステージを横断する状態管理が必要で、素の実装では非常に複雑です。

音声区間検出(VAD)

ユーザーが「話し終わった」タイミングを判定するVAD(Voice Activity Detection)は、 単純な音量閾値では不十分です。WebRTCのVADやSileroVADなどの専用モデルを使い、 適切な無音区間(通常500ms〜1000ms)で発話終了を判定します。

エラーリカバリ

STT/LLM/TTSのどのステージでもWebSocket切断やAPIエラーが発生し得ます。 ユーザーに「もう一度お願いします」と通知しつつ、パイプライン全体を安全にリセットする処理は、 フレームワークなしでは非常に手間がかかります。

レイテンシの内訳と最適化

Voice AIの体感品質は「ユーザーが話し終わってからAIの音声が聞こえ始めるまでの時間」で決まります。 各ステージの典型的なレイテンシは以下の通りです。

ステージ 典型的なレイテンシ 最適化の方向性
VAD(発話終了検出) 500〜1000ms 無音区間の閾値を短くする(誤検出とのトレードオフ)
STT(最終結果の確定) 200〜500ms ストリーミングSTT + エンドポイント検出の高速化
LLM(最初のトークン生成) 200〜800ms 小さいモデルの使用、プロンプトの短縮
文チャンク蓄積 100〜500ms 最初の文を短くするようプロンプトで誘導
TTS(TTFB) 100〜500ms ストリーミングTTS、低レイテンシモデルの選定
ネットワーク往復 20〜100ms エッジサーバーの活用

合計すると1.1秒〜3.4秒。人間の会話では0.5秒以内の応答が自然とされるため、 各ステージを徹底的にストリーミング化し、パイプラインを並列実行することが不可欠です。

理解度チェック

問題 0 / 50%
Q1

Voice AIパイプラインでブラウザとサーバー間のリアルタイム双方向通信に使うプロトコルは___である。