import ssl import json import time import argparse import sys, signal import gpiod from pymodbus.client import ModbusTcpClient import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT import asyncio, pytz from datetime import datetime import threading, socket def Motor(chip, status, action): if action == 'On': status[0] = 1 else: # action == 'Off' status[0] = 0 chip.set_values(status) def Valve_Vent(chip, status, action): if action == 'On': status[1] = 1 else: # action == 'Off' status[1] = 0 chip.set_values(status) def Valve_MixedWater(chip, status, action): if action == 'On': status[2] = 1 else: # action == 'Off' status[2] = 0 chip.set_values(status) def Valve_PureWater(chip, status, action): if action == 'On': status[3] = 1 else: # action == 'Off' status[3] = 0 chip.set_values(status) def Measure_Weight(client): # print('In') try: result = client.read_holding_registers(1, 1) if result.isError(): print(f'Error: {result}') else: val = result.registers[0] val -= 1000 val /= 1000 # print(f'value: {val}') except Exception as e: pass return val def Calculate_Concentration(weight): global data data['data']['weight'] = weight result = (float(weight) * 1.883239171 * 128.5) - 126.11 # 1000 / 531 = 1.883239171 data['data']['concentration'] = result # print(f'{weight}, {result}') def Set_Zero(client): client.write_coil(1, 1) def Command_Read(): with open('./control.json', 'r') as f: cmd = json.load(f) if cmd['type'] == 'auto': Valve_Vent(chip=output_lines, status=status, action='Off') Motor(chip=output_lines, status=status, action='Off') # set zero # Set_Zero(client=client) time.sleep(5) start = Measure_Weight(client=client) time.sleep(5) # input mixed water Valve_MixedWater(chip=output_lines, status=status, action='On') time.sleep(19) Valve_MixedWater(chip=output_lines, status=status, action='Off') # time.sleep(20) time.sleep(10) # measure weight end = Measure_Weight(client=client) time.sleep(1) Calculate_Concentration(weight=(float(end)-float(start))) # vent mixed water Valve_Vent(chip=output_lines, status=status, action='On') time.sleep(0.5) Motor(chip=output_lines, status=status, action='On') time.sleep(40) Motor(chip=output_lines, status=status, action='Off') time.sleep(0.5) Valve_Vent(chip=output_lines, status=status, action='Off') # input pure water Valve_PureWater(chip=output_lines, status=status, action='On') time.sleep(19) Valve_PureWater(chip=output_lines, status=status, action='Off') time.sleep(0.5) # vent pure water Valve_Vent(chip=output_lines, status=status, action='On') time.sleep(0.5) Motor(chip=output_lines, status=status, action='On') time.sleep(40) Motor(chip=output_lines, status=status, action='Off') time.sleep(0.5) Valve_Vent(chip=output_lines, status=status, action='Off') time.sleep(5) else: # cmd['type'] == 'manual' Motor(chip=output_lines, status=status, action=cmd['device']['motor']['action']) Valve_Vent(chip=output_lines, status=status, action=cmd['device']['vent']['action']) Valve_MixedWater(chip=output_lines, status=status, action=cmd['device']['mixed']['action']) Valve_PureWater(chip=output_lines, status=status, action=cmd['device']['pure']['action']) if cmd['device']['measure']['action'] == 'On': result = Measure_Weight(client=client) Calculate_Concentration(result) if cmd['device']['setzero']['action'] == 'On': Set_Zero(client=client) def connectMQTT(clientID, projectCode): CLIENT_ID = clientID ENDPOINT = "avk03ee629rck-ats.iot.ap-northeast-2.amazonaws.com" PATH_TO_CERTIFICATE = f"/etc/sdt/cert/{projectCode}-certificate.pem" PATH_TO_PRIVATE_KEY = f"/etc/sdt/cert/{projectCode}-private.pem" PATH_TO_AMAZON_ROOT_CA_1 = f"/etc/sdt/cert/AmazonRootCA1.pem" myAWSIoTMQTTClient = AWSIoTPyMQTT.AWSIoTMQTTClient(CLIENT_ID) myAWSIoTMQTTClient.configureEndpoint(ENDPOINT, 8883) myAWSIoTMQTTClient.configureCredentials(PATH_TO_AMAZON_ROOT_CA_1, PATH_TO_PRIVATE_KEY, PATH_TO_CERTIFICATE) myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz return myAWSIoTMQTTClient def publishMsg(mqttClient, topic, msg): # Make the copip3 nnect() call # mqttClient.connect() while True: try: mqttClient.connect() break except Exception as e: print(f'Connection Fail: {e}') continue msg['timestamp'] = int(time.time() * 1000) # Publish message to server desired number of times. # print('Begin Publish') mqttClient.publish(topic=topic, payload=json.dumps(msg), QoS=1) while True: try: mqttClient.disconnect() break except Exception as e: print(f'Disconnection Fail: {e}') continue def runAction(projectCode, assetCode): # Write the app's actions in the "runAction" function. # Connect MQTT Broker # You have to rename client id. There are special rules. # Client Name: "device-app-*" # For Example # 1. device-app-test -> Good # 2. device-app-light-app -> Good # 3. device-test-app -> Bad mqttClient1 = connectMQTT("device-app-test1", projectCode) mqttClient2 = connectMQTT("device-app-test2", projectCode) mqttClient3 = connectMQTT("device-app-test3", projectCode) mqttClient4 = connectMQTT("device-app-test4", projectCode) mqttClient5 = connectMQTT("device-app-test5", projectCode) mqttlist = [mqttClient1, mqttClient2, mqttClient3, mqttClient4, mqttClient5] # If you have config's value, please make config.json file. # - Project Code's variable: projectCode(string) # - Asset Code's variable: assetCode(string) # - You may need it to create a topic. with open('./config.json', encoding='UTF-8') as f: jsonData = json.load(f) topic = f"sdtcloud/{projectCode}/{assetCode}/app/{jsonData['appId']}/data" cnt = 0 while True: start = time.time() Command_Read() # publishMsg(mqttlist[cnt], topic, data) end = time.time() cnt += 1 if cnt == 5: cnt = 0 diff = end - start if diff < 3: time.sleep(3 - diff) def handle_client(conn, ip, port): global data while True: try: recv = conn.recv(100) if not recv: # print(f"Connection with {addr} was reset. Waiting for new connection.") break message = recv.decode().strip() if message[:3] != 'STX' or message[-3:] != 'ETX': err_msg = 'STXERRORETX' conn.sendall(err_msg.encode("utf8")) else: if message[3] == 'R': # Transfer data from SDT to Sampyo now = datetime.now(pytz.timezone('Asia/Seoul')) time_str = now.strftime('%Y%m%d%H%M%S') h_weight = float(data['data']['weight']) h_concentration = float(data['data']['concentration']) data_weight = '{:.3f}'.format(h_weight) data_concent = '{:.3f}'.format(h_concentration) send_msg = 'STX' + time_str + '|' + data_weight + '|' + data_concent + 'ETX' try: with open('./control.json', 'r') as f: cmd = json.load(f) cmd['device']['measure']['action'] = 'On' with open('./control.json', 'w') as f: json.dump(cmd, f, indent=4) conn.sendall(send_msg.encode("utf8")) except Exception as e: err_msg = 'STXERRORETX' conn.sendall(err_msg.encode("utf8")) elif message[3] == 'S': # Start measurement try: with open('./control.json', 'r') as f: cmd = json.load(f) cmd['type'] = 'auto' with open('./control.json', 'w') as f: json.dump(cmd, f, indent=4) send_msg = 'STXOKETX' conn.sendall(send_msg.encode("utf8")) except Exception as e: err_msg = 'STXERRORETX' conn.sendall(err_msg.encode("utf8")) elif message[3] == 'T': # Stop measurement try: with open('./control.json', 'r') as f: cmd = json.load(f) cmd['type'] = 'manual' cmd['device']['measure']['action'] = 'Off' with open('./control.json', 'w') as f: json.dump(cmd, f, indent=4) send_msg = 'STXOKETX' conn.sendall(send_msg.encode("utf8")) except Exception as e: err_msg = 'STXERRORETX' conn.sendall(err_msg.encode("utf8")) else: err_msg = 'STXERRORETX' conn.sendall(err_msg.encode("utf8")) except ConnectionResetError: # print("Connection with " + ip + ":" + port + " was reset. Waiting for new connection.") break # print("Closing the connection") def start_server(): host = "25.7.57.1" port = 5000 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: soc.bind((host, port)) except: sys.exit() soc.listen(1) # Only one connection at a time. while True: conn, addr = soc.accept() ip, port = str(addr[0]), str(addr[1]) print("Connected with " + ip + ":" + port) client_handler = threading.Thread(target=handle_client, args=(conn, ip, port)) client_handler.start() soc.close() def exit_handler(signum, frame): Motor(chip=output_lines, status=status, action='Off') Valve_Vent(chip=output_lines, status=status, action='Off') Valve_MixedWater(chip=output_lines, status=status, action='Off') Valve_PureWater(chip=output_lines, status=status, action='Off') client.close() sys.exit(0) if __name__ == "__main__": output_chip = gpiod.chip('gpiochip11') config = gpiod.line_request() config.consumer = 'output' config.request_type = gpiod.line_request.DIRECTION_OUTPUT output_lines = output_chip.get_lines([0, 1, 2, 3, 4, 5, 6, 7]) output_lines.request(config, default_vals=[0, 0, 0, 0, 0, 0, 0, 0]) status = [0, 0, 0, 0, 0, 0, 0, 0] signal.signal(signal.SIGINT, exit_handler) client = ModbusTcpClient('25.7.55.237', 5020) parser = argparse.ArgumentParser() parser.add_argument('-app',help='') args = parser.parse_args() # ROOT_PATH = f'/usr/local/sdt/app/{args.app}' data = { "timestamp": 0, "data":{ "weight": 0, "concentration": 0 } } ## Get ProjectCode and AssetCode with open(f'/etc/sdt/device.config/config.json', encoding='UTF-8') as f: codeData = json.load(f) ## Execution main funcion operation_thread = threading.Thread(target=runAction, args=(codeData["projectcode"], codeData["assetcode"])) operation_thread.start() ## Execution TCP/IP server start_server()