# Source: dts-py-app/SDT_Device/PulseGenerator.py
# (238 lines, 6.9 KiB, Python; last change 2024-04-24 02:24:33 +00:00)
from SDT_Device.Protocol import Protocol
import threading
import csv
class PulseGenerator(Protocol):
    """Command wrapper for the pulse-generator ("PG") device.

    Every setter builds a ``(class, command, payload)`` tuple and hands it to
    ``self.send``; every getter sends a ``(class, command)`` tuple and then
    blocks in ``_read`` until ``_classGetCallback`` stores the reply payload.
    ``send``, ``CLASS``, ``DEVICE_ID``, ``setDeviceId`` and ``setDeviceSerial``
    are inherited from ``Protocol`` (not visible in this file).

    Time values given in "s", "ms" or "us" are scaled to nanoseconds and then
    divided by 5 to obtain the raw value sent to the device; any unit string
    other than "raw"/"s"/"ms"/"us" is treated as already being nanoseconds.
    Unit strings are matched case-insensitively.
    """

    MAX_CHANNEL = 12
    # Upper bound applied to the size passed to setChannelDataSize().
    MAX_CHANNEL_DATA_SIZE = 2048
    # Nanoseconds per unit for the unit strings that need scaling.
    _UNIT_TO_NS = {"s": int(1e9), "ms": int(1e6), "us": int(1e3)}
    CMD = {
        "SESSION_COMPLETE": 0x00,
        "DEVICE_RESET": 0x10,
        "DEVICE_START": 0x11,
        "SESSION_PERIOD": 0x12,
        "BASE_CHANNEL_ENABLE": 0x20,
        "SESSION_COUNT": 0x30,
        "BASE_CHANNEL_DATA_SIZE": 0x40,
        "BASE_CHANNEL_SESSION_POSITION": 0x50,
        "BASE_CHANNEL_OUTPUT_DELAY": 0x62,
        "BASE_CHANNEL_DATA": 0x70,
        "HEART_BIT": 0xF000,
    }

    def __init__(self, ip, port):
        """Initialise the base protocol and register this class's callbacks.

        Args:
            ip: Passed through to ``Protocol.__init__``.
            port: Passed through to ``Protocol.__init__``.
        """
        super().__init__(ip, port)
        self.setDeviceId(self.DEVICE_ID["PG"])
        self.setDeviceSerial(0)
        self.heartbitCount = 0
        # NOTE(review): these hooks are presumably invoked by Protocol when a
        # matching packet arrives -- confirm against Protocol.
        self.classGetCallback = self._classGetCallback
        self.classHeartBitCallback = self._heartBitCallback
        self.classDataCallback = self._dataCallback
        self._getEvent = threading.Event()
        self._dataEvent = threading.Event()
        self._readBuffer = []

    def _read(self):
        """Block until a GET reply has been stored, then return its payload."""
        self._getEvent.wait()
        self._getEvent.clear()
        return self._readBuffer

    def _classGetCallback(self, command, payload):
        """Store the reply payload and wake the thread blocked in ``_read``."""
        self._readBuffer = payload
        self._getEvent.set()

    def _heartBitCallback(self):
        """Count one more received heartbeat."""
        self.heartbitCount += 1

    def _dataCallback(self, command, payload):
        """Signal session completion when a SESSION_COMPLETE command arrives."""
        if command == self.CMD["SESSION_COMPLETE"]:
            self._dataEvent.set()

    def WaitUntilSessionIsCompleted(self, timeout=None):
        """Wait for the session-complete notification.

        Args:
            timeout: Optional maximum wait in seconds. ``None`` (the default)
                waits indefinitely, as before.

        Returns:
            True if the notification was received, False if ``timeout``
            elapsed first. The event is only cleared when it was received.
        """
        completed = self._dataEvent.wait(timeout)
        if completed:
            self._dataEvent.clear()
        return completed

    # enable = 0 applies the reset, 1 releases it.
    def setReset(self, enable):
        """Send a DEVICE_RESET SET command with ``enable`` as payload."""
        self.send((self.CLASS["SET"], self.CMD["DEVICE_RESET"], [enable]))

    def getReset(self):
        """Query DEVICE_RESET and return the first element of the reply."""
        self.send((self.CLASS["GET"], self.CMD["DEVICE_RESET"]))
        return self._read()[0]

    def setStart(self, enable):
        """Send a DEVICE_START SET command with ``enable`` as payload."""
        self.send((self.CLASS["SET"], self.CMD["DEVICE_START"], [enable]))

    def getStart(self):
        """Query DEVICE_START and return the first element of the reply."""
        self.send((self.CLASS["GET"], self.CMD["DEVICE_START"]))
        return self._read()[0]

    def setSessionPeriod(self, period, unit="raw"):
        """Set the session period.

        Args:
            period: The period, expressed in ``unit``.
            unit: "raw" sends ``period`` unchanged. "s", "ms" and "us" are
                scaled to nanoseconds; any other string is taken as
                nanoseconds. Non-raw values are clamped to at least 5 ns and
                sent as ``round(ns / 5)``. Case-insensitive.
        """
        key = unit.lower()
        if key == "raw":
            raw = period
        else:
            period_ns = period * self._UNIT_TO_NS.get(key, 1)
            raw = round(max(period_ns, 5) / 5)
        self.send((self.CLASS["SET"], self.CMD["SESSION_PERIOD"], [raw]))

    def getSessionPeriod(self, unit="raw"):
        """Read back the session period.

        Args:
            unit: "raw" returns the device value unchanged, mirroring
                ``setSessionPeriod``. Otherwise the raw value is multiplied
                by 5 (nanoseconds) and divided by the scale of "s", "ms" or
                "us"; any other string yields nanoseconds. Case-insensitive.

        Returns:
            The period in the requested unit.
        """
        self.send((self.CLASS["GET"], self.CMD["SESSION_PERIOD"]))
        period = self._read()[0]
        key = unit.lower()
        if key == "raw":
            # Raw means raw: do not apply the 5 ns scaling on the way out.
            return period
        scale = self._UNIT_TO_NS.get(key)
        if scale is not None:
            period /= scale
        return period * 5

    def setChannelEnable(self, ch, enable):
        """Send BASE_CHANNEL_ENABLE + ``ch`` with ``enable`` as payload."""
        command = self.CMD["BASE_CHANNEL_ENABLE"] + ch
        self.send((self.CLASS["SET"], command, [enable]))

    def getChannelEnable(self, ch):
        """Query BASE_CHANNEL_ENABLE + ``ch``; return the first reply element."""
        self.send((self.CLASS["GET"], self.CMD["BASE_CHANNEL_ENABLE"] + ch))
        return self._read()[0]

    def setSessionCount(self, count):
        """Send a SESSION_COUNT SET command with ``count`` as payload."""
        self.send((self.CLASS["SET"], self.CMD["SESSION_COUNT"], [count]))

    def getSessionCount(self):
        """Query SESSION_COUNT and return the first element of the reply."""
        self.send((self.CLASS["GET"], self.CMD["SESSION_COUNT"]))
        return self._read()[0]

    def setChannelDataSize(self, ch, size):
        """Set the data size of channel ``ch``, capped at MAX_CHANNEL_DATA_SIZE."""
        size = min(size, self.MAX_CHANNEL_DATA_SIZE)
        command = self.CMD["BASE_CHANNEL_DATA_SIZE"] + ch
        self.send((self.CLASS["SET"], command, [size]))

    def getChannelDataSize(self, ch):
        """Query BASE_CHANNEL_DATA_SIZE + ``ch``; return the first reply element."""
        self.send((self.CLASS["GET"], self.CMD["BASE_CHANNEL_DATA_SIZE"] + ch))
        return self._read()[0]

    def setChannelSessionPosition(self, ch, position):
        """Send BASE_CHANNEL_SESSION_POSITION + ``ch`` with ``position``."""
        command = self.CMD["BASE_CHANNEL_SESSION_POSITION"] + ch
        self.send((self.CLASS["SET"], command, [position]))

    def getChannelSessionPosition(self, ch):
        """Query BASE_CHANNEL_SESSION_POSITION + ``ch``; return reply element 0."""
        command = self.CMD["BASE_CHANNEL_SESSION_POSITION"] + ch
        self.send((self.CLASS["GET"], command))
        return self._read()[0]

    def setChannelOutputDelay(self, ch, delayTime):
        """Send BASE_CHANNEL_OUTPUT_DELAY + ``ch`` with ``delayTime``."""
        command = self.CMD["BASE_CHANNEL_OUTPUT_DELAY"] + ch
        self.send((self.CLASS["SET"], command, [delayTime]))

    def getChannelOutputDelay(self, ch):
        """Query BASE_CHANNEL_OUTPUT_DELAY + ``ch`` and return the reply payload.

        NOTE(review): unlike the other scalar getters this returns the whole
        payload rather than its first element; kept as-is so existing callers
        are not broken -- confirm whether ``[0]`` was intended.
        """
        self.send((self.CLASS["GET"], self.CMD["BASE_CHANNEL_OUTPUT_DELAY"] + ch))
        return self._read()

    def setChannelData(self, ch, data, unit="raw"):
        """Send the data values for channel ``ch``.

        Args:
            ch: Channel offset added to BASE_CHANNEL_DATA.
            data: Iterable of values expressed in ``unit``.
            unit: "raw" sends the values unchanged. "s", "ms" and "us" are
                scaled to nanoseconds; any other string is taken as
                nanoseconds. Non-raw values are clamped to at least 5 ns and
                sent as ``round(ns / 5) - 1``. Case-insensitive.
        """
        key = unit.lower()
        if key == "raw":
            rawData = list(data)
        else:
            scale = self._UNIT_TO_NS.get(key, 1)
            rawData = [round(max(value * scale, 5) / 5) - 1 for value in data]
        self.send((self.CLASS["SET"], self.CMD["BASE_CHANNEL_DATA"] + ch, rawData))

    def getChannelData(self, ch, unit="raw"):
        """Read back the data values of channel ``ch``.

        Args:
            ch: Channel offset added to BASE_CHANNEL_DATA.
            unit: "raw" returns the device values unchanged. Otherwise each
                value is converted as ``(raw + 1) * 5`` nanoseconds and
                divided by the scale of "s", "ms" or "us"; any other string
                yields nanoseconds. Case-insensitive.

        Returns:
            A list of values, each rounded to 9 decimal places.
        """
        self.send((self.CLASS["GET"], self.CMD["BASE_CHANNEL_DATA"] + ch))
        rawData = self._read()
        key = unit.lower()
        if key == "raw":
            return [round(value, 9) for value in rawData]
        factor = 1
        scale = self._UNIT_TO_NS.get(key)
        if scale is not None:
            factor /= scale
        factor *= 5
        # The setter stores round(ns / 5) - 1, hence the +1 on the way back.
        return [round((value + 1) * factor, 9) for value in rawData]

    def readPulseDataFromCSV(self, ch, path):
        """Read column ``ch`` of a CSV file as a list of ints.

        The first row is skipped as a header. Rows that do not have column
        ``ch`` and cells that are empty or whitespace-only are skipped. An
        empty file yields an empty list.

        Args:
            ch: Zero-based column index to read.
            path: Path of the CSV file.

        Returns:
            The integer values found in the column, in file order.

        Raises:
            ValueError: If a non-blank cell cannot be parsed by ``int``.
        """
        values = []
        with open(path, newline="") as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None)  # skip the header; tolerate an empty file
            for row in reader:
                try:
                    cell = row[ch]
                except IndexError:
                    continue  # short or blank row: no such column
                if cell.strip():
                    values.append(int(cell))
        return values