skip to Main Content

I drafted an Azure Function in Python, using the V2 programming model.

I have successfully published a message into a topic.

Now I would like to set a SessionId for a published message, but cannot figure out how. How do I publish a message with a SessionId?

My attempt, which puts a JSON envelope in the "my_json_string" variable, was not recognized by Azure.

import logging
import azure.functions as func
from datetime import datetime
import json


# Function app object; the route and output-binding decorators below are attached to it.
app = func.FunctionApp()

# Reference: Service Bus output binding for Python (v2 programming model)
# https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-output?tabs=python-v2%2Cisolated-process%2Cnodejs-v4%2Cextensionv5&pivots=programming-language-python


@app.route(route="http_trigger_topic", auth_level=func.AuthLevel.ANONYMOUS)
@app.service_bus_topic_output(arg_name="message",
                              connection="ServiceBusConnection",
                              topic_name="alfdevapi6topic")
def http_trigger_topic(req: func.HttpRequest, message: func.Out[str]) -> func.HttpResponse:
    """Publish a fixed greeting envelope to the topic through the output binding.

    Args:
        req: Incoming HTTP request; its contents are not used.
        message: Output binding that receives the serialized JSON envelope.

    Returns:
        A 200 response with a fixed text body.
    """
    logging.info('Python HTTP trigger function processed a request.')
    myMessage = "Hi alf this is my message via the queue to you."
    logging.info(myMessage)

    now = datetime.now()
    logging.info("now = %s", now)

    # Build the envelope as a dict and let json.dumps serialize it: the
    # previous hand-written f-string left a trailing comma after
    # "timePublish" (invalid JSON) and did not escape interpolated values.
    envelope = {
        "body": myMessage,
        "customProperties": {
            "messagenumber": 0,
            "timePublish": str(now),
        },
        "brokerProperties": {
            "SessionId": "1",
        },
    }

    # NOTE(review): the string output binding presumably publishes this whole
    # envelope as the message body, so "brokerProperties.SessionId" may not
    # become the broker session id -- confirm, or send with the
    # azure-servicebus SDK and set ServiceBusMessage.session_id instead.
    message.set(json.dumps(envelope))

    return func.HttpResponse(
        "This function should process queue messages.",
        status_code=200
    )

2

