dts-py-app/SDT_Device/CCU.py

186 lines
5.8 KiB
Python
Raw Permalink Normal View History

2024-04-24 02:24:33 +00:00
from SDT_Device.Protocol import Protocol
import threading
import time
class CCU(Protocol):
MAX_CHANNEL = 20
CMD = {
"PULSE_COUNT": 0x00,
"COINCIDENCE_COUNT": 0x01,
"INPUT_DELAY": 0x10,
"PULSE_WIDTH": 0x20,
"COINCIDENCE_CH_SELECT": 0x30,
"UPDATE_TIMER_PERIOD": 0x40,
"UPDATE_TRIG_SELECT": 0x50,
"START": 0x60,
"COUNTER_MUX": 0x70,
"INPUT_CH_MUX": 0x80,
"HEART_BIT": 0xF000,
}
def __init__(self, ip, port):
super().__init__(ip, port)
self.setDeviceId(self.DEVICE_ID["CCU"])
self.setDeviceSerial(0)
self.classGetCallback = self._classGetCallback
self.classDataCallback = self._classDataCallback
self._getEvent = threading.Event()
self._dataEvent = threading.Event()
self._readBuffer = []
self._pulseCount = [0 for i in range(20)]
self._coincidenceCount = [0 for i in range(20)]
self.updateCount = 0
self.updateCoinCount = 0
self.debugCount = 0
def _classGetCallback(self, command, payload):
self._readBuffer = payload
self._getEvent.set()
def _read(self):
self._getEvent.wait()
self._getEvent.clear()
return self._readBuffer
def _classDataCallback(self, command, payload):
if command == self.CMD["PULSE_COUNT"]:
self._pulseCount = payload
self.updateCount += 1
elif command == self.CMD["COINCIDENCE_COUNT"]:
self._coincidenceCount = payload
self.updateCoinCount += 1
self._dataEvent.set()
def readCountAllNonBlocking(self):
return self._pulseCount, self._coincidenceCount
def readCountAllBlocking(self):
self._dataEvent.wait()
self._dataEvent.clear()
return self._pulseCount, self._coincidenceCount
def setInputDelay(self, ch, delay):
delayList = self.getInputDelayAll()
delayList[ch] = delay
args = (self.CLASS["SET"], self.CMD["INPUT_DELAY"], delayList)
self.send(args)
def setInputDelayAll(self, delay: list):
args = (self.CLASS["SET"], self.CMD["INPUT_DELAY"], delay)
self.send(args)
def getInputDelay(self, ch):
return self.getInputDelayAll()[ch]
def getInputDelayAll(self):
args = (self.CLASS["GET"], self.CMD["INPUT_DELAY"])
self.send(args)
return self._read()
def setPulseWidth(self, ch, width):
widthList = self.getPulseWidthAll()
widthList[ch] = width
args = (self.CLASS["SET"], self.CMD["PULSE_WIDTH"], widthList)
self.send(args)
def setPulseWidthAll(self, width: list):
args = (self.CLASS["SET"], self.CMD["PULSE_WIDTH"], width)
self.send(args)
def getPulseWidth(self, ch):
return self.getPulseWidthAll()[ch]
def getPulseWidthAll(self):
args = (self.CLASS["GET"], self.CMD["PULSE_WIDTH"])
self.send(args)
return self._read()
def setCoincidenceChSelect(self, ch, chSelect):
coincidenceList = self.getCoincidenceChSelectAll()
coincidenceList[ch] = chSelect
args = (self.CLASS["SET"], self.CMD["COINCIDENCE_CH_SELECT"], coincidenceList)
self.send(args)
def setCoincidenceChSelectAll(self, chSelect: list):
args = (self.CLASS["SET"], self.CMD["COINCIDENCE_CH_SELECT"], chSelect)
self.send(args)
def getCoincidenceChSelect(self, ch):
return self.getCoincidenceChSelectAll()[ch]
def getCoincidenceChSelectAll(self):
args = (self.CLASS["GET"], self.CMD["COINCIDENCE_CH_SELECT"])
self.send(args)
return self._read()
def setUpdateTimerPeriod(self, period):
args = (self.CLASS["SET"], self.CMD["UPDATE_TIMER_PERIOD"], [period])
self.send(args)
def getUpdateTimerPeriod(self):
args = (self.CLASS["GET"], self.CMD["UPDATE_TIMER_PERIOD"])
self.send(args)
return self._read()[0]
def setTrigSelect(self, trigType):
args = (self.CLASS["SET"], self.CMD["UPDATE_TRIG_SELECT"], [trigType])
self.send(args)
def getTrigSelect(self):
args = (self.CLASS["GET"], self.CMD["UPDATE_TRIG_SELECT"])
self.send(args)
return self._read()
def setStart(self, enable):
args = (self.CLASS["SET"], self.CMD["START"], [enable])
self.send(args)
def getStart(self):
args = (self.CLASS["GET"], self.CMD["START"])
self.send(args)
return self._read()
# val = 0 -> use internal pulse window
# val = 1 -> use external pulse window
def setCounterMux(self, ch, val):
valList = self.getCounterMuxAll()
valList[ch] = val
args = (self.CLASS["SET"], self.CMD["COUNTER_MUX"], valList)
self.send(args)
def setCounterMuxAll(self, val: list):
args = (self.CLASS["SET"], self.CMD["COUNTER_MUX"], val)
self.send(args)
def getCounterMux(self, ch):
return self.getCounterMuxAll()[ch]
def getCounterMuxAll(self):
args = (self.CLASS["GET"], self.CMD["COUNTER_MUX"])
self.send(args)
return self._read()
def setInputChMux(self, ch, val):
valList = self.getInputChMuxAll()
valList[ch] = val
args = (self.CLASS["SET"], self.CMD["INPUT_CH_MUX"], valList)
self.send(args)
def setInputChMuxAll(self, val: list):
args = (self.CLASS["SET"], self.CMD["INPUT_CH_MUX"], val)
self.send(args)
def getInputChMux(self, ch):
return self.getInputChMuxAll()[ch]
def getInputChMuxAll(self):
args = (self.CLASS["GET"], self.CMD["INPUT_CH_MUX"])
self.send(args)
return self._read()