The main purpose of my program is to connect to an incoming MQTT channel, and send the data received to my AWS Kinesis Stream called "MyKinesisStream".
Here is my code:
import argparse
import logging
import random
from paho.mqtt import client as mqtt_client
from stream_manager import (
ExportDefinition,
KinesisConfig,
MessageStreamDefinition,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient, ReadMessagesOptions,
)
# MQTT broker connection settings used by connect_mqtt().
broker = 'localhost'
port = 1883
# Topic that subscribe() registers on the MQTT client.
topic = "clients/test/hello/world"
# Client id with a random integer suffix between 0 and 100 (inclusive).
client_id = f'python-mqtt-{random.randint(0, 100)}'
# Broker credentials (the real values are redacted in this listing).
username = '...'
password = '...'
# Configure logging at INFO level; `logger` is the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
# Placeholder; rebound to the parsed CLI namespace under the __main__ guard.
args = ""
def connect_mqtt() -> mqtt_client.Client:
    """Create an MQTT client and connect it to the configured broker.

    Uses the module-level ``client_id``, ``username``, ``password``,
    ``broker`` and ``port`` settings.

    Returns:
        The paho MQTT client. Its ``on_connect`` callback prints whether the
        broker accepted the connection.
    """
    def on_connect(client, userdata, flags, rc):
        # A return code of 0 is treated as a successful connection.
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            # Interpolate the code into the message: print() does not apply
            # %-formatting, so passing rc as a second argument left the
            # placeholder in the output verbatim.
            print(f"Failed to connect, return code {rc}")

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client
def sendDataToKinesis(
        stream_name: str,
        kinesis_stream_name: str,
        payload,
        batch_size: int = None,
):
    """Append ``payload`` to a Stream Manager stream that exports to Kinesis.

    The local stream is created (with its Kinesis export definition) only
    when it does not exist yet. It is deliberately NOT deleted and recreated
    on every call: deleting a stream discards the messages it still holds,
    so a per-message "fresh start" can throw data away before Stream Manager
    has exported it, and it resets the sequence number to 0 each time.

    Note that an already existing stream keeps its current export
    definition; ``kinesis_stream_name`` and ``batch_size`` are only applied
    when the stream is created here.

    Args:
        stream_name: Name of the local Stream Manager stream.
        kinesis_stream_name: Name of the target Kinesis data stream.
        payload: Message body to append (the MQTT callback passes bytes).
        batch_size: Batch size passed to the Kinesis export configuration.

    All errors are logged and swallowed so that a failed upload does not
    propagate into the caller (the MQTT message callback).
    """
    try:
        print("Debug: sendDataToKinesis with params:", stream_name + " | ", kinesis_stream_name, " | ", batch_size)
        print("payload:", payload)
        print("type payload:", type(payload))
    except Exception as e:
        print("Error while printing out the parameters", str(e))
        logger.exception(e)

    # Bound before the try block so the finally clause can test it even when
    # the StreamManagerClient constructor itself raises.
    kinesis_client = None
    try:
        # Create a client for the StreamManager
        kinesis_client = StreamManagerClient()

        # Create the stream only if it is missing; an existing stream (and
        # any messages still waiting for export) is left untouched.
        try:
            kinesis_client.describe_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            exports = ExportDefinition(
                kinesis=[KinesisConfig(
                    identifier="KinesisExport" + stream_name,
                    kinesis_stream_name=kinesis_stream_name,
                    batch_size=batch_size,
                )]
            )
            kinesis_client.create_message_stream(
                MessageStreamDefinition(
                    name=stream_name,
                    strategy_on_full=StrategyOnFull.OverwriteOldestData,
                    export_definition=exports
                )
            )

        sequence_no = kinesis_client.append_message(stream_name=stream_name, data=payload)
        print(
            "Successfully appended message to stream with sequence number ", sequence_no
        )

        # Debug aid: read the stream back to confirm the append is visible locally.
        readValue = kinesis_client.read_messages(stream_name, ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000))
        print("DEBUG read test: ", readValue)
    except Exception as e:
        print("Exception while running: " + str(e))
        logger.exception(e)
    finally:
        # Always close the client to avoid resource leaks
        print("closing connection")
        if kinesis_client is not None:
            kinesis_client.close()
