skip to Main Content

I have a python service which uses azure service bus topic for sending and receiving messages. I am using azure-servicebus==7.12.2.

Max timeout I can set on azure portal is 5 minute, but the processing for the message on topic take around 60 minutes. So, I am using AutoLockRenewer as suggested online on couple of posts.

Here is my code

async def receive_messages_from_topic(
    servicebus_client: ServiceBusClient,
    topic_name: str,
    executor: Any,  # type: ignore
):
    async with servicebus_client:
        renewer = AutoLockRenewer(max_lock_renewal_duration=3600)
        async with servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=SUBSCRIPTION_NAME,
            auto_lock_renewer=renewer,  # type: ignore
        ) as receiver:
            logger.info(f"Listening to topic: {topic_name} for subscription: {SUBSCRIPTION_NAME}")
            while True:
                try:
                    messages = await receiver.receive_messages()
                    for message in messages:
                        # Process the message
                        logger.info(
                            f"Received message from topic: {topic_name}, msg: {str(message)}"
                        )

                        with make_temp_directory() as temp_dir:
                            try:
                                bytes_str = str(message)
                                bytes = ast.literal_eval(bytes_str)
                                executor(topic_name, bytes, temp_dir)
                                await receiver.complete_message(message)
                            except Exception as e:
                                logger.exception(
                                    f"Error executing job for topic: {topic_name} message: {message.message_id}, {e}"
                                )
                                await receiver.dead_letter_message(message)

                    if len(messages) == 0:
                        # Small sleep to avoid busy looping
                        await asyncio.sleep(60)
                except Exception as e:
                    logger.exception(f"Error processing message for topic: {topic_name}, {e}")

executor(topic_name, bytes, temp_dir) will complete the processing in 40 minutes, no issues or errors. But, when I try to mark the message as complete in the above loop it will throw exception

Traceback (most recent call last):
  File "/home/vscode/.cache/bazel/_bazel_vscode/d9fd8dd9485b3d1d1994c3846baefa35/execroot/_main/bazel-out/k8-opt-release/bin/tools/service_bus/run/run.runfiles/_main/tools/service_bus/run/run.py", line 72, in receive_messages_from_topic
    await receiver.complete_message(message)
  File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 852, in complete_message
    await self._settle_message_with_retry(message, MESSAGE_COMPLETE)
  File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 494, in _settle_message_with_retry
    raise MessageLockLostError(
azure.servicebus.exceptions.MessageLockLostError: The lock on the message lock has expired.

I tried setting AutoLockRenewer on message and session still same result.
https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/index.html#automatically-renew-message-or-session-locks

Not sure if I missing something or something is wrong with my implementation

2

Answers


  1. Chosen as BEST ANSWER

    The reason why AutoLockRenewer was not working is because the call to the executor is a blocking call, preventing it from running in the background. I changed the executor to async and it started working.

    Thanks to Microsoft for the answer https://github.com/Azure/azure-sdk-for-python/issues/37245


  2. I tried the code below to process messages from an Azure Service Bus Topic, completing them after 5 minutes. I’m using AutoLockRenewer to automatically extend the message lock for up to 60 minutes during the processing of messages.

    Code :

    import asyncio
    import logging
    from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer
    from contextlib import contextmanager
    import tempfile
    import shutil
    import os
    import time
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    SUBSCRIPTION_NAME = "<sub_name>"
    TOPIC_NAME = "<topic_name>"
    CONNECTION_STRING = "<Connec_Str>"
    
    @contextmanager
    def make_temp_directory():
        temp_dir = tempfile.mkdtemp()
        try:
            yield temp_dir
        finally:
            shutil.rmtree(temp_dir)
    
    def executor(topic_name, message_data, temp_dir):
        logger.info(f"Processing message from topic: {topic_name}, data: {message_data}, temp dir: {temp_dir}")
        with open(os.path.join(temp_dir, 'message.txt'), 'w') as f:
            f.write(str(message_data))
    
    async def receive_messages_from_topic(servicebus_client: ServiceBusClient, topic_name: str, executor: any, duration_seconds: int = 300):
        async with servicebus_client:
            renewer = AutoLockRenewer(max_lock_renewal_duration=3600)
            async with servicebus_client.get_subscription_receiver(
                topic_name=topic_name,
                subscription_name=SUBSCRIPTION_NAME,
                auto_lock_renewer=renewer,
            ) as receiver:
                logger.info(f"Listening to topic: {topic_name} for subscription: {SUBSCRIPTION_NAME}")
                
                start_time = time.time()
                end_time = start_time + duration_seconds
                
                while time.time() < end_time:
                    try:
                        messages = await receiver.receive_messages()
                        if not messages:
                            await asyncio.sleep(10) 
                            continue
                        
                        for message in messages:
                            message_content = str(message)
                            logger.info(f"Received message from topic: {topic_name}, msg: {message_content}")
                            
                            with make_temp_directory() as temp_dir:
                                try:
                                    executor(topic_name, message_content, temp_dir)
                                    await receiver.complete_message(message)
                                except Exception as e:
                                    logger.exception(f"Error executing job for topic: {topic_name} message: {message.message_id}, {e}")
                                    await receiver.dead_letter_message(message)
                    except asyncio.CancelledError:
                        logger.info("Receive operation was cancelled.")
                        break
                    except Exception as e:
                        logger.exception(f"Error processing message for topic: {topic_name}, {e}")
    
    async def main():
        servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STRING)
        await receive_messages_from_topic(servicebus_client, TOPIC_NAME, executor)
    
    if __name__ == "__main__":
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            logger.info("Process interrupted by user.")
    

    Output :

    The following code ran successfully as shown below.

    enter image description here
    enter image description here

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search