Commit message

This commit is contained in:
sdt.tutorial 2024-04-24 11:24:33 +09:00
parent 35c5f9aa8e
commit 4c96bf5c7e
27 changed files with 1426 additions and 0 deletions

15
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,15 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
}
]
}

87
MainWindow.py Normal file
View File

@ -0,0 +1,87 @@
# Form implementation generated from reading ui file '.\MainWindow.ui'
#
# Created by: PyQt6 UI code generator 6.6.1
#
# WARNING: Any manual changes made to this file will be lost when pyuic6 is
# run again. Do not edit this file unless you know what you are doing.
from PyQt6 import QtCore, QtGui, QtWidgets
class Ui_MainWindow(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName("MainWindow")
MainWindow.resize(1251, 650)
self.centralwidget = QtWidgets.QWidget(parent=MainWindow)
self.centralwidget.setObjectName("centralwidget")
self.graphWidget1 = PlotWidget(parent=self.centralwidget)
self.graphWidget1.setGeometry(QtCore.QRect(150, 40, 1061, 561))
self.graphWidget1.setObjectName("graphWidget1")
self.pb_Start1 = QtWidgets.QPushButton(parent=self.centralwidget)
self.pb_Start1.setGeometry(QtCore.QRect(20, 440, 93, 28))
self.pb_Start1.setObjectName("pb_Start1")
self.pb_Stop1 = QtWidgets.QPushButton(parent=self.centralwidget)
self.pb_Stop1.setGeometry(QtCore.QRect(20, 470, 93, 28))
self.pb_Stop1.setObjectName("pb_Stop1")
self.lineEdit_EnableDelayValue = QtWidgets.QLineEdit(parent=self.centralwidget)
self.lineEdit_EnableDelayValue.setGeometry(QtCore.QRect(20, 100, 113, 21))
self.lineEdit_EnableDelayValue.setObjectName("lineEdit_EnableDelayValue")
self.label = QtWidgets.QLabel(parent=self.centralwidget)
self.label.setGeometry(QtCore.QRect(20, 80, 101, 16))
self.label.setObjectName("label")
self.pushButton_EnableDelaySet = QtWidgets.QPushButton(parent=self.centralwidget)
self.pushButton_EnableDelaySet.setGeometry(QtCore.QRect(20, 130, 93, 28))
self.pushButton_EnableDelaySet.setObjectName("pushButton_EnableDelaySet")
self.pushButton_AverageStepSet = QtWidgets.QPushButton(parent=self.centralwidget)
self.pushButton_AverageStepSet.setGeometry(QtCore.QRect(20, 230, 93, 28))
self.pushButton_AverageStepSet.setObjectName("pushButton_AverageStepSet")
self.label_2 = QtWidgets.QLabel(parent=self.centralwidget)
self.label_2.setGeometry(QtCore.QRect(20, 180, 101, 16))
self.label_2.setObjectName("label_2")
self.pushButton_SampleRangeSet = QtWidgets.QPushButton(parent=self.centralwidget)
self.pushButton_SampleRangeSet.setGeometry(QtCore.QRect(20, 330, 93, 28))
self.pushButton_SampleRangeSet.setObjectName("pushButton_SampleRangeSet")
self.lineEdit_SampleRangeValue = QtWidgets.QLineEdit(parent=self.centralwidget)
self.lineEdit_SampleRangeValue.setGeometry(QtCore.QRect(20, 300, 113, 21))
self.lineEdit_SampleRangeValue.setObjectName("lineEdit_SampleRangeValue")
self.label_3 = QtWidgets.QLabel(parent=self.centralwidget)
self.label_3.setGeometry(QtCore.QRect(20, 280, 101, 16))
self.label_3.setObjectName("label_3")
self.comboBox_AverageStepVal = QtWidgets.QComboBox(parent=self.centralwidget)
self.comboBox_AverageStepVal.setGeometry(QtCore.QRect(20, 200, 94, 22))
self.comboBox_AverageStepVal.setEditable(False)
self.comboBox_AverageStepVal.setObjectName("comboBox_AverageStepVal")
self.lineEdit_TriggerCount = QtWidgets.QLineEdit(parent=self.centralwidget)
self.lineEdit_TriggerCount.setEnabled(True)
self.lineEdit_TriggerCount.setGeometry(QtCore.QRect(20, 390, 113, 21))
self.lineEdit_TriggerCount.setReadOnly(True)
self.lineEdit_TriggerCount.setObjectName("lineEdit_TriggerCount")
self.label_4 = QtWidgets.QLabel(parent=self.centralwidget)
self.label_4.setGeometry(QtCore.QRect(20, 370, 101, 16))
self.label_4.setObjectName("label_4")
MainWindow.setCentralWidget(self.centralwidget)
self.menubar = QtWidgets.QMenuBar(parent=MainWindow)
self.menubar.setGeometry(QtCore.QRect(0, 0, 1251, 26))
self.menubar.setObjectName("menubar")
MainWindow.setMenuBar(self.menubar)
self.statusbar = QtWidgets.QStatusBar(parent=MainWindow)
self.statusbar.setObjectName("statusbar")
MainWindow.setStatusBar(self.statusbar)
self.retranslateUi(MainWindow)
QtCore.QMetaObject.connectSlotsByName(MainWindow)
def retranslateUi(self, MainWindow):
_translate = QtCore.QCoreApplication.translate
MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow"))
self.pb_Start1.setText(_translate("MainWindow", "Start"))
self.pb_Stop1.setText(_translate("MainWindow", "Stop"))
self.label.setText(_translate("MainWindow", "Enable Delay "))
self.pushButton_EnableDelaySet.setText(_translate("MainWindow", "set"))
self.pushButton_AverageStepSet.setText(_translate("MainWindow", "set"))
self.label_2.setText(_translate("MainWindow", "Average"))
self.pushButton_SampleRangeSet.setText(_translate("MainWindow", "set"))
self.label_3.setText(_translate("MainWindow", "Sample Range"))
self.label_4.setText(_translate("MainWindow", "Trigger Count"))
from pyqtgraph import PlotWidget

216
MainWindow.ui Normal file
View File

@ -0,0 +1,216 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>MainWindow</class>
<widget class="QMainWindow" name="MainWindow">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>1251</width>
<height>650</height>
</rect>
</property>
<property name="windowTitle">
<string>MainWindow</string>
</property>
<widget class="QWidget" name="centralwidget">
<widget class="PlotWidget" name="graphWidget1" native="true">
<property name="geometry">
<rect>
<x>150</x>
<y>40</y>
<width>1061</width>
<height>561</height>
</rect>
</property>
</widget>
<widget class="QPushButton" name="pb_Start1">
<property name="geometry">
<rect>
<x>20</x>
<y>440</y>
<width>93</width>
<height>28</height>
</rect>
</property>
<property name="text">
<string>Start</string>
</property>
</widget>
<widget class="QPushButton" name="pb_Stop1">
<property name="geometry">
<rect>
<x>20</x>
<y>470</y>
<width>93</width>
<height>28</height>
</rect>
</property>
<property name="text">
<string>Stop</string>
</property>
</widget>
<widget class="QLineEdit" name="lineEdit_EnableDelayValue">
<property name="geometry">
<rect>
<x>20</x>
<y>100</y>
<width>113</width>
<height>21</height>
</rect>
</property>
</widget>
<widget class="QLabel" name="label">
<property name="geometry">
<rect>
<x>20</x>
<y>80</y>
<width>101</width>
<height>16</height>
</rect>
</property>
<property name="text">
<string>Enable Delay </string>
</property>
</widget>
<widget class="QPushButton" name="pushButton_EnableDelaySet">
<property name="geometry">
<rect>
<x>20</x>
<y>130</y>
<width>93</width>
<height>28</height>
</rect>
</property>
<property name="text">
<string>set</string>
</property>
</widget>
<widget class="QPushButton" name="pushButton_AverageStepSet">
<property name="geometry">
<rect>
<x>20</x>
<y>230</y>
<width>93</width>
<height>28</height>
</rect>
</property>
<property name="text">
<string>set</string>
</property>
</widget>
<widget class="QLabel" name="label_2">
<property name="geometry">
<rect>
<x>20</x>
<y>180</y>
<width>101</width>
<height>16</height>
</rect>
</property>
<property name="text">
<string>Average</string>
</property>
</widget>
<widget class="QPushButton" name="pushButton_SampleRangeSet">
<property name="geometry">
<rect>
<x>20</x>
<y>330</y>
<width>93</width>
<height>28</height>
</rect>
</property>
<property name="text">
<string>set</string>
</property>
</widget>
<widget class="QLineEdit" name="lineEdit_SampleRangeValue">
<property name="geometry">
<rect>
<x>20</x>
<y>300</y>
<width>113</width>
<height>21</height>
</rect>
</property>
</widget>
<widget class="QLabel" name="label_3">
<property name="geometry">
<rect>
<x>20</x>
<y>280</y>
<width>101</width>
<height>16</height>
</rect>
</property>
<property name="text">
<string>Sample Range</string>
</property>
</widget>
<widget class="QComboBox" name="comboBox_AverageStepVal">
<property name="geometry">
<rect>
<x>20</x>
<y>200</y>
<width>94</width>
<height>22</height>
</rect>
</property>
<property name="editable">
<bool>false</bool>
</property>
</widget>
<widget class="QLineEdit" name="lineEdit_TriggerCount">
<property name="enabled">
<bool>true</bool>
</property>
<property name="geometry">
<rect>
<x>20</x>
<y>390</y>
<width>113</width>
<height>21</height>
</rect>
</property>
<property name="readOnly">
<bool>true</bool>
</property>
</widget>
<widget class="QLabel" name="label_4">
<property name="geometry">
<rect>
<x>20</x>
<y>370</y>
<width>101</width>
<height>16</height>
</rect>
</property>
<property name="text">
<string>Trigger Count</string>
</property>
</widget>
</widget>
<widget class="QMenuBar" name="menubar">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>1251</width>
<height>26</height>
</rect>
</property>
</widget>
<widget class="QStatusBar" name="statusbar"/>
</widget>
<customwidgets>
<customwidget>
<class>PlotWidget</class>
<extends>QWidget</extends>
<header>pyqtgraph</header>
<container>1</container>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

185
SDT_Device/CCU.py Normal file
View File

@ -0,0 +1,185 @@
from SDT_Device.Protocol import Protocol
import threading
import time
class CCU(Protocol):
MAX_CHANNEL = 20
CMD = {
"PULSE_COUNT": 0x00,
"COINCIDENCE_COUNT": 0x01,
"INPUT_DELAY": 0x10,
"PULSE_WIDTH": 0x20,
"COINCIDENCE_CH_SELECT": 0x30,
"UPDATE_TIMER_PERIOD": 0x40,
"UPDATE_TRIG_SELECT": 0x50,
"START": 0x60,
"COUNTER_MUX": 0x70,
"INPUT_CH_MUX": 0x80,
"HEART_BIT": 0xF000,
}
def __init__(self, ip, port):
super().__init__(ip, port)
self.setDeviceId(self.DEVICE_ID["CCU"])
self.setDeviceSerial(0)
self.classGetCallback = self._classGetCallback
self.classDataCallback = self._classDataCallback
self._getEvent = threading.Event()
self._dataEvent = threading.Event()
self._readBuffer = []
self._pulseCount = [0 for i in range(20)]
self._coincidenceCount = [0 for i in range(20)]
self.updateCount = 0
self.updateCoinCount = 0
self.debugCount = 0
def _classGetCallback(self, command, payload):
self._readBuffer = payload
self._getEvent.set()
def _read(self):
self._getEvent.wait()
self._getEvent.clear()
return self._readBuffer
def _classDataCallback(self, command, payload):
if command == self.CMD["PULSE_COUNT"]:
self._pulseCount = payload
self.updateCount += 1
elif command == self.CMD["COINCIDENCE_COUNT"]:
self._coincidenceCount = payload
self.updateCoinCount += 1
self._dataEvent.set()
def readCountAllNonBlocking(self):
return self._pulseCount, self._coincidenceCount
def readCountAllBlocking(self):
self._dataEvent.wait()
self._dataEvent.clear()
return self._pulseCount, self._coincidenceCount
def setInputDelay(self, ch, delay):
delayList = self.getInputDelayAll()
delayList[ch] = delay
args = (self.CLASS["SET"], self.CMD["INPUT_DELAY"], delayList)
self.send(args)
def setInputDelayAll(self, delay: list):
args = (self.CLASS["SET"], self.CMD["INPUT_DELAY"], delay)
self.send(args)
def getInputDelay(self, ch):
return self.getInputDelayAll()[ch]
def getInputDelayAll(self):
args = (self.CLASS["GET"], self.CMD["INPUT_DELAY"])
self.send(args)
return self._read()
def setPulseWidth(self, ch, width):
widthList = self.getPulseWidthAll()
widthList[ch] = width
args = (self.CLASS["SET"], self.CMD["PULSE_WIDTH"], widthList)
self.send(args)
def setPulseWidthAll(self, width: list):
args = (self.CLASS["SET"], self.CMD["PULSE_WIDTH"], width)
self.send(args)
def getPulseWidth(self, ch):
return self.getPulseWidthAll()[ch]
def getPulseWidthAll(self):
args = (self.CLASS["GET"], self.CMD["PULSE_WIDTH"])
self.send(args)
return self._read()
def setCoincidenceChSelect(self, ch, chSelect):
coincidenceList = self.getCoincidenceChSelectAll()
coincidenceList[ch] = chSelect
args = (self.CLASS["SET"], self.CMD["COINCIDENCE_CH_SELECT"], coincidenceList)
self.send(args)
def setCoincidenceChSelectAll(self, chSelect: list):
args = (self.CLASS["SET"], self.CMD["COINCIDENCE_CH_SELECT"], chSelect)
self.send(args)
def getCoincidenceChSelect(self, ch):
return self.getCoincidenceChSelectAll()[ch]
def getCoincidenceChSelectAll(self):
args = (self.CLASS["GET"], self.CMD["COINCIDENCE_CH_SELECT"])
self.send(args)
return self._read()
def setUpdateTimerPeriod(self, period):
args = (self.CLASS["SET"], self.CMD["UPDATE_TIMER_PERIOD"], [period])
self.send(args)
def getUpdateTimerPeriod(self):
args = (self.CLASS["GET"], self.CMD["UPDATE_TIMER_PERIOD"])
self.send(args)
return self._read()[0]
def setTrigSelect(self, trigType):
args = (self.CLASS["SET"], self.CMD["UPDATE_TRIG_SELECT"], [trigType])
self.send(args)
def getTrigSelect(self):
args = (self.CLASS["GET"], self.CMD["UPDATE_TRIG_SELECT"])
self.send(args)
return self._read()
def setStart(self, enable):
args = (self.CLASS["SET"], self.CMD["START"], [enable])
self.send(args)
def getStart(self):
args = (self.CLASS["GET"], self.CMD["START"])
self.send(args)
return self._read()
# val = 0 -> use internal pulse window
# val = 1 -> use external pulse window
def setCounterMux(self, ch, val):
valList = self.getCounterMuxAll()
valList[ch] = val
args = (self.CLASS["SET"], self.CMD["COUNTER_MUX"], valList)
self.send(args)
def setCounterMuxAll(self, val: list):
args = (self.CLASS["SET"], self.CMD["COUNTER_MUX"], val)
self.send(args)
def getCounterMux(self, ch):
return self.getCounterMuxAll()[ch]
def getCounterMuxAll(self):
args = (self.CLASS["GET"], self.CMD["COUNTER_MUX"])
self.send(args)
return self._read()
def setInputChMux(self, ch, val):
valList = self.getInputChMuxAll()
valList[ch] = val
args = (self.CLASS["SET"], self.CMD["INPUT_CH_MUX"], valList)
self.send(args)
def setInputChMuxAll(self, val: list):
args = (self.CLASS["SET"], self.CMD["INPUT_CH_MUX"], val)
self.send(args)
def getInputChMux(self, ch):
return self.getInputChMuxAll()[ch]
def getInputChMuxAll(self):
args = (self.CLASS["GET"], self.CMD["INPUT_CH_MUX"])
self.send(args)
return self._read()

148
SDT_Device/DTS.py Normal file
View File

@ -0,0 +1,148 @@
from SDT_Device.Protocol import Protocol
import threading
import copy
import timeit
class DTS(Protocol):
MAX_CHANNEL = 4
CMD = {
"ADC_CH_DATA_BASE": 0x00,
"SYSTEM_RESET": 0x10,
"DEBUG_DATA": 0x20,
"AVG_ENABLE_DELAY": 0x30,
"AVG_SAMPLE_RANGE": 0x31,
"AVG_STEP": 0x32,
"AVG_START": 0x33,
"AVG_TIMER_TICK": 0x34,
"HEART_BIT": 0xF000,
}
def __init__(self, ip, port):
super().__init__(ip, port)
self.setDeviceId(self.DEVICE_ID["DTS"])
self.setDeviceSerial(0)
self.setMaxLen(4 * 65536 + 20)
self.classGetCallback = self._classGetCallback
self.classDataCallback = self._classDataCallback
self.classHeartBitCallback = self._classHeartBitCallback
self._getEvnet = threading.Event()
self._dataEvnet = threading.Event()
self._readBuffer = []
self.adcDataCh0 = []
self.adcDataCh1 = []
self.adcDataCh2 = []
self.adcDataCh3 = []
self.hbCount = 0
self.dataCount = [0 for _ in range(4)]
def _classHeartBitCallback(self):
self.hbCount += 1
def _classGetCallback(self, command, payload):
self._readBuffer = payload
self._getEvnet.set()
def _read(self):
self._getEvnet.wait()
self._getEvnet.clear()
return self._readBuffer
def _classDataCallback(self, command, payload):
if command == (self.CMD["ADC_CH_DATA_BASE"] + 0):
self.adcDataCh0 = payload
self.dataCount[0] += 1
elif command == (self.CMD["ADC_CH_DATA_BASE"] + 1):
self.adcDataCh1 = payload
self.dataCount[1] += 1
elif command == (self.CMD["ADC_CH_DATA_BASE"] + 2):
self.adcDataCh2 = payload
self.dataCount[2] += 1
elif command == (self.CMD["ADC_CH_DATA_BASE"] + 3):
self.adcDataCh3 = payload
self.dataCount[3] += 1
# self._dataEvnet.set()
def ReadAdcData(self, ch):
# self._waitAdcData()
ret = []
if ch == 0:
ret = self.adcDataCh0
elif ch == 1:
ret = self.adcDataCh1
elif ch == 2:
ret = self.adcDataCh2
elif ch == 3:
ret = self.adcDataCh3
return ret
def _waitAdcData(self):
self._dataEvnet.wait()
self._dataEvnet.clear()
def setSystemReset(self): # 1번
args = (self.CLASS["SET"], self.CMD["SYSTEM_RESET"], [1])
self.send(args)
def getSystemReset(self):
args = (self.CLASS["GET"], self.CMD["SYSTEM_RESET"])
self.send(args)
return self._read()[0]
def getDebugData(self):
args = (self.CLASS["GET"], self.CMD["DEBUG_DATA"])
self.send(args)
return self._read()
def setAvgEnableDelay(self, ch, val): # 3번
args = (self.CLASS["SET"], self.CMD["AVG_ENABLE_DELAY"], [ch, val])
self.send(args)
def getAvgEnableDelay(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_ENABLE_DELAY"], [ch])
self.send(args)
return self._read()[1]
def setAvgSampleRange(self, ch, val): # 4번
args = (self.CLASS["SET"], self.CMD["AVG_SAMPLE_RANGE"], [ch, val])
self.send(args)
def getAvgSampleRange(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_SAMPLE_RANGE"], [ch])
self.send(args)
return self._read()[1]
def setAvgStep(self, ch, val): # 5번
args = (self.CLASS["SET"], self.CMD["AVG_STEP"], [ch, val])
self.send(args)
def getAvgStep(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_STEP"], [ch])
self.send(args)
return self._read()[1]
def setAvgStart(self, ch, val): # 2번 변수 바꿔서 6번
args = (self.CLASS["SET"], self.CMD["AVG_START"], [ch, val])
self.send(args)
def getAvgStart(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_START"], [ch])
self.send(args)
return self._read()[1]
def setTimerTick(self, ch, val): # 7번
args = (self.CLASS["SET"], self.CMD["AVG_TIMER_TICK"], [ch, val])
self.send(args)
def getTimerTick(self, ch):
args = (self.CLASS["GET"], self.CMD["AVG_TIMER_TICK"], [ch])
self.send(args)
return self._read()[1]

266
SDT_Device/Protocol.py Normal file
View File

@ -0,0 +1,266 @@
import struct
import socket
import threading
from enum import Enum
from queue import Queue
class State(Enum):
STX = 0
LEN = 1
DEVICE_ID = 2
DEVICE_SERIAL = 3
CLASS = 4
COMMAND = 5
DATA = 6
ETX = 7
class Packet:
def __init__(self):
self.stx = 0
self.len = 0
self.deviceId = 0
self.deviceSerial = 0
self.classType = 0
self.command = 0
self.data = []
self.etx = 0
class Protocol:
DEVICE_ID = {"CCU": 0x1, "TTMU": 0x2, "PG": 0x3, "QC": 0x4, "DTS": 0x5}
CLASS = {
"DATA": 0x44,
"GET": 0x47,
"SET": 0x53,
"EVENT": 0x45,
"HEART_BIT": 0x48,
}
HEADER_LEN = 20
STX = 0x2
ETX = 0x3
def __init__(self, ip, port):
self.state = State.STX
self.dataIndex = 0
self.deviceId = 0
self.deviceSerial = 0
self.packet = Packet()
self.maxLen = 0
self.recvCount = 0
self.classGetCallback = None
self.classHeartBitCallback = None
self.classDataCallback = None
self.dataQueue = Queue()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((ip, port))
recv_thread = threading.Thread(target=self._recvThread)
recv_thread.daemon = True
recv_thread.start()
parsing_thread = threading.Thread(target=self._parsingThread)
parsing_thread.daemon = True
parsing_thread.start()
def setMaxLen(self, len):
self.maxLen = len
def setDeviceId(self, deviceId):
self.deviceId = deviceId
def getDeviceId(self):
return self.deviceId
def setDeviceSerial(self, deviceSerial):
self.deviceSerial = deviceSerial
def getDeviceSerial(self):
return self.deviceSerial
def send(self, arg):
print("Build: ", self._buildPacket(*arg))
# print("Build: ", int.from_bytes(self._buildPacket(*arg), byteorder='little'))
self.sock.sendall(self._buildPacket(*arg))
def _buildPacket(self, classType, cmd, data=None):
length = self.HEADER_LEN
print(f"Build Packet: {classType} / {cmd}")
payload = []
if data is not None:
payload = data
length = self.HEADER_LEN + 4 * len(payload)
string_format = f"<{7+len(payload)}I"
return struct.pack(
string_format,
self.STX,
length,
self.deviceId,
self.deviceSerial,
classType,
cmd,
*payload,
self.ETX,
)
def parsing(self, word):
ret = False
# print("[TEST] Parsing")
switch_dict = {
State.STX: self.case_stx,
State.LEN: self.case_len,
State.DEVICE_ID: self.case_producId,
State.DEVICE_SERIAL: self.case_productSerial,
State.CLASS: self.case_class,
State.COMMAND: self.case_command,
State.DATA: self.case_data,
State.ETX: self.case_etx,
}
print(f"Parsing {self.state} / {word}")
if self.state in switch_dict:
ret = switch_dict[self.state](word)
else:
self.case_default()
return ret
def case_stx(self, word):
if word == self.STX:
self.dataIndex = 0
self.packet = Packet()
self.packet.stx = word
self.state = State.LEN
return False
def case_len(self, word):
self.packet.len = word
if 20 <= self.packet.len <= self.maxLen:
self.state = State.DEVICE_ID
else:
self.state = State.STX
return False
def case_producId(self, word):
self.packet.deviceId = word
if self.packet.deviceId == self.deviceId:
self.state = State.DEVICE_SERIAL
else:
self.state = State.STX
return False
def case_productSerial(self, word):
self.packet.deviceSerial = word
if self.packet.deviceSerial == self.deviceSerial:
self.state = State.CLASS
else:
self.state = State.STX
return False
def case_class(self, word):
self.packet.classType = word
if self.packet.classType in self.CLASS.values():
self.state = State.COMMAND
else:
self.state = State.STX
return False
def case_command(self, word):
self.packet.command = word
if (self.packet.len - self.HEADER_LEN) == 0:
self.state = State.ETX
elif (self.packet.len - self.HEADER_LEN) > self.maxLen:
self.state = State.STX
else:
self.state = State.DATA
return False
def case_data(self, word):
self.packet.data.append(word)
if (len(self.packet.data) * 4) >= (self.packet.len - self.HEADER_LEN):
self.state = State.ETX
return False
def case_etx(self, word):
ret = False
self.packet.etx = word
self.state = State.STX
if self.packet.etx == self.ETX:
ret = True
return ret
def case_default(self):
self.state = State.STX
def _recvThread(self):
while True:
recvData = self.sock.recv(16384)
print(f"[REcived] {recvData}")
self.recvCount += len(recvData)
self.dataQueue.put(recvData)
def _parsingThread(self):
self.func1()
def func1(self):
oldArray = bytearray()
while True:
newArray = self.dataQueue.get()
oldArray = oldArray + newArray
endIdx = len(oldArray) - len(oldArray) % 4
dataArray = oldArray[:endIdx]
oldArray = oldArray[endIdx:]
for i in range(0, endIdx, 4):
int_value = int.from_bytes(dataArray[i : i + 4], byteorder="little")
print(f"[IntValue] {int_value}")
if self.parsing(int_value) == True:
self.commandProc()
def func2(self):
oldArray = bytearray()
while True:
newArray = self.dataQueue.get()
if len(newArray) % 4 != 0:
oldArray = oldArray + newArray
return 0
else:
oldArray = newArray
string_format = f"<{int(len(oldArray)/4)}I"
intArray = struct.unpack(string_format, oldArray)
for i in range(len(intArray)):
if self.parsing(intArray[i]) == True:
self.commandProc()
def commandProc(self):
classType = self.packet.classType
command = self.packet.command
payload = self.packet.data
if classType == self.CLASS["GET"]:
print("GET")
if self.classGetCallback != None:
self.classGetCallback(command, payload)
elif classType == self.CLASS["HEART_BIT"]:
print("HearBeat")
self._setHeartBit()
if self.classHeartBitCallback != None:
self.classHeartBitCallback()
elif classType == self.CLASS["DATA"]:
print("Data")
if self.classDataCallback != None:
self.classDataCallback(command, payload)
def _setHeartBit(self):
args = (self.CLASS["HEART_BIT"], self.CMD["HEART_BIT"])
print(self.CLASS["HEART_BIT"], "/" , self.CMD["HEART_BIT"])
self.send(args)

View File

@ -0,0 +1,237 @@
from SDT_Device.Protocol import Protocol
import threading
import csv
class PulseGenerator(Protocol):
MAX_CHANNEL = 12
CMD = {
"SESSION_COMPLETE": 0x00,
"DEVICE_RESET": 0x10,
"DEVICE_START": 0x11,
"SESSION_PERIOD": 0x12,
"BASE_CHANNEL_ENABLE": 0x20,
"SESSION_COUNT": 0x30,
"BASE_CHANNEL_DATA_SIZE": 0x40,
"BASE_CHANNEL_SESSION_POSITION": 0x50,
"BASE_CHANNEL_OUTPUT_DELAY": 0x62,
"BASE_CHANNEL_DATA": 0x70,
"HEART_BIT": 0xF000,
}
def __init__(self, ip, port):
super().__init__(ip, port)
self.setDeviceId(self.DEVICE_ID["PG"])
self.setDeviceSerial(0)
self.heartbitCount = 0
self.classGetCallback = self._classGetCallback
self.classHeartBitCallback = self._heartBitCallback
self.classDataCallback = self._dataCallback
self._getEvent = threading.Event()
self._dataEvent = threading.Event()
self._readBuffer = []
def _read(self):
self._getEvent.wait()
self._getEvent.clear()
return self._readBuffer
def _classGetCallback(self, command, payload):
self._readBuffer = payload
self._getEvent.set()
def _heartBitCallback(self):
self.heartbitCount += 1
def _dataCallback(self, command, payload):
if command == self.CMD["SESSION_COMPLETE"]:
self._dataEvent.set()
def WaitUntilSessionIsCompleted(self):
self._dataEvent.wait()
self._dataEvent.clear()
# enable = 0에서 reset 적용, 1에서 해제
def setReset(self, enable):
args = (self.CLASS["SET"], self.CMD["DEVICE_RESET"], [enable])
self.send(args)
def getReset(self):
args = (self.CLASS["GET"], self.CMD["DEVICE_RESET"])
self.send(args)
return self._read()[0]
def setStart(self, enable):
args = (self.CLASS["SET"], self.CMD["DEVICE_START"], [enable])
self.send(args)
def getStart(self):
args = (self.CLASS["GET"], self.CMD["DEVICE_START"])
self.send(args)
return self._read()[0]
def setSessionPeriod(self, period, unit="raw"):
if unit != "raw":
if unit.lower() == "s":
period *= int(1e9)
elif unit.lower() == "ms":
period *= int(1e6)
elif unit.lower() == "us":
period *= int(1e3)
if period < 5:
period = 5
_round = round(period / 5)
else:
_round = period
args = (self.CLASS["SET"], self.CMD["SESSION_PERIOD"], [_round])
self.send(args)
def getSessionPeriod(self, unit="raw"):
args = (self.CLASS["GET"], self.CMD["SESSION_PERIOD"])
self.send(args)
period = self._read()[0]
if unit.lower() == "s":
period /= int(1e9)
elif unit.lower() == "ms":
period /= int(1e6)
elif unit.lower() == "us":
period /= int(1e3)
period *= 5
return period
def setChannelEnable(self, ch, enable):
args = (self.CLASS["SET"], self.CMD["BASE_CHANNEL_ENABLE"] + ch, [enable])
self.send(args)
def getChannelEnable(self, ch):
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_ENABLE"] + ch)
self.send(args)
return self._read()[0]
def setSessionCount(self, count):
args = (self.CLASS["SET"], self.CMD["SESSION_COUNT"], [count])
self.send(args)
def getSessionCount(self):
args = (self.CLASS["GET"], self.CMD["SESSION_COUNT"])
self.send(args)
return self._read()[0]
def setChannelDataSize(self, ch, size):
if size >= 2048:
size = 2048
args = (self.CLASS["SET"], self.CMD["BASE_CHANNEL_DATA_SIZE"] + ch, [size])
self.send(args)
def getChannelDataSize(self, ch):
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_DATA_SIZE"] + ch)
self.send(args)
return self._read()[0]
def setChannelSessionPosition(self, ch, position):
args = (
self.CLASS["SET"],
self.CMD["BASE_CHANNEL_SESSION_POSITION"] + ch,
[position],
)
self.send(args)
def getChannelSessionPosition(self, ch):
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_SESSION_POSITION"] + ch)
self.send(args)
return self._read()[0]
def setChannelOutputDelay(self, ch, delayTime):
args = (
self.CLASS["SET"],
self.CMD["BASE_CHANNEL_OUTPUT_DELAY"] + ch,
[delayTime],
)
self.send(args)
def getChannelOutputDelay(self, ch):
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_OUTPUT_DELAY"] + ch)
self.send(args)
return self._read()
def setChannelData(self, ch, data, unit="raw"):
rawData = []
if len(data) > 0:
unitContant = 1
if unit != "raw":
if unit.lower() == "s":
unitContant *= int(1e9)
elif unit.lower() == "ms":
unitContant *= int(1e6)
elif unit.lower() == "us":
unitContant *= int(1e3)
for value in data:
value *= unitContant
if unit != "raw":
if value <= 5:
value = 5
_round = round(value / 5) - 1
else:
_round = value
rawData.append(_round)
args = (
self.CLASS["SET"],
self.CMD["BASE_CHANNEL_DATA"] + ch,
rawData,
)
self.send(args)
def getChannelData(self, ch, unit="raw"):
args = (self.CLASS["GET"], self.CMD["BASE_CHANNEL_DATA"] + ch)
self.send(args)
data_ns = []
rawData = self._read()
unit_const = 1
if unit.lower() == "s":
unit_const /= int(1e9)
elif unit.lower() == "ms":
unit_const /= int(1e6)
elif unit.lower() == "us":
unit_const /= int(1e3)
unit_const *= 5
if len(rawData) > 0:
for value in rawData:
if unit.lower() != "raw":
nano_sec_data = (value + 1) * unit_const
else:
nano_sec_data = value
data_ns.append(round(nano_sec_data, 9))
return data_ns
def readPulseDataFromCSV(self, ch, path):
ret = []
with open(path, newline="") as csvfile:
csvreader = csv.reader(csvfile)
next(csvreader)
for row in csvreader:
if row[ch] != "":
ret.append(int(row[ch]))
else:
continue
return ret

1
SDT_Device/__init__.py Normal file
View File

@ -0,0 +1 @@
version=1.0

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

48
client.py Normal file
View File

@ -0,0 +1,48 @@
import struct
import socket
# 서버의 호스트와 포트 설정
SERVER_HOST = '192.168.0.20'
SERVER_PORT = 5001
HEART_BIT = 0x48
HEART_BIT_CMD = 0xF000
HEADER_LEN = 20
STX = 0x2
ETX = 0x3
# 소켓 생성
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# 서버에 연결
s.connect((SERVER_HOST, SERVER_PORT))
while True:
# # 서버로부터 데이터 수신
data = s.recv(1024)
print('수신된 데이터:', data)
data = []
length = HEADER_LEN
payload = []
deviceId = 5
deviceSerial = 0
classType = HEART_BIT
cmd = HEART_BIT_CMD
if data is not None:
payload = data
length = HEADER_LEN + 4 * len(payload)
string_format = f"<{7+len(payload)}I"
# 서버에게 메시지 전송
s.sendall(struct.pack(
string_format,
STX,
length,
deviceId,
deviceSerial,
classType,
cmd,
*payload,
ETX,
))
print("전송 완료!!")

12
framework.yaml Normal file
View File

@ -0,0 +1,12 @@
version: bwc/v2 # bwc 버전 정보입니다.
spec:
appName: dts-py-app # 앱의 이름입니다.
runFile: main.py # 앱의 실행 파일입니다.
env:
bin: python3 # 앱을 실행할 바이너라 파일 종류입니다.(장비에 따라 다르므로 확인 후 정의해야 합니다.)
virtualEnv: dts-env # 사용할 가상환경 이름입니다.
runtime: python3.9
package: requirements.txt # 설치할 Python 패키지 정보 파일입니다.(기본 값은 requirement.txt 입니다.)
stackbase:
tagName: v1.0.0 # Stackbase(gitea)에 릴리즈 태그명 입니다.
repoName: dts-py-app # Stackbase(gitea)에 저장될 저장소 이릅니다.

124
main.py Normal file
View File

@ -0,0 +1,124 @@
from SDT_Device.DTS import DTS
import sys
from PyQt6 import QtCore
from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import QApplication, QMainWindow
from MainWindow import Ui_MainWindow
from pyqtgraph import PlotWidget, plot
import pyqtgraph as pg
from random import randint
import threading
import time
import struct
class MainWindow(QMainWindow, Ui_MainWindow):
def __init__(self):
super().__init__()
self.setupUi(self)
self.show()
self.pb_Start1.clicked.connect(self.SendStartPacket1)
self.pb_Stop1.clicked.connect(self.SendStopPacket1)
self.pushButton_EnableDelaySet.clicked.connect(self.SetEnableDelay)
self.pushButton_AverageStepSet.clicked.connect(self.SetAverage)
self.pushButton_SampleRangeSet.clicked.connect(self.SetSampleRange)
self.comboboxMinVal = 8
for i in range(self.comboboxMinVal, 12):
self.comboBox_AverageStepVal.addItem(str(2**i))
ip = "192.168.0.20"
port = 5001
self.dts = DTS(ip, port)
self.dtsInit()
self.x1 = []
self.y1 = []
self.data_line1 = self.graphWidget1.plot(self.x1, self.y1)
self.timer = QtCore.QTimer()
self.timer.setInterval(250)
self.timer.timeout.connect(self.update_plot_data)
self.timer.start()
self.tick = 0
self.prevDataCount = 0
def SetEnableDelay(self):
val = int(self.lineEdit_EnableDelayValue.text())
self.dts.setAvgEnableDelay(0, 10 + val)
self.dts.setAvgStart(0, 0)
self.dts.setAvgStart(0, 1)
def SetAverage(self):
val = self.comboBox_AverageStepVal.currentIndex() + self.comboboxMinVal
self.dts.setAvgStep(0, val)
self.dts.setAvgStart(0, 0)
self.dts.setAvgStart(0, 1)
def SetSampleRange(self):
val = int(self.lineEdit_SampleRangeValue.text())
self.dts.setAvgSampleRange(0, val)
self.dts.setAvgStart(0, 0)
self.dts.setAvgStart(0, 1)
def SendStartPacket1(self):
print("Send Start1")
self.dts.setAvgStart(0, 1)
def SendStopPacket1(self):
print("Send Stop1")
self.dts.setAvgStart(0, 0)
def update_plot_data(self):
self.y1 = self.dts.ReadAdcData(0)
size = len(self.y1)
self.x1 = list(range(size))
self.data_line1.setData(self.x1, self.y1)
print(self.dts.dataCount)
if self.prevDataCount != self.dts.dataCount[0]:
self.prevDataCount = self.dts.dataCount[0]
current_time = time.time()
formatted_time = time.strftime("%H:%M:%S", time.localtime(current_time))
print(f"[{formatted_time}] dataCount={self.dts.dataCount[0]}")
debugData = self.dts.getDebugData()
self.lineEdit_TriggerCount.setText(str(debugData[4]))
def dtsInit(self):
self.dts.setSystemReset()
self.comboBox_AverageStepVal.setCurrentText(f"{self.comboboxMinVal}")
avgStepVal = self.comboBox_AverageStepVal.currentIndex() + self.comboboxMinVal
self.lineEdit_SampleRangeValue.setText("50000")
sampleRangeVal = int(self.lineEdit_SampleRangeValue.text())
self.lineEdit_EnableDelayValue.setText("0")
enableDelayVal = int(self.lineEdit_EnableDelayValue.text())
ch = 0
self.dts.setAvgEnableDelay(ch, 10 + enableDelayVal)
self.dts.setAvgSampleRange(ch, sampleRangeVal)
self.dts.setAvgStep(ch, avgStepVal)
self.dts.setAvgStart(ch, 0)
self.dts.setTimerTick(ch, 100_000_000)
def mainInit():
app = QApplication(sys.argv)
w = MainWindow()
app.exec()
if __name__ == "__main__":
mainInit()
# pyuic6 .\MainWindow.ui -o MainWindow.py

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
# Write package's name that need your app.
awsiotsdk
pyyaml
pyqt6
pyqtgraph
#sdtcloudpubsub

26
server.py Normal file
View File

@ -0,0 +1,26 @@
import socket
# 호스트와 포트 설정
HOST = '192.168.1.45'
PORT = 8088
print(f"TEST: {[0, 100_000_000]}")
# 소켓 생성
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# 소켓을 주소와 연결
s.bind((HOST, PORT))
# 연결 대기
s.listen()
print('서버가 시작되었습니다. 클라이언트 연결을 기다립니다...')
# 클라이언트로부터 연결 수락
conn, addr = s.accept()
with conn:
print('클라이언트와 연결됨:', addr)
# 클라이언트에게 메시지 전송
# conn.sendall(b'안녕하세요, 클라이언트!')
# 클라이언트로부터 데이터 수신
data = conn.recv(16384)
print('수신된 데이터:', data)

55
test.py Normal file
View File

@ -0,0 +1,55 @@
from SDT_Device.DTS import DTS
import time
import sys
import json
import sdtcloudpubsub
# with open("./info.json","r") as f:
# info = f.read()
# ch = info['ch']
# ch = 0
# ip = info['ip']
# port = info['port']
# enableDelayVal = info['enableDelayVal']
# sampleRangeVal = info['sampleRangeVal']
# avgStepVal = info['avgStepVal']
# Default Setup for Test
ch = 0
ip= "192.168.0.20"
port = 5001
enableDelayVal = 10
sampleRangeVal = 100
avgStepVal = 10
sdtcloud = sdtcloudpubsub.sdtcloudpubsub()
mqttClient = sdtcloud.setClient(f"device-app-{uuid.uuid1()}") # parameter is client ID(string)
# Run Start
dts = DTS(ip,port)
dts.setSystemReset()
dts.setAvgStart(ch, 0)
dts.setAvgEnableDelay(ch, 10 + enableDelayVal)
dts.setAvgSampleRange(ch, sampleRangeVal)
dts.setAvgStep(ch, avgStepVal)
# Get Data Start
dts. setAvgStart(ch, 1)
dts.setTimerTick(ch, 100_000_000)
while(1):
print("Run")
y1 = dts.ReadAdcData(0)
print(y1)
msg = {
"sensor_data": y1
}
sdtcloud.pubMessage(mqttClient, msg)
time.sleep(1)