dts-py-app/SDT_Device/DTS.py

149 lines
4.4 KiB
Python
Raw Permalink Normal View History

2024-04-24 02:24:33 +00:00
from SDT_Device.Protocol import Protocol
import threading
import copy
import timeit
class DTS(Protocol):
MAX_CHANNEL = 4
CMD = {
"ADC_CH_DATA_BASE": 0x00,
"SYSTEM_RESET": 0x10,
"DEBUG_DATA": 0x20,
"AVG_ENABLE_DELAY": 0x30,
"AVG_SAMPLE_RANGE": 0x31,
"AVG_STEP": 0x32,
"AVG_START": 0x33,
"AVG_TIMER_TICK": 0x34,
"HEART_BIT": 0xF000,
}
def __init__(self, ip, port):
super().__init__(ip, port)
self.setDeviceId(self.DEVICE_ID["DTS"])
self.setDeviceSerial(0)
self.setMaxLen(4 * 65536 + 20)
self.classGetCallback = self._classGetCallback
self.classDataCallback = self._classDataCallback
self.classHeartBitCallback = self._classHeartBitCallback
self._getEvnet = threading.Event()
self._dataEvnet = threading.Event()
self._readBuffer = []
self.adcDataCh0 = []
self.adcDataCh1 = []
self.adcDataCh2 = []
self.adcDataCh3 = []
self.hbCount = 0
self.dataCount = [0 for _ in range(4)]
def _classHeartBitCallback(self):
self.hbCount += 1
def _classGetCallback(self, command, payload):
self._readBuffer = payload
self._getEvnet.set()
def _read(self):
self._getEvnet.wait()
self._getEvnet.clear()
return self._readBuffer
def _classDataCallback(self, command, payload):
if command == (self.CMD["ADC_CH_DATA_BASE"] + 0):
self.adcDataCh0 = payload
self.dataCount[0] += 1
elif command == (self.CMD["ADC_CH_DATA_BASE"] + 1):
self.adcDataCh1 = payload
self.dataCount[1] += 1
elif command == (self.CMD["ADC_CH_DATA_BASE"] + 2):
self.adcDataCh2 = payload
self.dataCount[2] += 1
elif command == (self.CMD["ADC_CH_DATA_BASE"] + 3):
self.adcDataCh3 = payload
self.dataCount[3] += 1
# self._dataEvnet.set()
def ReadAdcData(self, ch):
# self._waitAdcData()
ret = []
if ch == 0:
ret = self.adcDataCh0
elif ch == 1:
ret = self.adcDataCh1
elif ch == 2:
ret = self.adcDataCh2
elif ch == 3:
ret = self.adcDataCh3
return ret
def _waitAdcData(self):
self._dataEvnet.wait()
self._dataEvnet.clear()
def setSystemReset(self): # 1번
args = (self.CLASS["SET"], self.CMD["SYSTEM_RESET"], [1])
self.send(args)
def getSystemReset(self):
args = (self.CLASS["GET"], self.CMD["SYSTEM_RESET"])
self.send(args)
return self._read()[0]
def getDebugData(self):
args = (self.CLASS["GET"], self.CMD["DEBUG_DATA"])
self.send(args)
return self._read()
def setAvgEnableDelay(self, ch, val): # 3번
args = (self.CLASS["SET"], self.CMD["AVG_ENABLE_DELAY"], [ch, val])
self.send(args)
def getAvgEnableDelay(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_ENABLE_DELAY"], [ch])
self.send(args)
return self._read()[1]
def setAvgSampleRange(self, ch, val): # 4번
args = (self.CLASS["SET"], self.CMD["AVG_SAMPLE_RANGE"], [ch, val])
self.send(args)
def getAvgSampleRange(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_SAMPLE_RANGE"], [ch])
self.send(args)
return self._read()[1]
def setAvgStep(self, ch, val): # 5번
args = (self.CLASS["SET"], self.CMD["AVG_STEP"], [ch, val])
self.send(args)
def getAvgStep(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_STEP"], [ch])
self.send(args)
return self._read()[1]
def setAvgStart(self, ch, val): # 2번 변수 바꿔서 6번
args = (self.CLASS["SET"], self.CMD["AVG_START"], [ch, val])
self.send(args)
def getAvgStart(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_START"], [ch])
self.send(args)
return self._read()[1]
def setTimerTick(self, ch, val): # 7번
args = (self.CLASS["SET"], self.CMD["AVG_TIMER_TICK"], [ch, val])
self.send(args)
def getTimerTick(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_TIMER_TICK"], [ch])
self.send(args)
return self._read()[1]