skip to Main Content

We are using the Azure Conversation Transcriber for real-time speech-to-text with diarization. We need to incorporate a pause/resume feature. We have tried different approaches, but nothing has worked.

Azure only provides stop_transcribing_async() function that completely stops the current session.

I have attached the code we tried, but it is not working — any help will be appreciated. The attached block of code contains the logic for pausing and resuming. Please advise on what other method we could follow.

In the code below, we stop the transcriber completely once the "pause" message is received and restart it once the "resume" message is detected.

async def receive_audio(uuid, path):
    """Receive audio and control messages for one websocket connection and
    feed the audio into an Azure ConversationTranscriber.

    Text frames are control messages: "inactive" pauses transcription by
    stopping the session and closing the push stream; "active" resumes by
    creating a fresh transcriber/stream pair. Binary frames are audio and
    are buffered and written to the push stream while the state is "active".

    Args:
        uuid: key into CONNECTIONS.connections identifying this client.
        path: websocket path (unused here, required by the server callback
            signature).
    """
    audio_queue = Queue(maxsize=0)
    transcriber_state = False  # True while a transcription session is live
    conversation_transcriber = None
    push_stream = None
    # Resolve the websocket up front so the finally-clause can always close
    # it, even if transcriber creation below raises.
    websocket = CONNECTIONS.connections[uuid]["websocket"]
    try:
        conversation_transcriber, push_stream = create_conversation_transcriber(
            CONNECTIONS.connections[uuid]
        )

        # Start continuous recognition
        conversation_transcriber.start_transcribing_async().get()
        transcriber_state = True

        while True:
            data = await websocket.recv()

            logger.info(CONNECTIONS.connections[uuid]['state'])
            if isinstance(data, str):
                logger.info(f"Current State: {CONNECTIONS.connections[uuid]['state']}")
                if data == "inactive" and transcriber_state:
                    # Guard on transcriber_state so a repeated "inactive"
                    # message doesn't stop an already-stopped session.
                    logger.info("Pausing the transcriber...")
                    conversation_transcriber.stop_transcribing_async().get()
                    push_stream.close()
                    transcriber_state = False

                elif data == "active" and not transcriber_state:
                    logger.info("Resuming the transcriber...")
                    # BUG FIX: the connection details argument was missing
                    # here, so every resume raised TypeError. Pass the same
                    # details used for the initial session.
                    conversation_transcriber, push_stream = create_conversation_transcriber(
                        CONNECTIONS.connections[uuid]
                    )
                    conversation_transcriber.start_transcribing_async().get()
                    transcriber_state = True

                CONNECTIONS.connections[uuid]["state"] = data
                # Control messages carry no audio payload; without this the
                # string itself fell through and was written to the stream.
                continue

            # Only forward audio while active AND a session/stream is open —
            # writing to a closed push stream would raise.
            if CONNECTIONS.connections[uuid]["state"] == "active" and transcriber_state:
                audio_queue.put_nowait(data)
                while not audio_queue.empty():
                    chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                    CONNECTIONS.connections[uuid]["audio_buffer"] += chunk
                    push_stream.write(chunk)

    except websockets.exceptions.ConnectionClosed as e:
        logger.info("Connection closed")
        logger.info(e)
    except Exception as e:
        logger.error(f"Error in receive_audio: {e}")

    finally:
        # Release SDK resources on every exit path (previously only the
        # ConnectionClosed branch cleaned up, leaking on other errors).
        if transcriber_state and conversation_transcriber is not None:
            conversation_transcriber.stop_transcribing_async().get()
        if push_stream is not None:
            push_stream.close()
        await websocket.close(code=1000)

