"""Expose a serial weighing device through a Modbus TCP server.

Two background coroutines run next to the Modbus TCP server:

* ``measurement_weight`` polls the device once per second and stores the
  reading in the holding-register block (function code 3, address 1).
* ``set_zero`` polls the coil block (function code 1, address 1) once per
  second; when a client has set it to 1, the zero command is sent to the
  device and the coil is cleared again.
"""
import argparse
import asyncio
import os
import signal
import sys
import threading
import time

import serial
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.server.async_io import ModbusTcpServer

# Raw device commands; the hex decodes to ASCII "D01KW\r\n" and "D01KZ\r\n".
READ_WEIGHT_COMMAND = bytes.fromhex('4430314B570D0A')
SET_ZERO_COMMAND = bytes.fromhex('4430314B5A0D0A')

# Pause between sending the read command and reading the reply (seconds).
READ_SETTLE_SECONDS = 0.02

# Seconds between two polls of either background coroutine.
POLL_INTERVAL_SECONDS = 1

# Address the Modbus TCP server binds to by default.
SERVER_ADDRESS = ("25.7.55.237", 5020)

# Serializes command/reply exchanges on the shared serial port so that the
# two pollers can never interleave a write with the other's pending reply.
_PORT_LOCK = threading.Lock()


def _transact(port, command, settle=0.0):
    """Write *command* to *port* and return the raw reply line (blocking).

    Args:
        port: Open serial port object providing ``write`` and ``readline``.
        command: Bytes to send.
        settle: Seconds to wait between the write and the read.

    Returns:
        The bytes returned by ``port.readline()``.
    """
    with _PORT_LOCK:
        port.write(command)
        if settle:
            time.sleep(settle)
        return port.readline()


async def _transact_async(port, command, settle=0.0):
    """Run ``_transact`` in the default executor.

    The serial calls block (``readline`` waits for the port's timeout), so
    running them on the event loop would stall the Modbus TCP server.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _transact, port, command, settle)


def _parse_weight(raw):
    """Convert a raw reply line into the value stored in the register.

    The reply is decoded as UTF-8, spaces and the trailing CRLF are removed,
    the remainder is parsed as a float and multiplied by 1000.

    Raises:
        ValueError: If the cleaned reply is not a valid float (for example
            an empty reply after a read timeout).
        UnicodeDecodeError: If the reply is not valid UTF-8.
    """
    text = str(raw, 'utf-8').replace(' ', '').replace('\r\n', '')
    # NOTE(review): the x1000 scaling is presumably kg -> g; confirm against
    # the device protocol.
    return float(text) * 1000


async def measurement_weight(port, data):
    """Poll the device for its weight and publish it to the data store.

    Runs forever. Negative readings are stored as 0. Any error during a poll
    is printed and the loop continues with the next cycle.

    Args:
        port: Open serial port connected to the device.
        data: Modbus slave context that receives the reading via
            ``setValues(3, 1, [...])``.
    """
    while True:
        try:
            raw = await _transact_async(
                port, READ_WEIGHT_COMMAND, settle=READ_SETTLE_SECONDS
            )
            result = _parse_weight(raw)
            print(result)
            if result < 0.0:
                result = 0.0
            # NOTE(review): a Modbus register is 16 bits wide; values above
            # 65535 cannot be encoded in a response. Confirm the device range.
            data.setValues(3, 1, [int(result)])
        except asyncio.CancelledError:
            # Never swallow cancellation (it is an Exception subclass on 3.7).
            raise
        except Exception as e:
            # Best effort: report the failed poll and retry on the next cycle.
            print(f'measurement_weight: \n{e}')
        await asyncio.sleep(POLL_INTERVAL_SECONDS)


async def set_zero(port, data):
    """Send the zero command whenever a client sets the request coil.

    Runs forever. When ``getValues(1, 1, count=1)`` yields 1 the zero command
    is sent, the device reply is read and discarded, and the coil is reset to
    0. Any error during a poll is printed and the loop continues.

    Args:
        port: Open serial port connected to the device.
        data: Modbus slave context holding the request coil.
    """
    while True:
        try:
            val = data.getValues(1, 1, count=1)
            if val[0] == 1:
                # The reply is read only to drain it from the serial buffer.
                await _transact_async(port, SET_ZERO_COMMAND)
                data.setValues(1, 1, [0])
        except asyncio.CancelledError:
            raise
        except Exception as e:
            print(f'set_zero: \n{e}')
        await asyncio.sleep(POLL_INTERVAL_SECONDS)


def exit_handler(signum, frame):
    """Signal handler: print a message and terminate the process."""
    print('Stop')
    sys.exit(0)


async def run_server(port, data, context=None, address=SERVER_ADDRESS):
    """Start the Modbus TCP server together with both device pollers.

    Args:
        port: Open serial port connected to the device.
        data: Modbus slave context shared by the server and the pollers.
        context: Optional server context. When omitted, a single-slave
            ``ModbusServerContext`` is built from *data*.
        address: ``(host, port)`` tuple the server binds to.
    """
    if context is None:
        context = ModbusServerContext(slaves=data, single=True)

    # Start the TCP server
    server = ModbusTcpServer(context, address=address)

    weight_task = asyncio.create_task(measurement_weight(port=port, data=data))
    zero_task = asyncio.create_task(set_zero(port=port, data=data))
    try:
        # Await everything together so a failure in any part is surfaced
        # instead of being hidden behind a never-returning serve_forever().
        await asyncio.gather(server.serve_forever(), weight_task, zero_task)
    finally:
        weight_task.cancel()
        zero_task.cancel()


def main():
    """Parse arguments, open the serial port and run the server until exit."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-app', help='')
    args = parser.parse_args()
    # Not referenced anywhere else in this script; kept for parity with the
    # original entry point.
    ROOT_PATH = f'/usr/local/sdt/app/{args.app}'

    # Define serial port
    serial_port = serial.Serial(
        '/dev/ttyMAX1',
        115200,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=1,
    )

    # For detect Ctrl+C
    signal.signal(signal.SIGINT, exit_handler)

    # Create ModbusTCP server
    store = ModbusSlaveContext(
        co=ModbusSequentialDataBlock(0, [0] * 100),  # Coils
        hr=ModbusSequentialDataBlock(0, [0] * 100),  # Holding Registers
    )
    context = ModbusServerContext(slaves=store, single=True)

    try:
        asyncio.run(run_server(port=serial_port, data=store, context=context))
    finally:
        # Release the device even when exit_handler raises SystemExit.
        serial_port.close()


if __name__ == '__main__':
    main()