
The main purpose of my program is to connect to an incoming MQTT channel, and send the data received to my AWS Kinesis Stream called "MyKinesisStream".

Here is my code:

import argparse
import logging
import random

from paho.mqtt import client as mqtt_client
from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient, ReadMessagesOptions,
)

broker = 'localhost'
port = 1883
topic = "clients/test/hello/world"
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = '...'
password = '...'

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

args = ""

def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}")

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def sendDataToKinesis(
        stream_name: str,
        kinesis_stream_name: str,
        payload,
        batch_size: int = None,
):
    try:
        print("Debug: sendDataToKinesis with params:", stream_name + " | ", kinesis_stream_name, " | ", batch_size)
        print("payload:", payload)
        print("type payload:", type(payload))
    except Exception as e:
        print("Error while printing out the parameters", str(e))
        logger.exception(e)
    kinesis_client = None  # defined up front so the finally block can test it safely
    try:
        # Create a client for the StreamManager
        kinesis_client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            kinesis_client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        kinesis_client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        sequence_no = kinesis_client.append_message(stream_name=stream_name, data=payload)
        print(
            "Successfully appended message to stream with sequence number ", sequence_no
        )

        readValue = kinesis_client.read_messages(stream_name, ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000))
        print("DEBUG read test: ", readValue)

    except Exception as e:
        print("Exception while running: " + str(e))
        logger.exception(e)
    finally:
        # Always close the client to avoid resource leaks
        print("closing connection")
        if kinesis_client:
            kinesis_client.close()


def subscribe(client: mqtt_client, args):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        sendDataToKinesis(args.greengrass_stream, args.kinesis_stream, msg.payload, args.batch_size)

    client.subscribe(topic)
    client.on_message = on_message


def run(args):
    mqtt_client_instance = connect_mqtt()
    subscribe(mqtt_client_instance, args)
    mqtt_client_instance.loop_forever()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=False, default='...')
    parser.add_argument('--kinesis-stream', required=False, default='MyKinesisStream')
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    run(args)

(The values shown as ... are redacted because they are sensitive information, but the real values are correct.)

The problem is that it just won’t send any data to our Kinesis stream. I get the following STDOUT from the run:

2022-11-25T12:13:47.640Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Connected to MQTT Broker!. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Received `{"machineId":2, .... "timestamp":"2022-10-24T12:21:34.8777249Z","value":true}` from `clients/test/hello/world` topic. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Debug: sendDataToKinesis with params: test |  MyKinesisStream  |  100. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. payload: b'{"machineId":2,... ,"timestamp":"2022-10-24T12:21:34.8777249Z","value":true}'. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. type payload: <class 'bytes'>. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Successfully appended message to stream with sequence number  0. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. DEBUG read test:  [<Class Message. stream_name: 'test', sequence_number: 0, ingest_time: 1669376980985, payload: b'{"machineId":2,"mach'>]. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. closing connection. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}

So we can see that the data arrives from MQTT, the Python code appends the message, and the local stream seems to hold the data, since it can be read back in the next step. The client then closes the connection without any error.

But the problem is that, on the AWS side, we cannot see the data arriving on the stream:
Screenshot of the AWS console

What could be the problem here? Our Greengrass core is configured properly and can be reached from AWS, and the component is running and healthy as well:
Screenshot of IoT Core status
Screenshot of the state of the StreamManager component

Update: we managed to get some messages out with the following code:

...

def sendDataToKinesis(
        kinesis_client,
        stream_name: str,
        payload,
):
    try:
        print("payload:", payload)
        print("type payload:", type(payload))
    except Exception as e:
        print("Error while printing out the parameters", str(e))
        logger.exception(e)
    try:

        sequence_no = kinesis_client.append_message(stream_name=stream_name, data=payload)
        print(
            "Successfully appended message to stream with sequence number ", sequence_no
        )

        time.sleep(1)  # needs "import time" among the imports elided above

    except Exception as e:
        print("Exception while running: " + str(e))
        logger.exception(e)
    # finally:
    #     # todo: Always close the client to avoid resource leaks!!!
    #     print("closing connection")
    #     if kinesis_client:
    #         kinesis_client.close()


def subscribe(client: mqtt_client, stream_name: str, args, kinesisClient):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        sendDataToKinesis(kinesisClient, stream_name, msg.payload)

    client.subscribe(topic)
    client.on_message = on_message


def create_kinensis_client(greengrass_stream, kinesis_stream, batch_size):
    # Create a client for the StreamManager
    kinesis_client = StreamManagerClient()

    # Try deleting the stream (if it exists) so that we have a fresh start
    try:
        kinesis_client.delete_message_stream(stream_name=greengrass_stream)
    except ResourceNotFoundException:
        pass

    exports = ExportDefinition(
        kinesis=[KinesisConfig(
            identifier="KinesisExport" + greengrass_stream,
            kinesis_stream_name=kinesis_stream,
            batch_size=batch_size,
        )]
    )
    kinesis_client.create_message_stream(
        MessageStreamDefinition(
            name=greengrass_stream,
            strategy_on_full=StrategyOnFull.OverwriteOldestData,
            export_definition=exports
        )
    )
    print("Debug: created stream with params ", greengrass_stream + " | ", kinesis_stream, " | ", batch_size)

    return kinesis_client


def run(args):
    kinesis_client = create_kinensis_client(args.greengrass_stream, args.kinesis_stream, args.batch_size)
    mqtt_client_instance = connect_mqtt()
    subscribe(mqtt_client_instance, args.greengrass_stream, args, kinesis_client)
    mqtt_client_instance.loop_forever()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=False, default='SiteWise_Stream_Kinesis')
    parser.add_argument('--kinesis-stream', required=False, default='MyKinesisStream')
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    print(f'args: {args.__dict__}')

    run(args)

In this approach:

  • we create the connection only once,
  • we do not close the connection,
  • and we wait 1 second after appending each message to the stream before moving on.

Needless to say, this solution cannot be used in our production environment, but after a lot of trial and error it seems to work. We still need to find the root cause. Could it be a Python threading problem? We are out of guesses.


Answers


  1. Chosen as BEST ANSWER

    After contacting official AWS personnel, we got the following answer:

    So, looking at the code a bit further, it seems that the API call the Stream Manager library makes to Kinesis is done asynchronously. What this means for your program is that when you call kinesis_client.append_message(stream_name, payload), the function is executed locally, but the results are only sent to AWS later, while your code proceeds to the next line. In the end, this causes the stream to be closed and destroyed via kinesis_client.close() before the data is published to the AWS side of the stream. This seems to be an oddity of how Python handles streams. Since the API call is asynchronous and you don’t have access to the future (the future is not passed to the caller from the streams library), you have no way of knowing that the publish to AWS failed because the stream was closed. The reason your infinite-loop client works is that the infinite loop gives the asynchronous call time to complete. The same goes for your example where you don’t close the client.

    As to the solution, I am not sure what the correct way to proceed would be. Looking at how streams are meant to be used, it is not unreasonable to keep the stream open as long as the MQTT library is still processing messages. You will just need to be careful with error handling to ensure you don’t have any leaks or lingering connections to the stream should the program stop functioning.
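
One way to keep the client open while messages are being processed, yet still avoid a leaked connection, is to close it exactly once when the blocking loop ends. The following is only a minimal sketch of that pattern: `run_until_stopped` and `FakeStreamClient` are hypothetical names that do not appear in the original code, and the sketch assumes nothing about the client beyond a `close()` method. Whether pending data is flushed before `close()` returns is not confirmed by anything above.

```python
from typing import Callable


class FakeStreamClient:
    """Hypothetical stand-in so the sketch runs without Greengrass installed."""

    def __init__(self) -> None:
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls += 1


def run_until_stopped(stream_client, blocking_loop: Callable[[], None]) -> None:
    """Run blocking_loop, then close stream_client exactly once, even on error."""
    try:
        blocking_loop()  # in the program above: mqtt_client_instance.loop_forever
    except KeyboardInterrupt:
        # A manual stop is a normal way out; fall through to the cleanup below.
        pass
    finally:
        # Runs on normal return, on KeyboardInterrupt, and on any other exception.
        stream_client.close()


if __name__ == "__main__":
    def stop_immediately() -> None:
        # Simulates the loop being interrupted by the operator.
        raise KeyboardInterrupt

    demo_client = FakeStreamClient()
    run_until_stopped(demo_client, stop_immediately)
    print("close() calls:", demo_client.close_calls)
```

In the updated program from the question, this would presumably mean passing `kinesis_client` and `mqtt_client_instance.loop_forever` to such a helper instead of calling `loop_forever()` directly in `run`.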


  2. We also received a response on the AWS Q&A site:

    From here:

    You are deleting your stream every time you append a message to it.
    Since the stream only ever contains a single message, you likely
    aren’t hitting the batch_size minimum in order for StreamManager to
    upload.

    You’ll want to create your StreamManager client and initialize your
    stream a single time, and then re-use them when appending data. You
    may also want to consider reducing your batch size.
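
The reasoning in this answer can be illustrated with a toy model. This is not Stream Manager's real implementation: `ToyStream`, `recreate_per_message`, and `reuse_stream` are hypothetical names, and the model simply assumes, as the answer describes, that pending messages are uploaded only once `batch_size` of them have accumulated in the same stream.

```python
class ToyStream:
    """Toy stand-in for a local stream that exports in batches (hypothetical)."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.pending = []   # appended but not yet uploaded
        self.uploaded = []  # what the remote side would have received

    def append(self, payload):
        self.pending.append(payload)
        # Assumption of this model: upload only when a full batch is pending.
        if len(self.pending) >= self.batch_size:
            self.uploaded.extend(self.pending)
            self.pending.clear()


def recreate_per_message(messages, batch_size):
    """Mimics the original code: a fresh stream for every message."""
    uploaded = 0
    for message in messages:
        stream = ToyStream(batch_size)  # old stream and its pending data are discarded
        stream.append(message)
        uploaded += len(stream.uploaded)
    return uploaded


def reuse_stream(messages, batch_size):
    """Mimics the updated code: one stream shared by all messages."""
    stream = ToyStream(batch_size)
    for message in messages:
        stream.append(message)
    return len(stream.uploaded)


if __name__ == "__main__":
    sample = [b"msg"] * 10
    print("recreated each time:", recreate_per_message(sample, batch_size=5))
    print("reused:", reuse_stream(sample, batch_size=5))
```

Under this model, recreating the stream per message never fills a batch unless `batch_size` is 1, which matches both suggestions in the answer: initialize once, and consider a smaller batch size.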