2 Answers


  1. Incorporating a pause and resume feature for the Azure Conversation Transcriber requires handling the stop_transcribing_async and start_transcribing_async methods appropriately. Your current approach stops and restarts the transcriber but does it in a way that might cause issues with the state management and the audio queue.

    Login or Signup to reply.
  2. Here you can control the flow of audio data by pausing the input stream (i.e., stop feeding audio to the push stream). This simulates a pause in transcription without completely stopping the transcriber session.

    • When we resume, the transcriber can pick up from where it left off, though it’s important to check that the SDK can handle this correctly. You may need to investigate how much state is maintained in the transcription session.

    App.py:

    import logging
    import azure.cognitiveservices.speech as speechsdk
    import asyncio
    from queue import Queue
    import websockets
    
    # Assume CONNECTIONS is a global dict to manage websocket connections.
    CONNECTIONS = {}
    
    async def receive_audio(uuid, path):
        """Stream websocket audio into a long-lived ConversationTranscriber.

        Pause/resume is simulated without stopping the Azure session: the
        transcriber and push stream stay alive for the whole connection, and
        "inactive"/"active" control messages merely gate whether incoming
        audio is written to the stream.

        Args:
            uuid: key into the global CONNECTIONS dict for this client.
            path: websocket path (unused, kept for the server callback API).
        """
        audio_queue = Queue(maxsize=0)
        transcriber_state = False  # False means transcriber is paused
        conversation_transcriber = None
        push_stream = None
        websocket = None  # bound before use so `finally` never hits NameError

        try:
            # Get the WebSocket connection and initialize the transcriber
            websocket = CONNECTIONS[uuid]["websocket"]
            connection_details = CONNECTIONS[uuid]

            conversation_transcriber, push_stream = create_conversation_transcriber(connection_details)

            # Start continuous recognition
            conversation_transcriber.start_transcribing_async().get()
            transcriber_state = True
            logging.info("Started transcribing...")

            while True:
                # Receive control messages or audio data
                data = await websocket.recv()

                if isinstance(data, str):
                    # Handle 'inactive' and 'active' state changes (pause/resume)
                    logging.info(f"Received state: {data}")
                    if data == "inactive" and transcriber_state:
                        # Pausing: keep the transcriber alive, but stop sending audio
                        logging.info("Pausing the transcriber... (not stopping)")
                        transcriber_state = False

                    elif data == "active" and not transcriber_state:
                        # Resuming: continue sending audio to the transcriber
                        logging.info("Resuming the transcriber...")
                        transcriber_state = True

                    CONNECTIONS[uuid]["state"] = data
                    # BUG FIX: control messages are text, not audio. Without
                    # this `continue`, the "active" string itself fell through
                    # the check below and was written into the audio stream.
                    continue

                # If transcriber is active, continue pushing audio data
                if CONNECTIONS[uuid]["state"] == "active":
                    audio_queue.put_nowait(data)
                    while not audio_queue.empty():
                        chunk = get_chunk_from_queue(q=audio_queue, chunk_size=4096)
                        CONNECTIONS[uuid]["audio_buffer"] += chunk
                        push_stream.write(chunk)  # Keep writing to the open stream

        except websockets.exceptions.ConnectionClosed:
            logging.info("WebSocket connection closed.")
        except Exception as e:
            logging.error(f"Error in receive_audio: {e}")
        finally:
            # Clean up on every exit path — previously only the
            # ConnectionClosed branch stopped the transcriber, so any other
            # exception leaked the session and the push stream.
            if conversation_transcriber:
                conversation_transcriber.stop_transcribing_async().get()
            if push_stream:
                push_stream.close()
            if websocket is not None:
                await websocket.close(code=1000)
            logging.info("WebSocket closed.")
    
    def create_conversation_transcriber(connection_details):
        """Build an Azure ConversationTranscriber fed by a push audio stream.

        Args:
            connection_details: mapping providing 'subscription_key' and
                'region' for the Azure Speech service.

        Returns:
            tuple: (ConversationTranscriber, PushAudioInputStream) — callers
            write raw audio into the returned stream.
        """
        config = speechsdk.SpeechConfig(
            subscription=connection_details['subscription_key'],
            region=connection_details['region'],
        )
        # 16 kHz, 16-bit, mono PCM input format.
        stream_format = speechsdk.audio.AudioStreamFormat(
            samples_per_second=16000, bits_per_sample=16, channels=1
        )
        stream = speechsdk.audio.PushAudioInputStream(stream_format)
        transcriber = speechsdk.transcription.ConversationTranscriber(
            config, speechsdk.audio.AudioConfig(stream=stream)
        )
        return transcriber, stream
    
    # Helper function to drain queued audio into transcriber-sized chunks.
    def get_chunk_from_queue(q, chunk_size):
        """Return queued audio fragments coalesced into one chunk.

        BUG FIX: the chunk_size parameter was previously ignored and each
        queue entry was returned as-is. Now fragments are concatenated until
        roughly chunk_size bytes are collected or the queue is empty.

        Args:
            q: queue.Queue holding bytes-like audio fragments.
            chunk_size: soft upper bound, in bytes, for the returned chunk
                (a single fragment larger than this is returned whole).

        Returns:
            bytes: the coalesced audio data.
        """
        buf = bytearray(q.get_nowait())
        while len(buf) < chunk_size and not q.empty():
            buf.extend(q.get_nowait())
        return bytes(buf)
    
    async def main():
        """Register a demo connection in CONNECTIONS and start streaming."""
        # Build the connection record first, then publish it under its uuid.
        connection = {
            'websocket': await websockets.connect('ws://localhost:8000'),  # Example WebSocket endpoint
            'subscription_key': 'your_azure_subscription_key',
            'region': 'your_azure_region',
            'audio_buffer': bytearray(),
            'state': 'active'  # Initial state
        }
        CONNECTIONS['dummy_uuid'] = connection

        # Start receiving audio for this connection
        await receive_audio('dummy_uuid', 'path/to/audio')
    
    if __name__ == "__main__":
        # Configure root logging before any records are emitted.
        logging.basicConfig(level=logging.INFO)
        logging.info("Starting the application...")
        
        # Run the asyncio event loop to execute the main function
        asyncio.run(main())
    
    • When you receive a "pause" command, you can buffer the incoming audio data and delay pushing it to the transcriber until a "resume" command is received.

    • CONNECTIONS[uuid]["state"] controls the flow of audio to the transcriber. When the state is "inactive," the audio stream is not fed to the transcriber.

    Console Log:

    ![Console log screenshot](missing-image-url) <!-- original image link was corrupted during extraction; the URL slot contained a duplicate of the answer text -->

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search