sampyo-dio/test.py

376 lines
12 KiB
Python

import ssl
import json
import time
import argparse
import sys, signal
import gpiod
from pymodbus.client import ModbusTcpClient
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
import asyncio, pytz
from datetime import datetime
import threading, socket
def Motor(chip, status, action):
if action == 'On':
status[0] = 1
else: # action == 'Off'
status[0] = 0
chip.set_values(status)
def Valve_Vent(chip, status, action):
if action == 'On':
status[1] = 1
else: # action == 'Off'
status[1] = 0
chip.set_values(status)
def Valve_MixedWater(chip, status, action):
if action == 'On':
status[2] = 1
else: # action == 'Off'
status[2] = 0
chip.set_values(status)
def Valve_PureWater(chip, status, action):
if action == 'On':
status[3] = 1
else: # action == 'Off'
status[3] = 0
chip.set_values(status)
def Measure_Weight(client):
# print('In')
try:
result = client.read_holding_registers(1, 1)
if result.isError():
print(f'Error: {result}')
else:
val = result.registers[0]
val -= 1000
val /= 1000
# print(f'value: {val}')
except Exception as e:
pass
return val
def Calculate_Concentration(weight):
global data
data['data']['weight'] = weight
result = (float(weight) * 1.883239171 * 128.5) - 126.11 # 1000 / 531 = 1.883239171
data['data']['concentration'] = result
# print(f'{weight}, {result}')
def Set_Zero(client):
client.write_coil(1, 1)
def Command_Read():
with open('./control.json', 'r') as f:
cmd = json.load(f)
if cmd['type'] == 'auto':
Valve_Vent(chip=output_lines, status=status, action='Off')
Motor(chip=output_lines, status=status, action='Off')
# set zero
# Set_Zero(client=client)
time.sleep(5)
start = Measure_Weight(client=client)
time.sleep(5)
# input mixed water
Valve_MixedWater(chip=output_lines, status=status, action='On')
time.sleep(19)
Valve_MixedWater(chip=output_lines, status=status, action='Off')
# time.sleep(20)
time.sleep(10)
# measure weight
end = Measure_Weight(client=client)
time.sleep(1)
Calculate_Concentration(weight=(float(end)-float(start)))
# vent mixed water
Valve_Vent(chip=output_lines, status=status, action='On')
time.sleep(0.5)
Motor(chip=output_lines, status=status, action='On')
time.sleep(40)
Motor(chip=output_lines, status=status, action='Off')
time.sleep(0.5)
Valve_Vent(chip=output_lines, status=status, action='Off')
# input pure water
Valve_PureWater(chip=output_lines, status=status, action='On')
time.sleep(19)
Valve_PureWater(chip=output_lines, status=status, action='Off')
time.sleep(0.5)
# vent pure water
Valve_Vent(chip=output_lines, status=status, action='On')
time.sleep(0.5)
Motor(chip=output_lines, status=status, action='On')
time.sleep(40)
Motor(chip=output_lines, status=status, action='Off')
time.sleep(0.5)
Valve_Vent(chip=output_lines, status=status, action='Off')
time.sleep(5)
else: # cmd['type'] == 'manual'
Motor(chip=output_lines, status=status, action=cmd['device']['motor']['action'])
Valve_Vent(chip=output_lines, status=status, action=cmd['device']['vent']['action'])
Valve_MixedWater(chip=output_lines, status=status, action=cmd['device']['mixed']['action'])
Valve_PureWater(chip=output_lines, status=status, action=cmd['device']['pure']['action'])
if cmd['device']['measure']['action'] == 'On':
result = Measure_Weight(client=client)
Calculate_Concentration(result)
if cmd['device']['setzero']['action'] == 'On':
Set_Zero(client=client)
def connectMQTT(clientID, projectCode):
CLIENT_ID = clientID
ENDPOINT = "avk03ee629rck-ats.iot.ap-northeast-2.amazonaws.com"
PATH_TO_CERTIFICATE = f"/etc/sdt/cert/{projectCode}-certificate.pem"
PATH_TO_PRIVATE_KEY = f"/etc/sdt/cert/{projectCode}-private.pem"
PATH_TO_AMAZON_ROOT_CA_1 = f"/etc/sdt/cert/AmazonRootCA1.pem"
myAWSIoTMQTTClient = AWSIoTPyMQTT.AWSIoTMQTTClient(CLIENT_ID)
myAWSIoTMQTTClient.configureEndpoint(ENDPOINT, 8883)
myAWSIoTMQTTClient.configureCredentials(PATH_TO_AMAZON_ROOT_CA_1, PATH_TO_PRIVATE_KEY, PATH_TO_CERTIFICATE)
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
return myAWSIoTMQTTClient
def publishMsg(mqttClient, topic, msg):
# Make the copip3 nnect() call
# mqttClient.connect()
while True:
try:
mqttClient.connect()
break
except Exception as e:
print(f'Connection Fail: {e}')
continue
msg['timestamp'] = int(time.time() * 1000)
# Publish message to server desired number of times.
# print('Begin Publish')
mqttClient.publish(topic=topic, payload=json.dumps(msg), QoS=1)
while True:
try:
mqttClient.disconnect()
break
except Exception as e:
print(f'Disconnection Fail: {e}')
continue
def runAction(projectCode, assetCode):
# Write the app's actions in the "runAction" function.
# Connect MQTT Broker
# You have to rename client id. There are special rules.
# Client Name: "device-app-*"
# For Example
# 1. device-app-test -> Good
# 2. device-app-light-app -> Good
# 3. device-test-app -> Bad
mqttClient1 = connectMQTT("device-app-test1", projectCode)
mqttClient2 = connectMQTT("device-app-test2", projectCode)
mqttClient3 = connectMQTT("device-app-test3", projectCode)
mqttClient4 = connectMQTT("device-app-test4", projectCode)
mqttClient5 = connectMQTT("device-app-test5", projectCode)
mqttlist = [mqttClient1, mqttClient2, mqttClient3, mqttClient4, mqttClient5]
# If you have config's value, please make config.json file.
# - Project Code's variable: projectCode(string)
# - Asset Code's variable: assetCode(string)
# - You may need it to create a topic.
with open('./config.json', encoding='UTF-8') as f:
jsonData = json.load(f)
topic = f"sdtcloud/{projectCode}/{assetCode}/app/{jsonData['appId']}/data"
cnt = 0
while True:
start = time.time()
Command_Read()
# publishMsg(mqttlist[cnt], topic, data)
end = time.time()
cnt += 1
if cnt == 5:
cnt = 0
diff = end - start
if diff < 3:
time.sleep(3 - diff)
def handle_client(conn, ip, port):
global data
while True:
try:
recv = conn.recv(100)
if not recv:
# print(f"Connection with {addr} was reset. Waiting for new connection.")
break
message = recv.decode().strip()
if message[:3] != 'STX' or message[-3:] != 'ETX':
err_msg = 'STXERRORETX'
conn.sendall(err_msg.encode("utf8"))
else:
if message[3] == 'R': # Transfer data from SDT to Sampyo
now = datetime.now(pytz.timezone('Asia/Seoul'))
time_str = now.strftime('%Y%m%d%H%M%S')
h_weight = float(data['data']['weight'])
h_concentration = float(data['data']['concentration'])
data_weight = '{:.3f}'.format(h_weight)
data_concent = '{:.3f}'.format(h_concentration)
send_msg = 'STX' + time_str + '|' + data_weight + '|' + data_concent + 'ETX'
try:
with open('./control.json', 'r') as f:
cmd = json.load(f)
cmd['device']['measure']['action'] = 'On'
with open('./control.json', 'w') as f:
json.dump(cmd, f, indent=4)
conn.sendall(send_msg.encode("utf8"))
except Exception as e:
err_msg = 'STXERRORETX'
conn.sendall(err_msg.encode("utf8"))
elif message[3] == 'S': # Start measurement
try:
with open('./control.json', 'r') as f:
cmd = json.load(f)
cmd['type'] = 'auto'
with open('./control.json', 'w') as f:
json.dump(cmd, f, indent=4)
send_msg = 'STXOKETX'
conn.sendall(send_msg.encode("utf8"))
except Exception as e:
err_msg = 'STXERRORETX'
conn.sendall(err_msg.encode("utf8"))
elif message[3] == 'T': # Stop measurement
try:
with open('./control.json', 'r') as f:
cmd = json.load(f)
cmd['type'] = 'manual'
cmd['device']['measure']['action'] = 'Off'
with open('./control.json', 'w') as f:
json.dump(cmd, f, indent=4)
send_msg = 'STXOKETX'
conn.sendall(send_msg.encode("utf8"))
except Exception as e:
err_msg = 'STXERRORETX'
conn.sendall(err_msg.encode("utf8"))
else:
err_msg = 'STXERRORETX'
conn.sendall(err_msg.encode("utf8"))
except ConnectionResetError:
# print("Connection with " + ip + ":" + port + " was reset. Waiting for new connection.")
break
# print("Closing the connection")
def start_server():
host = "25.7.57.1"
port = 5000
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
soc.bind((host, port))
except:
sys.exit()
soc.listen(1) # Only one connection at a time.
while True:
conn, addr = soc.accept()
ip, port = str(addr[0]), str(addr[1])
print("Connected with " + ip + ":" + port)
client_handler = threading.Thread(target=handle_client, args=(conn, ip, port))
client_handler.start()
soc.close()
def exit_handler(signum, frame):
Motor(chip=output_lines, status=status, action='Off')
Valve_Vent(chip=output_lines, status=status, action='Off')
Valve_MixedWater(chip=output_lines, status=status, action='Off')
Valve_PureWater(chip=output_lines, status=status, action='Off')
client.close()
sys.exit(0)
if __name__ == "__main__":
output_chip = gpiod.chip('gpiochip11')
config = gpiod.line_request()
config.consumer = 'output'
config.request_type = gpiod.line_request.DIRECTION_OUTPUT
output_lines = output_chip.get_lines([0, 1, 2, 3, 4, 5, 6, 7])
output_lines.request(config, default_vals=[0, 0, 0, 0, 0, 0, 0, 0])
status = [0, 0, 0, 0, 0, 0, 0, 0]
signal.signal(signal.SIGINT, exit_handler)
client = ModbusTcpClient('25.7.55.237', 5020)
parser = argparse.ArgumentParser()
parser.add_argument('-app',help='')
args = parser.parse_args()
# ROOT_PATH = f'/usr/local/sdt/app/{args.app}'
data = {
"timestamp": 0,
"data":{
"weight": 0,
"concentration": 0
}
}
## Get ProjectCode and AssetCode
with open(f'/etc/sdt/device.config/config.json', encoding='UTF-8') as f:
codeData = json.load(f)
## Execution main funcion
operation_thread = threading.Thread(target=runAction, args=(codeData["projectcode"], codeData["assetcode"]))
operation_thread.start()
## Execution TCP/IP server
start_server()