import threading
import time

from SDT_Device.Protocol import Protocol


class CCU(Protocol):
    """Client for a CCU device, built on the ``Protocol`` transport.

    The class exposes per-channel and all-channel accessors for the device
    settings listed in ``CMD`` and keeps the most recent pulse and
    coincidence count payloads delivered through the data callback.

    ``send``, ``setDeviceId``, ``setDeviceSerial``, ``CLASS`` and
    ``DEVICE_ID`` are used here but not defined here; they are expected to
    be provided by ``Protocol``.
    """

    # Number of channels used to size the initial count buffers.
    MAX_CHANNEL = 20

    # Command codes placed in the second field of every message.
    CMD = {
        "PULSE_COUNT": 0x00,
        "COINCIDENCE_COUNT": 0x01,
        "INPUT_DELAY": 0x10,
        "PULSE_WIDTH": 0x20,
        "COINCIDENCE_CH_SELECT": 0x30,
        "UPDATE_TIMER_PERIOD": 0x40,
        "UPDATE_TRIG_SELECT": 0x50,
        "START": 0x60,
        "COUNTER_MUX": 0x70,
        "INPUT_CH_MUX": 0x80,
        "HEART_BIT": 0xF000,
    }

    def __init__(self, ip, port):
        """Set up the transport for ``ip``/``port`` and register callbacks.

        Args:
            ip: Forwarded unchanged to ``Protocol.__init__``.
            port: Forwarded unchanged to ``Protocol.__init__``.
        """
        super().__init__(ip, port)
        self.setDeviceId(self.DEVICE_ID["CCU"])
        self.setDeviceSerial(0)

        # Create all state the callbacks touch BEFORE registering them, so a
        # callback that fires right after registration never sees a
        # half-initialised object.
        self._getEvent = threading.Event()
        self._dataEvent = threading.Event()
        self._readBuffer = []
        self._pulseCount = [0] * self.MAX_CHANNEL
        self._coincidenceCount = [0] * self.MAX_CHANNEL
        self.updateCount = 0
        self.updateCoinCount = 0
        self.debugCount = 0

        self.classGetCallback = self._classGetCallback
        self.classDataCallback = self._classDataCallback

    # ------------------------------------------------------------------
    # Callbacks and low-level helpers
    # ------------------------------------------------------------------

    def _classGetCallback(self, command, payload):
        """Store a GET reply payload and wake the caller blocked in ``_read``.

        ``command`` is accepted but not inspected: whichever reply arrives
        is handed to the waiting reader.
        """
        self._readBuffer = payload
        self._getEvent.set()

    def _read(self):
        """Block until a GET reply has been stored, then return it.

        There is no timeout: the call waits until ``_classGetCallback``
        sets the event.
        """
        self._getEvent.wait()
        self._getEvent.clear()
        return self._readBuffer

    def _classDataCallback(self, command, payload):
        """Record an incoming count payload and signal blocking readers."""
        if command == self.CMD["PULSE_COUNT"]:
            self._pulseCount = payload
            self.updateCount += 1
        elif command == self.CMD["COINCIDENCE_COUNT"]:
            self._coincidenceCount = payload
            self.updateCoinCount += 1
        # NOTE(review): the original text had lost its indentation, so it is
        # ambiguous whether this call belonged to the COINCIDENCE_COUNT
        # branch only. It is kept unconditional here - confirm against the
        # upstream file.
        self._dataEvent.set()

    def _ccuGet(self, name):
        """Send a GET for ``CMD[name]`` and return the reply payload."""
        self.send((self.CLASS["GET"], self.CMD[name]))
        return self._read()

    def _ccuSet(self, name, values):
        """Send a SET for ``CMD[name]`` carrying ``values`` as its payload."""
        self.send((self.CLASS["SET"], self.CMD[name], values))

    def _ccuUpdateChannel(self, name, current, ch, value):
        """Write ``current`` back with entry ``ch`` replaced by ``value``.

        ``current`` is copied first so the payload cached in ``_readBuffer``
        is left untouched and an immutable payload (e.g. a tuple) can still
        be updated.
        """
        values = list(current)
        values[ch] = value
        self._ccuSet(name, values)

    # ------------------------------------------------------------------
    # Counters
    # ------------------------------------------------------------------

    def readCountAllNonBlocking(self):
        """Return the latest ``(pulseCount, coincidenceCount)`` immediately."""
        return self._pulseCount, self._coincidenceCount

    def readCountAllBlocking(self):
        """Wait for the data callback to fire, then return both counts.

        Returns:
            The ``(pulseCount, coincidenceCount)`` pair as currently stored.
        """
        self._dataEvent.wait()
        self._dataEvent.clear()
        return self._pulseCount, self._coincidenceCount

    # ------------------------------------------------------------------
    # Input delay
    # ------------------------------------------------------------------

    def setInputDelay(self, ch, delay):
        """Set the input delay of channel ``ch`` (read-modify-write)."""
        self._ccuUpdateChannel(
            "INPUT_DELAY", self.getInputDelayAll(), ch, delay
        )

    def setInputDelayAll(self, delay: list):
        """Send ``delay`` as the complete input-delay list."""
        self._ccuSet("INPUT_DELAY", delay)

    def getInputDelay(self, ch):
        """Return the input delay of channel ``ch``."""
        return self.getInputDelayAll()[ch]

    def getInputDelayAll(self):
        """Query the device and return the input-delay reply payload."""
        return self._ccuGet("INPUT_DELAY")

    # ------------------------------------------------------------------
    # Pulse width
    # ------------------------------------------------------------------

    def setPulseWidth(self, ch, width):
        """Set the pulse width of channel ``ch`` (read-modify-write)."""
        self._ccuUpdateChannel(
            "PULSE_WIDTH", self.getPulseWidthAll(), ch, width
        )

    def setPulseWidthAll(self, width: list):
        """Send ``width`` as the complete pulse-width list."""
        self._ccuSet("PULSE_WIDTH", width)

    def getPulseWidth(self, ch):
        """Return the pulse width of channel ``ch``."""
        return self.getPulseWidthAll()[ch]

    def getPulseWidthAll(self):
        """Query the device and return the pulse-width reply payload."""
        return self._ccuGet("PULSE_WIDTH")

    # ------------------------------------------------------------------
    # Coincidence channel selection
    # ------------------------------------------------------------------

    def setCoincidenceChSelect(self, ch, chSelect):
        """Set the coincidence selection of entry ``ch`` (read-modify-write)."""
        self._ccuUpdateChannel(
            "COINCIDENCE_CH_SELECT",
            self.getCoincidenceChSelectAll(),
            ch,
            chSelect,
        )

    def setCoincidenceChSelectAll(self, chSelect: list):
        """Send ``chSelect`` as the complete coincidence-selection list."""
        self._ccuSet("COINCIDENCE_CH_SELECT", chSelect)

    def getCoincidenceChSelect(self, ch):
        """Return the coincidence selection of entry ``ch``."""
        return self.getCoincidenceChSelectAll()[ch]

    def getCoincidenceChSelectAll(self):
        """Query the device and return the coincidence-selection payload."""
        return self._ccuGet("COINCIDENCE_CH_SELECT")

    # ------------------------------------------------------------------
    # Update timer, trigger selection and start
    # ------------------------------------------------------------------

    def setUpdateTimerPeriod(self, period):
        """Send ``period`` as a one-element UPDATE_TIMER_PERIOD payload."""
        self._ccuSet("UPDATE_TIMER_PERIOD", [period])

    def getUpdateTimerPeriod(self):
        """Return the first element of the UPDATE_TIMER_PERIOD reply."""
        return self._ccuGet("UPDATE_TIMER_PERIOD")[0]

    def setTrigSelect(self, trigType):
        """Send ``trigType`` as a one-element UPDATE_TRIG_SELECT payload."""
        self._ccuSet("UPDATE_TRIG_SELECT", [trigType])

    def getTrigSelect(self):
        """Return the UPDATE_TRIG_SELECT reply payload as received.

        Unlike ``getUpdateTimerPeriod`` the payload is not unwrapped.
        """
        return self._ccuGet("UPDATE_TRIG_SELECT")

    def setStart(self, enable):
        """Send ``enable`` as a one-element START payload."""
        self._ccuSet("START", [enable])

    def getStart(self):
        """Return the START reply payload as received (not unwrapped)."""
        return self._ccuGet("START")

    # ------------------------------------------------------------------
    # Counter mux
    # ------------------------------------------------------------------

    def setCounterMux(self, ch, val):
        """Set the counter mux of channel ``ch`` (read-modify-write).

        Args:
            ch: Index of the entry to change.
            val: 0 -> use internal pulse window,
                1 -> use external pulse window.
        """
        self._ccuUpdateChannel(
            "COUNTER_MUX", self.getCounterMuxAll(), ch, val
        )

    def setCounterMuxAll(self, val: list):
        """Send ``val`` as the complete counter-mux list."""
        self._ccuSet("COUNTER_MUX", val)

    def getCounterMux(self, ch):
        """Return the counter-mux value of channel ``ch``."""
        return self.getCounterMuxAll()[ch]

    def getCounterMuxAll(self):
        """Query the device and return the counter-mux reply payload."""
        return self._ccuGet("COUNTER_MUX")

    # ------------------------------------------------------------------
    # Input channel mux
    # ------------------------------------------------------------------

    def setInputChMux(self, ch, val):
        """Set the input-channel mux of channel ``ch`` (read-modify-write)."""
        self._ccuUpdateChannel(
            "INPUT_CH_MUX", self.getInputChMuxAll(), ch, val
        )

    def setInputChMuxAll(self, val: list):
        """Send ``val`` as the complete input-channel-mux list."""
        self._ccuSet("INPUT_CH_MUX", val)

    def getInputChMux(self, ch):
        """Return the input-channel-mux value of channel ``ch``."""
        return self.getInputChMuxAll()[ch]

    def getInputChMuxAll(self):
        """Query the device and return the input-channel-mux reply payload."""
        return self._ccuGet("INPUT_CH_MUX")