def subscribe(client: mqtt_client, args):
    """Subscribe ``client`` to the module-level ``topic`` and relay messages.

    Every received message is printed and then handed to
    ``sendDataToKinesis`` together with the stream names and batch size
    taken from ``args``.
    """
    def handle_message(client, userdata, msg):
        raw_payload = msg.payload
        print(f"Received `{raw_payload.decode()}` from `{msg.topic}` topic")
        # The undecoded payload is what gets forwarded.
        sendDataToKinesis(
            args.greengrass_stream,
            args.kinesis_stream,
            raw_payload,
            args.batch_size,
        )

    client.subscribe(topic)
    client.on_message = handle_message
def run(args):
    """Connect to the MQTT broker, register the relay, and loop forever.

    ``args`` is the parsed command-line namespace; it is passed through to
    ``subscribe``.
    """
    mqtt_connection = connect_mqtt()
    subscribe(mqtt_connection, args)
    # Runs the client's network loop via loop_forever().
    mqtt_connection.loop_forever()
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the MQTT-to-Kinesis bridge.

    Returns:
        A namespace with ``greengrass_stream``, ``kinesis_stream`` and
        ``batch_size`` (an int); every option is optional and has a default.
    """
    # (flag, extra keyword arguments) for each optional setting.
    option_table = (
        ('--greengrass-stream', {'default': '...'}),
        ('--kinesis-stream', {'default': 'MyKinesisStream'}),
        ('--batch-size', {'type': int, 'default': 500}),
    )
    cli = argparse.ArgumentParser()
    for flag, extra in option_table:
        cli.add_argument(flag, required=False, **extra)
    return cli.parse_args()
# Script entry point: parse the CLI options (rebinding the module-level
# ``args``) and start the MQTT-to-Kinesis bridge.
if __name__ == "__main__":
    args = parse_args()
    run(args)
(The dotted parts (`...`) are redacted because they contain sensitive information; the actual values are correct.)
The problem is that it just won’t send any data to our kinesis stream. I get the following STDOUT from the run:
2022-11-25T12:13:47.640Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Connected to MQTT Broker!. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Received `{"machineId":2, .... "timestamp":"2022-10-24T12:21:34.8777249Z","value":true}` from `clients/test/hello/world` topic. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Debug: sendDataToKinesis with params: test | MyKinesisStream | 100. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. payload: b'{"machineId":2,... ,"timestamp":"2022-10-24T12:21:34.8777249Z","value":true}'. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. type payload: <class 'bytes'>. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Successfully appended message to stream with sequence number 0. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. DEBUG read test: [<Class Message. stream_name: 'test', sequence_number: 0, ingest_time: 1669376980985, payload: b'{"machineId":2,"mach'>]. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. closing connection. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
So we can see that the data arrives from MQTT, the Python code appends the message, and the local Stream Manager stream appears to hold it, since the next step can read it back. The connection is then closed without any error.
But the problem is, that from AWS side, we cannot see the data arriving on the stream:
What could be the problem here? Our Greengrass core is configured properly and can be accessed from AWS, and the component is running and healthy as well:
Update: we managed to get some messages out with the following code:
...
def sendDataToKinesis(
        kinesis_client,
        stream_name: str,
        payload,
):
    """Append ``payload`` to ``stream_name`` using an already open client.

    Args:
        kinesis_client: Client object whose ``append_message`` is called;
            it is shared across calls and is not closed here.
        stream_name: Name of the stream to append to.
        payload: Message body to append.

    Errors are printed and logged, never raised to the caller.
    """
    try:
        print("payload:", payload)
        print("type payload:", type(payload))
    except Exception as print_error:
        print("Error while printing out the parameters", str(print_error))
        logger.exception(print_error)

    try:
        appended_seq = kinesis_client.append_message(stream_name=stream_name, data=payload)
        print("Successfully appended message to stream with sequence number ", appended_seq)
        # NOTE(review): this pause also delays whoever called us (the MQTT
        # message callback) by one second per message - confirm it is needed.
        time.sleep(1)
    except Exception as append_error:
        print("Exception while running: " + str(append_error))
        logger.exception(append_error)
    # TODO: the client is intentionally left open here; it must still be
    # closed somewhere to avoid resource leaks.
def subscribe(client: mqtt_client, stream_name: str, args, kinesisClient):
    """Subscribe ``client`` to the module-level ``topic`` and relay messages.

    Each received message is printed and appended to ``stream_name`` through
    the shared ``kinesisClient``. ``args`` is accepted but not used here.
    """
    def handle_message(client, userdata, msg):
        raw_payload = msg.payload
        print(f"Received `{raw_payload.decode()}` from `{msg.topic}` topic")
        # The undecoded payload is what gets forwarded.
        sendDataToKinesis(kinesisClient, stream_name, raw_payload)

    client.subscribe(topic)
    client.on_message = handle_message
def create_kinensis_client(greengrass_stream, kinesis_stream, batch_size):
    """Open a Stream Manager client and (re)create the Kinesis export stream.

    Any existing stream named ``greengrass_stream`` is deleted first, then a
    new one is created that exports to the Kinesis stream ``kinesis_stream``
    with the given ``batch_size``.

    Args:
        greengrass_stream: Name of the local Stream Manager stream.
        kinesis_stream: Name of the target Kinesis data stream.
        batch_size: Batch size passed to the Kinesis export configuration.

    Returns:
        The open ``StreamManagerClient``; the caller is responsible for
        closing it.

    Raises:
        Whatever the Stream Manager calls raise. The client is closed before
        the error propagates, because the caller never receives it and so
        could not close it.
    """
    # Create a client for the StreamManager
    kinesis_client = StreamManagerClient()
    try:
        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            kinesis_client.delete_message_stream(stream_name=greengrass_stream)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + greengrass_stream,
                kinesis_stream_name=kinesis_stream,
                batch_size=batch_size,
            )]
        )
        kinesis_client.create_message_stream(
            MessageStreamDefinition(
                name=greengrass_stream,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )
        print("Debug:created stream with parasm ", greengrass_stream + " | ", kinesis_stream, " | ", batch_size)
    except Exception:
        # Do not leak the connection when the setup fails part-way.
        kinesis_client.close()
        raise
    return kinesis_client
def run(args):
    """Create the Stream Manager client, then relay MQTT messages forever.

    The Stream Manager client is created once and shared by every message
    callback. It is closed when the MQTT loop ends for any reason (error,
    interrupt, or normal return) so the connection is not leaked.

    Args:
        args: Parsed command-line namespace providing ``greengrass_stream``,
            ``kinesis_stream`` and ``batch_size``.
    """
    kinesis_client = create_kinensis_client(args.greengrass_stream, args.kinesis_stream, args.batch_size)
    try:
        mqtt_client_instance = connect_mqtt()
        subscribe(mqtt_client_instance, args.greengrass_stream, args, kinesis_client)
        mqtt_client_instance.loop_forever()
    finally:
        # Always close the client to avoid resource leaks.
        kinesis_client.close()
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the MQTT-to-Kinesis bridge.

    Returns:
        A namespace with ``greengrass_stream``, ``kinesis_stream`` and
        ``batch_size`` (an int); every option is optional and has a default.
    """
    # (flag, extra keyword arguments) for each optional setting.
    option_table = (
        ('--greengrass-stream', {'default': 'SiteWise_Stream_Kinesis'}),
        ('--kinesis-stream', {'default': 'MyKinesisStream'}),
        ('--batch-size', {'type': int, 'default': 500}),
    )
    cli = argparse.ArgumentParser()
    for flag, extra in option_table:
        cli.add_argument(flag, required=False, **extra)
    return cli.parse_args()
# Script entry point: parse the CLI options, echo them for debugging, and
# start the MQTT-to-Kinesis bridge.
if __name__ == "__main__":
    args = parse_args()
    print(f'args: {vars(args)}')
    run(args)
In this approach:
- we create the connection only once
- we do not close the connection,
- and wait 1 second before moving on after appending the message to the kinesis stream.
Needless to say, this solution cannot be used in our production environment, but after a lot of random experimentation it seems to work somehow. We still need to find the root cause; might it be a Python threading problem? We are out of guesses.
2
Answers
After contacting official AWS personnel, we got the following answer:
We also received a response on the AWS Q&A site:
From here: