-
Notifications
You must be signed in to change notification settings - Fork 0
/
serialhandler.py
91 lines (71 loc) · 2.94 KB
/
serialhandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import time
import json
import serial
import serial.tools.list_ports
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
class SerialHandler(QThread):
sign_node = pyqtSignal(int, dict)
port_name: str
def __init__(self, port_name, parent=None):
super(SerialHandler, self).__init__(parent)
self.port_name = port_name
self.working = True
def __del__(self):
# 线程状态改变与线程终止
self.working = False
self.wait()
def run(self):
last_str = '' # 记录上一次收到的字符串
self.com = serial.Serial(self.port_name, 115200)
while self.working == True:
serial_str = self.com.readline().decode('utf-8')
str_list = serial_str.split('&') # 分割字符串
if len(str_list) != 5: # 数据缺失
print('数据缺失!')
continue
if (serial_str != last_str): # 检测数据是否重复
last_str = serial_str
try: # 转换数据
seq: int = int(str_list[0])
humi: float = round(float(str_list[1]), 1)
temp: float = round(float(str_list[2]), 1)
light: float = round(float(str_list[3]), 1)
except Exception:
print('数据错误!')
continue
# 检测数据合理性
if humi > 100 or temp > 50 or light > 100000:
print('数据不合理!')
continue
# 修正读数
config = json.loads(
open('./config.json', 'r', encoding='utf-8').read())
humi += config['node' + str(seq)]['humi']
temp += config['node' + str(seq)]['temp']
light += config['node' + str(seq)]['light']
humi = round(humi, 2)
temp = round(temp, 2)
light = round(light, 2)
print(f"节点:{seq} 湿度:{humi}% 温度:{temp}°C 光照度:{light}lx")
# 写入 json 文件
data = json.loads(
open('./data.json', 'r', encoding='utf-8').read())
data['node' + str(seq)]['humi'] = humi
data['node' + str(seq)]['temp'] = temp
data['node' + str(seq)]['light'] = light
data['time'] = time.strftime(
'%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
with open('./data.json', 'w') as fp:
fp.write(json.dumps(data))
fp.close()
# 获取文本
node_data = {
"seq": seq,
"humi": humi,
"temp": temp,
"light": light
}
# 发射信号
self.sign_node.emit(seq, node_data)