Answers


  1. Chosen as BEST ANSWER

    I derived the following combination from Rithwik's answer.

    Requirements.txt

    azure-functions
    datetime
    azure-servicebus
    

    The function combines an HTTP trigger (@app.route(route="http_trigger") on def http_trigger(req: func.HttpRequest) -> func.HttpResponse:) with the Service Bus SDK (from azure.servicebus import ServiceBusClient, ServiceBusMessage).

    Function to send

    import logging
    import azure.functions as func
    from datetime import datetime
    import json
    from azure.servicebus import ServiceBusClient, ServiceBusMessage
    import os
    
    # Function app object; AuthLevel.ANONYMOUS is passed as the app-wide http_auth_level.
    app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
    
    # Reference: Service Bus output binding for Python (v2 programming model)
    # https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-output?tabs=python-v2%2Cisolated-process%2Cnodejs-v4%2Cextensionv5&pivots=programming-language-python
    
    
    # Name of the topic that http_trigger obtains a sender for.
    TOPIC_NAME_A = "alfdevapi6topic" 
    # Service Bus connection string, read from the environment when the module is
    # imported; a missing 'ServiceBusConnection' variable raises KeyError here.
    CONN_STRING = os.environ['ServiceBusConnection']
    # Session id assigned to every message that http_trigger sends.
    SESSION_ID = "008"
    
    
    @app.route(route="http_trigger")
    def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
        """Send a fixed greeting to the topic with SESSION_ID set on the message.

        Args:
            req: Incoming HTTP request; its contents are not used.

        Returns:
            A 200 response with a fixed text body, returned once the message
            has been sent.
        """
        logging.info('Python HTTP trigger function processed a request.')
        myMessage = "Hi alf this is my message via the queue to you."

        jsn_message_envelope = generateMessage(myMessage, SESSION_ID)
        logging.info(f"Will send envelope: {jsn_message_envelope}")

        # The session id is set on the SDK message object itself; this is what
        # the broker uses, independent of the envelope text in the body.
        rith_msg = ServiceBusMessage(jsn_message_envelope)
        rith_msg.session_id = SESSION_ID

        # Context managers close the sender and the client even when sending
        # raises; the previous explicit close() was skipped on any exception.
        with ServiceBusClient.from_connection_string(conn_str=CONN_STRING) as servicebus_client:
            with servicebus_client.get_topic_sender(TOPIC_NAME_A) as rith_sen:
                rith_sen.send_messages(rith_msg)

        return func.HttpResponse(
                "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
                status_code=200
        )
    
    
    def generateMessage(myMessage, mySessionId):
        """Return a JSON envelope string wrapping the message and session id.

        The envelope has three keys: "body" (the message text),
        "customProperties" (with "timePublish", the current local time
        formatted as dd/mm/YYYY HH:MM:SS) and "brokerProperties" (with
        "SessionId").

        Args:
            myMessage: Message text placed under "body".
            mySessionId: Session id placed under "brokerProperties.SessionId";
                it is always emitted as a string.

        Returns:
            The envelope serialized as a valid JSON string.
        """
        now = datetime.now()
        logging.info(f"Time stamp: {now}")

        dt_string = now.strftime("%d/%m/%Y %H:%M:%S")

        # Serialize with json.dumps instead of an f-string template: the
        # template left a trailing comma after "timePublish" (invalid JSON)
        # and broke whenever the message contained quotes or backslashes.
        envelope = {
            "body": str(myMessage),
            "customProperties": {
                "timePublish": dt_string,
            },
            "brokerProperties": {
                "SessionId": str(mySessionId),
            },
        }

        return json.dumps(envelope)
    

  2. I agree with @Skip that func.Out cannot send a message with a session ID.

    Alternatively, you can use the code below, which sends the message with the SDK:

    import azure.functions as func
    import logging
    from azure.servicebus import ServiceBusClient, ServiceBusMessage
    
    # Function app object; the topic-trigger decorator below is attached to it.
    app = func.FunctionApp()
    @app.service_bus_topic_trigger(arg_name="azservicebus", subscription_name="mysubscription", topic_name="mysbtopic",
                                   connection="sbfiltersid_SERVICEBUS")
    def servicebus_topic_trigger(azservicebus: func.ServiceBusMessage):
        """Log the received topic message, then send a session-tagged message to a queue.

        Args:
            azservicebus: Message delivered by the topic subscription trigger.

        Raises:
            KeyError: If the "sbfiltersid_SERVICEBUS" setting is not present in
                the environment.
        """
        # Local import so this snippet stays self-contained.
        import os

        logging.info('Python ServiceBus Topic trigger processed a message: %s',
                    azservicebus.get_body().decode('utf-8'))

        # Read the connection string from the app setting the trigger binding
        # already names instead of embedding a shared access key in source.
        # NOTE(review): assumes that setting holds a full connection string
        # whose key has Send rights on the queue -- confirm.
        rith_con = os.environ["sbfiltersid_SERVICEBUS"]
        rith_qn = "rithwik"

        rith_msg = ServiceBusMessage("Hello Rithwik Bojja")
        rith_msg.session_id = "008"

        # Context managers close the sender and the client even when sending
        # raises; the previous explicit close() was skipped on any exception.
        with ServiceBusClient.from_connection_string(rith_con) as servicebus_client:
            with servicebus_client.get_queue_sender(rith_qn) as rith_sen:
                rith_sen.send_messages(rith_msg)
    

    Output:

    enter image description here

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search