I have a python service which uses azure service bus topic for sending and receiving messages. I am using azure-servicebus==7.12.2.
Max timeout I can set on azure portal is 5 minute, but the processing for the message on topic take around 60 minutes. So, I am using AutoLockRenewer as suggested online on couple of posts.
Here is my code
async def receive_messages_from_topic(
servicebus_client: ServiceBusClient,
topic_name: str,
executor: Any, # type: ignore
):
async with servicebus_client:
renewer = AutoLockRenewer(max_lock_renewal_duration=3600)
async with servicebus_client.get_subscription_receiver(
topic_name=topic_name,
subscription_name=SUBSCRIPTION_NAME,
auto_lock_renewer=renewer, # type: ignore
) as receiver:
logger.info(f"Listening to topic: {topic_name} for subscription: {SUBSCRIPTION_NAME}")
while True:
try:
messages = await receiver.receive_messages()
for message in messages:
# Process the message
logger.info(
f"Received message from topic: {topic_name}, msg: {str(message)}"
)
with make_temp_directory() as temp_dir:
try:
bytes_str = str(message)
bytes = ast.literal_eval(bytes_str)
executor(topic_name, bytes, temp_dir)
await receiver.complete_message(message)
except Exception as e:
logger.exception(
f"Error executing job for topic: {topic_name} message: {message.message_id}, {e}"
)
await receiver.dead_letter_message(message)
if len(messages) == 0:
# Small sleep to avoid busy looping
await asyncio.sleep(60)
except Exception as e:
logger.exception(f"Error processing message for topic: {topic_name}, {e}")
executor(topic_name, bytes, temp_dir) will complete the processing in 40 minutes, no issues or errors. But, when I try to mark the message as complete in the above loop it will throw exception
Traceback (most recent call last):
File "/home/vscode/.cache/bazel/_bazel_vscode/d9fd8dd9485b3d1d1994c3846baefa35/execroot/_main/bazel-out/k8-opt-release/bin/tools/service_bus/run/run.runfiles/_main/tools/service_bus/run/run.py", line 72, in receive_messages_from_topic
await receiver.complete_message(message)
File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 852, in complete_message
await self._settle_message_with_retry(message, MESSAGE_COMPLETE)
File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 494, in _settle_message_with_retry
raise MessageLockLostError(
azure.servicebus.exceptions.MessageLockLostError: The lock on the message lock has expired.
I tried setting AutoLockRenewer on message and session still same result.
https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/index.html#automatically-renew-message-or-session-locks
Not sure if I missing something or something is wrong with my implementation
2
Answers
The reason why AutoLockRenewer was not working is because the call to the executor is a blocking call, preventing it from running in the background. I changed the executor to async and it started working.
Thanks to Microsoft for the answer https://github.com/Azure/azure-sdk-for-python/issues/37245
I tried the code below to process messages from an Azure Service Bus Topic, completing them after 5 minutes. I’m using
AutoLockRenewer
to automatically extend the message lock for up to 60 minutes during the processing of messages.Code :
Output :
The following code ran successfully as shown below.