gseps-image-acquisition/utils/publisher.py

87 lines
3.4 KiB
Python

# ==============================================================================
# | |
# | @ 2023. SDT Inc. all rights reserved |
# | Author : Jinsung Lee |
# | |
# ==============================================================================
import json
import logging.handlers
import os
import uuid

from awscrt import mqtt
from awsiot import mqtt_connection_builder
######################################################################
# Save Log #
######################################################################
# Send every record at INFO and above from the root logger to a size-capped,
# rotating log file.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log_max_size = 1024000  # bytes written to one file before it is rolled over
log_file_count = 3  # number of rolled-over backup files to keep
# RotatingFileHandler opens its file as soon as it is constructed and does not
# create missing parent directories, so make sure ./logs exists first;
# otherwise importing this module fails with FileNotFoundError.
os.makedirs("./logs", exist_ok=True)
log_fileHandler = logging.handlers.RotatingFileHandler(
    filename="./logs/mqtt_publish.log",
    maxBytes=log_max_size,
    backupCount=log_file_count,
    mode='a')
log_fileHandler.setFormatter(formatter)
logger.addHandler(log_fileHandler)
######################################################################
# Config #
######################################################################
# All three JSON files are read with an explicit UTF-8 encoding so that
# parsing does not depend on the locale of the machine running the app.

# Broker settings: supplies 'mqtt_endpoint' and 'mqtt_port'.
with open('./mqtt_config.json', 'r', encoding='UTF-8') as f:
    config = json.load(f)
# Application settings: supplies 'appId', used in the topic name.
with open('./config.json', encoding='UTF-8') as f:
    jsonData = json.load(f)
# Device settings: supplies 'projectcode' and 'assetcode', used in the topic
# name and in the certificate / private-key file names.
with open('/etc/sdt/device.config/config.json', encoding='UTF-8') as f:
    codeData = json.load(f)

MQTT_ENDPOINT = config['mqtt_endpoint']
MQTT_PORT = config['mqtt_port']
MQTT_TOPIC = f"sdtcloud/{codeData['projectcode']}/{codeData['assetcode']}/app/{jsonData['appId']}/data"
MQTT_CERT = f"/etc/sdt/cert/{codeData['projectcode']}-certificate.pem"
MQTT_KEY = f"/etc/sdt/cert/{codeData['projectcode']}-private.pem"
MQTT_ROOT_CA = "/etc/sdt/cert/AmazonRootCA1.pem"

# A fresh random identifier per import; it becomes part of the MQTT client id.
UUID = uuid.uuid4()
# Running count of messages handled by on_message_received.
received_count = 0
class mqtt_client():
    """Small wrapper around one mutual-TLS MQTT connection.

    The connection is built from the module-level MQTT_* settings and is
    opened by the constructor. Every publish and subscribe uses the single
    topic held in MQTT_TOPIC with QoS AT_LEAST_ONCE.
    """

    def __init__(self):
        """Build the connection and block until the broker accepts it.

        Raises:
            Exception: whatever error the connect future carries when the
                connection attempt fails; it is logged and then re-raised.
        """
        # NOTE(review): MQTT_PORT is read from mqtt_config.json but is not
        # passed to the builder here, so the SDK picks the port itself.
        # Confirm whether the configured port was meant to be used.
        self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=MQTT_ENDPOINT,
            cert_filepath=MQTT_CERT,
            pri_key_filepath=MQTT_KEY,
            ca_filepath=MQTT_ROOT_CA,
            client_id=f'blokworks-client-{UUID}')
        # connect() only starts the handshake and hands back a future. Wait on
        # it so that a failed connection is reported here instead of being
        # silently dropped.
        connect_future = self.mqtt_connection.connect()
        try:
            connect_future.result()
        except Exception:
            logger.exception("MQTT connection to %s failed", MQTT_ENDPOINT)
            raise

    def publish(self, message):
        """Publish ``message`` to MQTT_TOPIC with QoS AT_LEAST_ONCE.

        Args:
            message: payload handed unchanged to the connection's publish().

        Returns:
            The ``(future, packet_id)`` pair produced by the connection's
            publish() call, so that callers who care can wait for the
            acknowledgement. Callers may ignore it.
        """
        pub_future, packet_id = self.mqtt_connection.publish(
            topic=MQTT_TOPIC,
            payload=message,
            qos=mqtt.QoS.AT_LEAST_ONCE)
        return pub_future, packet_id

    def subscribe(self):
        """Subscribe to MQTT_TOPIC and wait for the subscription to complete.

        Incoming messages are delivered to on_message_received.

        Returns:
            The result carried by the subscribe future.
        """
        subscribe_future, _packet_id = self.mqtt_connection.subscribe(
            topic=MQTT_TOPIC,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_message_received)
        return subscribe_future.result()

    def disconnect(self):
        """Close the connection and wait until the close has completed."""
        # Waiting on the future keeps a caller that exits right afterwards
        # from tearing the process down while the disconnect is in flight.
        disconnect_future = self.mqtt_connection.disconnect()
        disconnect_future.result()
def on_message_received(topic, payload, **kwargs):
    """Count an incoming message and print it to standard output.

    Args:
        topic: topic the message arrived on.
        payload: message body, printed as-is.
        **kwargs: extra keyword arguments supplied by the caller; ignored.
    """
    global received_count
    received_count = received_count + 1
    line = "{:04d} Received message from topic '{}': {}".format(
        received_count, topic, payload)
    print(line)