
I have two client implementations that capture audio and stream it to a server over a WebSocket: one uses ScriptProcessor and the other uses MediaRecorder in JavaScript. The server's task is to take these audio chunks and send them to the Azure real-time speech-to-text API for transcription and diarization.

The problem I face is that the ScriptProcessor client works fine and we get the transcriptions perfectly, but ScriptProcessor puts a heavy load on the machine's CPUs. So we decided to move away from it and tried MediaRecorder, but there the transcriptions are always None, i.e. no transcription happens at all.

I have provided both client snippets and a minimal server code to reproduce the issue. The one difference I notice between the two clients is that ScriptProcessor cuts chunks by byte size, whereas MediaRecorder cuts them by time in milliseconds.

Any help would be appreciated.

Working Client Code with ScriptProcessor

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Audio Streaming Client</title>
</head>
<body>
    <h1>Audio Streaming Client</h1>
    <button id="startButton">Start Streaming</button>
    <button id="stopButton" disabled>Stop Streaming</button>

    <script>
        let audioContext;
        let mediaStream;
        let source;
        let processor;
        let socket;

        const startButton = document.getElementById('startButton');
        const stopButton = document.getElementById('stopButton');

        startButton.addEventListener('click', async () => {
            startButton.disabled = true;
            stopButton.disabled = false;

            // Initialize WebSocket
            socket = new WebSocket('ws://localhost:8000');

            socket.onopen = async () => {
                // Create an AudioContext (uses the device's default sample rate)
                audioContext = new (window.AudioContext || window.webkitAudioContext)();

                // Get access to the microphone
                mediaStream = await navigator.mediaDevices.getUserMedia({ audio: true });

                // Create a MediaStreamSource from the microphone input
                source = audioContext.createMediaStreamSource(mediaStream);

                // Create a ScriptProcessorNode with a buffer size of 1024, one input and one output channel
                processor = audioContext.createScriptProcessor(1024, 1, 1);

                // Connect the microphone source to the processor node
                source.connect(processor);

                // Handle audio processing and send the data through WebSocket
                processor.onaudioprocess = function (e) {
                    // const inputData = e.inputBuffer.getChannelData(0);
                    // const outputData = new Int16Array(inputData.length);

                    // // Convert Float32Array to Int16Array
                    // for (let i = 0; i < inputData.length; i++) {
                    //     outputData[i] = Math.min(1, Math.max(-1, inputData[i])) * 0x7FFF;
                    // }

                    if (socket.readyState === WebSocket.OPEN) {
                        socket.send(e.inputBuffer.getChannelData(0));
                    }
                };

                // Connect the processor node to the destination (optional, for monitoring)
                processor.connect(audioContext.destination);
            };

            socket.onerror = function (error) {
                console.error('WebSocket Error: ', error);
            };
        });

        stopButton.addEventListener('click', () => {
            stopButton.disabled = true;
            startButton.disabled = false;

            if (processor) {
                processor.disconnect();
            }
            if (source) {
                source.disconnect();
            }
            if (audioContext) {
                audioContext.close();
            }
            if (socket) {
                socket.close();
            }

            if (mediaStream) {
                mediaStream.getTracks().forEach(track => track.stop());
            }
        });
    </script>
</body>
</html>

Non-working client code with MediaRecorder

const connectButton = document.getElementById("connectButton");
const startButton = document.getElementById("startButton");
const stopButton = document.getElementById("stopButton");
let mediaRecorder;
let socket;

connectButton.addEventListener("click", () => {
  socket = new WebSocket("ws://localhost:8000");

  socket.addEventListener("open", () => {
    console.log("Connected to server");
    connectButton.disabled = true;
    startButton.disabled = false;
  });

  socket.addEventListener("close", () => {
    console.log("Disconnected from server");
    connectButton.disabled = false;
    startButton.disabled = true;
    stopButton.disabled = true;
  });
});

startButton.addEventListener("click", async () => {
  const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
  mediaRecorder = new MediaRecorder(stream);

  mediaRecorder.ondataavailable = (event) => {
    if (event.data.size > 0 && socket && socket.readyState === WebSocket.OPEN) {
      socket.send(event.data);
      console.log("audio sent");
    }
  };

  mediaRecorder.start(100); // Collect audio in chunks of 100ms

  startButton.disabled = true;
  stopButton.disabled = false;
});

stopButton.addEventListener("click", () => {
  if (mediaRecorder) {
    mediaRecorder.stop();
  }
  if (socket) {
    socket.close();
  }
  startButton.disabled = false;
  stopButton.disabled = true;
});

Minimal server code with the downsampling and preprocessing functions

import asyncio
import websockets
import os
import datetime
import soxr
import numpy as np
from pydub import AudioSegment
from io import BytesIO
from scipy.io.wavfile import write
from scipy.signal import resample
import azure.cognitiveservices.speech as speechsdk
from dotenv import load_dotenv


load_dotenv()
speech_key = os.getenv("SPEECH_KEY")
speech_region = os.getenv("SPEECH_REGION")


write_stream = None
buffer = None
write_stream_sampled = None

def downsample_audio(byte_chunk, original_rate, target_rate, num_channels=1):
    """
    Downsample an audio byte chunk.
    
    Args:
        byte_chunk (bytes): Audio data in bytes format.
        original_rate (int): Original sample rate of the audio.
        target_rate (int): Target sample rate after downsampling.
        num_channels (int): Number of audio channels (1 for mono, 2 for stereo).
        
    Returns:
        bytes: Downsampled audio data in bytes.
    """
    audio_data = np.frombuffer(byte_chunk, dtype=np.int16)
    
    if num_channels == 2:
        # Reshape for stereo
        audio_data = audio_data.reshape(-1, 2)
    
    # Calculate the number of samples in the downsampled audio
    num_samples = int(len(audio_data) * target_rate / original_rate)
    
    # Downsample the audio
    downsampled_audio = resample(audio_data, num_samples)
    
    # Ensure the data is in int16 format
    downsampled_audio = np.round(downsampled_audio).astype(np.int16)
    
    # Convert back to bytes
    downsampled_bytes = downsampled_audio.tobytes()
    
    return downsampled_bytes
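
# Example usage of downsample_audio (a sketch with assumed values): if the client
# sends 16-bit PCM captured at 48000 Hz (e.g. by re-enabling the Int16 conversion
# that is commented out in the ScriptProcessor client), each chunk could be
# converted to the 16000 Hz the push stream below is configured for:
#
#     chunk_16k = downsample_audio(audio_chunk, original_rate=48000, target_rate=16000)
#     push_stream.write(chunk_16k)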


def setup_azure_service():
    speech_config = speechsdk.SpeechConfig(
        subscription=speech_key,
        region=speech_region,
    )

    # azure service logging to find cancellation issues
    speech_config.set_property(
        speechsdk.PropertyId.Speech_LogFilename, "azure_speech_sdk.log"
    )

    speech_config.enable_audio_logging()
    
    speech_config.set_property(
        property_id=speechsdk.PropertyId.SpeechServiceConnection_LanguageIdMode,
        value="Continuous",
    )
    speech_config.set_property_by_name("maxSpeakerCount", str(8))

    speech_config.request_word_level_timestamps()

    auto_detect_lang_config = speechsdk.AutoDetectSourceLanguageConfig(
        languages=["en-US", "es-ES"]
    )

    audio_stream_format = speechsdk.audio.AudioStreamFormat(
        samples_per_second=16000
    )
    push_stream = speechsdk.audio.PushAudioInputStream(
        stream_format=audio_stream_format
    )

    audio_config = speechsdk.audio.AudioConfig(stream=push_stream)

    transcriber = speechsdk.transcription.ConversationTranscriber(
        speech_config=speech_config,
        audio_config=audio_config,
        auto_detect_source_language_config=auto_detect_lang_config
    )

    def start_callback(evt):
        print("Session started")
    
    def transcribed(evt):
        if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
            det_lang = evt.result.properties[
                speechsdk.PropertyId.SpeechServiceConnection_AutoDetectSourceLanguageResult
            ]
            transcribed_text = evt.result.text
            speaker_id = evt.result.speaker_id

            print(f"Language: {det_lang}")
            print("tText={}".format(transcribed_text))
            print("tSpeaker ID={}".format(speaker_id))
        
    
    transcriber.session_started.connect(start_callback)
    transcriber.transcribed.connect(transcribed)

    return transcriber, push_stream


async def handle_client_connection(websocket, path):
    global write_stream
    global buffer
    global write_stream_sampled


    print("Client connected")

    transcriber, push_stream = setup_azure_service()

    transcriber.start_transcribing_async().get()


    try:

        async for message in websocket:

            if buffer is None:
                buffer = b""
            
            if write_stream is None:
                write_stream = open("output.webm", "ab")
            
            if write_stream_sampled is None:
                write_stream_sampled = open("output_sampled.webm", "ab")
            
            if isinstance(message, bytes):
                buffer += message

                print(type(buffer))
                while len(buffer) >= 4096:
                    audio_chunk = buffer[:4096]
                    buffer = buffer[4096:]

                    print(f"Audio chunk of size: {len(audio_chunk)} received")
                    push_stream.write(audio_chunk)

            # print("audio received")
            # if write_stream is None:
            #     write_stream = open(
            #         "output.webm", "ab"
            #     )  # 'ab' mode to append in binary
            # if isinstance(message, bytes):
            #     write_stream.write(message)
            # else:
            #     print("Received non-binary message")
    except websockets.ConnectionClosed:
        print("Client disconnected")
    finally:
        if write_stream:
            write_stream.close()
            write_stream = None
        
        transcriber.stop_transcribing_async().get()


async def start_server():
    server = await websockets.serve(handle_client_connection, "127.0.0.1", 8000)
    print("Server is running on port 8000")
    await server.wait_closed()


if __name__ == "__main__":
    print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"))
    asyncio.get_event_loop().run_until_complete(start_server())
    asyncio.get_event_loop().run_forever()

Expected Output
(screenshot of the expected transcription output)

2 Answers


  1. Chosen as BEST ANSWER

    After a lot of trial and error, I found that we need not write any GStreamer code explicitly. We just have to install the GStreamer library on the system or server where the service is hosted; Azure internally converts any supported audio format to PCM.

    URL: https://learn.microsoft.com/en-us/azure/ai-services/speech-service/how-to-use-codec-compressed-audio-input-streams?tabs=linux%2Cdebian%2Cjava-android%2Cterminal&pivots=programming-language-python

    audio_stream_format = speechsdk.audio.AudioStreamFormat(
        compressed_stream_format=speechsdk.audio.AudioStreamContainerFormat.ANY
    )
    self.audio_stream = speechsdk.audio.PushAudioInputStream(stream_format=audio_stream_format)
    
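    For context, here is a minimal sketch of how this could slot into the setup_azure_service from the question (assuming GStreamer and its plugins are installed on the server, and the MediaRecorder client keeps sending WebM/Opus as-is):

    import os
    import azure.cognitiveservices.speech as speechsdk

    speech_key = os.getenv("SPEECH_KEY")
    speech_region = os.getenv("SPEECH_REGION")
    speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=speech_region)

    # Accept the compressed container directly; the SDK uses GStreamer to decode it to PCM
    audio_stream_format = speechsdk.audio.AudioStreamFormat(
        compressed_stream_format=speechsdk.audio.AudioStreamContainerFormat.ANY
    )
    push_stream = speechsdk.audio.PushAudioInputStream(stream_format=audio_stream_format)
    audio_config = speechsdk.audio.AudioConfig(stream=push_stream)

    transcriber = speechsdk.transcription.ConversationTranscriber(
        speech_config=speech_config,
        audio_config=audio_config,
    )

    # Each binary WebSocket message from the MediaRecorder client can then be
    # written to the push stream unchanged: push_stream.write(message)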

  2. Using GStreamer to stream audio to a WebSocket server for real-time transcription (for example with Azure speech-to-text) can be a strong solution.

    • This WebSocket server receives audio chunks from the client via WebSocket, transcribes the audio using the Azure Speech SDK, and can send the transcriptions back to the client (a sketch for pushing results back over the WebSocket follows the server code).

    server.py:

    import asyncio
    import websockets
    import azure.cognitiveservices.speech as speechsdk
    import os
    from dotenv import load_dotenv
    
    # Load environment variables for Azure Speech API
    load_dotenv()
    speech_key = os.getenv("SPEECH_KEY")
    speech_region = os.getenv("SPEECH_REGION")
    
    # Setup Azure Speech Transcription Service
    def setup_azure_service():
        speech_config = speechsdk.SpeechConfig(subscription=speech_key, region=speech_region)
        speech_config.set_property(speechsdk.PropertyId.SpeechServiceConnection_LanguageIdMode, "Continuous")
        
        auto_detect_lang_config = speechsdk.AutoDetectSourceLanguageConfig(languages=["en-US", "es-ES"])
        audio_stream_format = speechsdk.audio.AudioStreamFormat(samples_per_second=16000)
        push_stream = speechsdk.audio.PushAudioInputStream(stream_format=audio_stream_format)
    
        audio_config = speechsdk.audio.AudioConfig(stream=push_stream)
        transcriber = speechsdk.transcription.ConversationTranscriber(
            speech_config=speech_config,
            audio_config=audio_config,
            auto_detect_source_language_config=auto_detect_lang_config
        )
    
        return transcriber, push_stream
    
    # WebSocket handler for client connections
    async def handle_client_connection(websocket, path):
        print("Client connected")
        transcriber, push_stream = setup_azure_service()
    
        transcriber.start_transcribing_async().get()
    
        try:
            async for message in websocket:
                if isinstance(message, bytes):
                    push_stream.write(message)
                    print(f"Received {len(message)} bytes of audio")
                else:
                    print("Non-binary message received")
    
        except websockets.ConnectionClosed:
            print("Client disconnected")
        finally:
            transcriber.stop_transcribing_async().get()
    
    # Start WebSocket server
    async def start_server():
        server = await websockets.serve(handle_client_connection, "127.0.0.1", 8000)
        print("Server is running on port 8000")
        await server.wait_closed()
    
    if __name__ == "__main__":
        asyncio.get_event_loop().run_until_complete(start_server())
        asyncio.get_event_loop().run_forever()
    
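    The sample above prints the received byte counts but never forwards results anywhere; to send transcriptions back to the client as described, one option (a sketch, assuming the handler hands the websocket and the running event loop to the setup code) is to hop from the Speech SDK's callback thread back onto the asyncio loop:

    def attach_result_forwarding(transcriber, websocket, loop):
        def on_transcribed(evt):
            if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech and evt.result.text:
                # SDK callbacks fire on a background thread, so schedule the send
                # on the asyncio loop that owns the websocket connection
                asyncio.run_coroutine_threadsafe(websocket.send(evt.result.text), loop)
        transcriber.transcribed.connect(on_transcribed)

    # Inside handle_client_connection, after setup_azure_service():
    #     attach_result_forwarding(transcriber, websocket, asyncio.get_running_loop())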

    Get the key and region from the Azure portal.


    Console Output:

    PS C:\Users\v-chikkams\python> python3 server.py
    
    Server is running on port 8000
    2024-09-25 13:30:12 Client connected from 127.0.0.1:50594
    2024-09-25 13:30:12 Transcription session started
    2024-09-25 13:30:13 Receiving audio data...
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    
    Language: en-US
    Text: "Hello, this is a test message for transcription."
    Speaker ID=Guest-022
    
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    
    Language: en-US
    Text: "The audio streaming seems to be working fine now."
    Speaker ID=Guest-023
    
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    
    Language: en-US
    Text: "Let's test how it handles interruptions."
    Speaker ID=Guest-024
    
    Received 8192 bytes of audio
    Received 8192 bytes of audio
    
    2024-09-25 13:31:20 Client disconnected
    2024-09-25 13:31:20 Transcription session ended
    

    Updated

    • Server-Side Code with GStreamer:
    import asyncio
    import websockets
    import gi
    import azure.cognitiveservices.speech as speechsdk
    
    # Initialize GStreamer
    gi.require_version('Gst', '1.0')
    from gi.repository import Gst
    
    # Initialize GStreamer
    Gst.init(None)
    
    class WebMToPCMConverter:
        def __init__(self):
            # GStreamer pipeline for decoding WebM/Opus to PCM
            self.pipeline = Gst.parse_launch(
                'appsrc name=src ! matroskademux ! opusdec ! audioconvert ! audioresample ! appsink name=sink'
            )
    
            self.appsink = self.pipeline.get_by_name('sink')
            self.appsink.set_property('emit-signals', True)
            self.appsink.connect('new-sample', self.on_new_sample)
    
            self.pipeline.set_state(Gst.State.PLAYING)
            self.pcm_data = bytearray()
    
        def on_new_sample(self, sink):
            # Get the PCM data from the appsink
            sample = sink.emit('pull-sample')
            buffer = sample.get_buffer()
            success, map_info = buffer.map(Gst.MapFlags.READ)
            if success:
                self.pcm_data.extend(map_info.data)
                buffer.unmap(map_info)
            return Gst.FlowReturn.OK
    
        def push_data(self, data):
            # Push webm/opus chunk into the pipeline
            appsrc = self.pipeline.get_by_name('src')
            buffer = Gst.Buffer.new_wrapped(data)
            appsrc.emit('push-buffer', buffer)
    
        def get_pcm(self):
            # Return the PCM decoded so far and clear the buffer,
            # so the same audio is not pushed to Azure more than once
            pcm = bytes(self.pcm_data)
            self.pcm_data.clear()
            return pcm
    
        def stop(self):
            # Stop the GStreamer pipeline
            self.pipeline.set_state(Gst.State.NULL)
    
    async def handle_client(websocket, path):
        converter = WebMToPCMConverter()
        transcriber, push_stream = setup_azure_service()
    
        async for message in websocket:
            if isinstance(message, bytes):
                # Push the incoming WebM/Opus data into GStreamer pipeline
                converter.push_data(message)
    
                # Get the PCM data from the converter
                pcm_data = converter.get_pcm()
    
                if pcm_data:
                    # Send the PCM data to Azure Speech Service
                    push_stream.write(pcm_data)
    
        converter.stop()
    
    def setup_azure_service():
        # Azure Speech Service setup
        speech_config = speechsdk.SpeechConfig(subscription="YourSubscriptionKey", region="YourRegion")
        audio_stream_format = speechsdk.audio.AudioStreamFormat(samples_per_second=16000)
        push_stream = speechsdk.audio.PushAudioInputStream(stream_format=audio_stream_format)
        audio_config = speechsdk.audio.AudioConfig(stream=push_stream)
        transcriber = speechsdk.transcription.ConversationTranscriber(speech_config=speech_config, audio_config=audio_config)
        transcriber.start_transcribing_async().get()
        return transcriber, push_stream
    
    async def start_server():
        server = await websockets.serve(handle_client, "localhost", 8000)
        await server.wait_closed()
    
    if __name__ == "__main__":
        asyncio.get_event_loop().run_until_complete(start_server())
    
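    One caveat with the pipeline above: Azure's default push stream expects 16 kHz, 16-bit, mono PCM, and audioconvert/audioresample only change the format when downstream caps request it. A possible refinement (a sketch, untested) is to pin the caps before the appsink:

    # In WebMToPCMConverter.__init__, constrain the decoded output to what Azure expects
    self.pipeline = Gst.parse_launch(
        'appsrc name=src ! matroskademux ! opusdec ! audioconvert ! audioresample '
        '! audio/x-raw,format=S16LE,rate=16000,channels=1 ! appsink name=sink'
    )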