865 lines
28 KiB
Python
Executable File
865 lines
28 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import string
|
||
import serial
|
||
import serial.tools.list_ports
|
||
|
||
import tkinter
|
||
import tkgen.gengui
|
||
import tkinter.scrolledtext
|
||
import tkinter.filedialog
|
||
|
||
import threading
|
||
import time
|
||
import datetime
|
||
import queue
|
||
import logging
|
||
from typing import Optional, Dict, Any, List, Union
|
||
|
||
# 设置中文环境
|
||
import _locale
|
||
_locale._getdefaultlocale = (lambda *args: ['zh_CN', 'utf8'])
|
||
|
||
# 配置日志
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 工具函数
|
||
def tsnow() -> int:
|
||
"""获取当前时间戳(毫秒)"""
|
||
return int(time.time() * 1000)
|
||
|
||
def strnow() -> str:
|
||
"""获取当前时间字符串"""
|
||
return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
||
|
||
def tohex(data: bytes) -> str:
|
||
"""将字节数据转换为十六进制字符串"""
|
||
return ' '.join(f'{x:02X}' for x in data)
|
||
|
||
def human_string(data: bytes, is_hex: bool = False, encoding: str = 'utf-8') -> str:
|
||
"""将字节数据转换为可读字符串"""
|
||
return tohex(data) if is_hex else data.decode(encoding, 'ignore')
|
||
|
||
def uint16(b: bytes) -> int:
|
||
"""从字节读取无符号16位整数"""
|
||
return b[0] * 256 + b[1]
|
||
|
||
def int16(b: bytes) -> int:
|
||
"""从字节读取有符号16位整数"""
|
||
d = b[0] * 256 + b[1]
|
||
return d - 0x10000 if d >= 0x8000 else d
|
||
|
||
|
||
class ThreadSafeTextHandler:
|
||
"""线程安全的文本处理器"""
|
||
|
||
def __init__(self, text_widget):
|
||
self.text_widget = text_widget
|
||
self.message_queue = queue.Queue()
|
||
self.update_interval = 100 # 毫秒
|
||
self.max_lines = 10000 # 最大行数限制
|
||
self._start_updater()
|
||
|
||
def _start_updater(self):
|
||
"""启动UI更新器"""
|
||
self._update_text()
|
||
|
||
def _update_text(self):
|
||
"""从队列中获取消息并更新UI"""
|
||
try:
|
||
while True:
|
||
try:
|
||
message = self.message_queue.get_nowait()
|
||
self._safe_insert(message)
|
||
except queue.Empty:
|
||
break
|
||
except Exception as e:
|
||
logger.error(f"更新文本时出错: {e}")
|
||
|
||
# 继续定时更新
|
||
self.text_widget.after(self.update_interval, self._update_text)
|
||
|
||
def _safe_insert(self, message: str):
|
||
"""安全插入文本"""
|
||
try:
|
||
# 检查并限制文本长度
|
||
lines = int(self.text_widget.index('end-1c').split('.')[0])
|
||
if lines > self.max_lines:
|
||
self.text_widget.delete('1.0', f'{lines - self.max_lines // 2}.0')
|
||
|
||
self.text_widget.insert('end', message)
|
||
self.text_widget.see('end')
|
||
except Exception as e:
|
||
logger.error(f"插入文本时出错: {e}")
|
||
|
||
def put_message(self, message: str):
|
||
"""向队列中添加消息"""
|
||
try:
|
||
self.message_queue.put(message)
|
||
except Exception as e:
|
||
logger.error(f"添加消息到队列时出错: {e}")
|
||
|
||
|
||
class UIProcessor:
|
||
"""UI处理器"""
|
||
|
||
def __init__(self, app):
|
||
self.root = app
|
||
self._setup_widgets()
|
||
self._setup_variables()
|
||
self._bind_events()
|
||
|
||
# 初始化文本处理器
|
||
self.text_handler = ThreadSafeTextHandler(self.text_recv)
|
||
|
||
logger.info("UI处理器初始化完成")
|
||
|
||
def _setup_widgets(self):
|
||
"""设置UI组件"""
|
||
# 串口相关
|
||
self.combobox_port = self.root.get('cbbox-com')
|
||
self.entry_baud = self.root.get('entry-baud')
|
||
self.btn_onoff = self.root.get('btn-onoff')
|
||
self.canvas_led = self.root.get('canvas-led')
|
||
|
||
# 数据显示相关
|
||
self.text_recv = self.root.get('text-recv')
|
||
self.label_status = self.root.get('label-status')
|
||
|
||
# 复选框
|
||
self.ckbtn_shex = self.root.get('ckbtn-shex')
|
||
self.ckbtn_rhex = self.root.get('ckbtn-rhex')
|
||
self.ckbtn_sendshow = self.root.get('ckbtn-sendshow')
|
||
self.ckbtn_time = self.root.get('ckbtn-time')
|
||
self.ckbtn_split = self.root.get('ckbtn-split')
|
||
self.ckbtn_0d = self.root.get('ckbtn-0d')
|
||
self.ckbtn_0a = self.root.get('ckbtn-0a')
|
||
self.ckbtn_cycle = self.root.get('ckbtn-cycle')
|
||
|
||
# 输入框
|
||
self.entry_split = self.root.get('entry-split')
|
||
self.entry_cycle = self.root.get('entry-cycle')
|
||
self.entry_encoding = self.root.get('entry-encoding')
|
||
self.entry_sendText = self.root.get('entry-sendText')
|
||
|
||
def _setup_variables(self):
|
||
"""设置变量"""
|
||
self.last_recv_ticks = 0
|
||
self.last_recv_data = b''
|
||
self.wait_send_data = {'text': b'', 'rts': None, 'dtr': None}
|
||
|
||
# 状态标签变量
|
||
self.status_var = tkinter.StringVar()
|
||
self.label_status.config(textvariable=self.status_var)
|
||
|
||
def _bind_events(self):
|
||
"""绑定事件"""
|
||
self.combobox_port.bind("<<ComboboxSelected>>",
|
||
lambda e: self.log(f"选择端口: {self.combobox_port.get()}"))
|
||
|
||
def set_send_data(self, text: Optional[str] = None, encoding: Optional[str] = None,
|
||
hex_flag: Optional[bool] = None, rts: Optional[bool] = None,
|
||
dtr: Optional[bool] = None):
|
||
"""设置发送数据"""
|
||
if text is not None:
|
||
self.entry_sendText.var.set(text)
|
||
if encoding is not None:
|
||
self.entry_encoding.var.set(encoding)
|
||
if hex_flag is not None:
|
||
self.ckbtn_shex.var.set(1 if hex_flag else 0)
|
||
|
||
self.wait_send_data.update({
|
||
'rts': rts,
|
||
'dtr': dtr
|
||
})
|
||
|
||
def get_send_data(self, cache: bool = True) -> Dict[str, Any]:
|
||
"""获取发送数据"""
|
||
if not cache and self.root.pack:
|
||
logger.info(f"发送数据包: {self.root.pack}")
|
||
self.set_send_data(**self.root.pack)
|
||
self.root.pack = None
|
||
|
||
try:
|
||
data = self.entry_sendText.var.get()
|
||
encoding = self.entry_encoding.var.get()
|
||
|
||
# 处理十六进制数据
|
||
if self.ckbtn_shex.var.get():
|
||
dat = bytes.fromhex(data) if data.strip() else b''
|
||
else:
|
||
dat = data.encode(encoding, 'ignore')
|
||
|
||
# 添加行结束符
|
||
if self.ckbtn_0d.var.get():
|
||
dat += b'\r'
|
||
if self.ckbtn_0a.var.get():
|
||
dat += b'\n'
|
||
|
||
self.wait_send_data['text'] = dat
|
||
except Exception as e:
|
||
logger.error(f"处理发送数据时出错: {e}")
|
||
self.wait_send_data['text'] = b''
|
||
return self.wait_send_data
|
||
|
||
def dmesg(self, category: str, data: bytes):
|
||
"""显示消息(线程安全)"""
|
||
try:
|
||
if category == 'send' and not self.ckbtn_sendshow.var.get():
|
||
return
|
||
|
||
timestamp = f"[{strnow()}] " if self.ckbtn_time.var.get() else ""
|
||
encoding = self.entry_encoding.var.get()
|
||
|
||
if category == 'send':
|
||
prefix = "> " if self.ckbtn_time.var.get() else ""
|
||
content = human_string(data, self.ckbtn_shex.var.get(), encoding)
|
||
else: # recv
|
||
prefix = "< " if self.ckbtn_time.var.get() else ""
|
||
content = human_string(data, self.ckbtn_rhex.var.get(), encoding)
|
||
for cb in self.root.unpack.values():
|
||
try:
|
||
if cb: content += eval(cb['value'],{'data':data, 'uint16':uint16,'int16':int16}) or ''
|
||
except: pass
|
||
|
||
message = f"\n{timestamp}{prefix}{content}"
|
||
self.text_handler.put_message(message)
|
||
|
||
except Exception as e:
|
||
logger.error(f"显示消息时出错: {e}")
|
||
|
||
def save_current_config(self):
|
||
"""保存当前配置"""
|
||
try:
|
||
# 保存所有相关配置
|
||
configs_to_save = [
|
||
'entry-split', 'entry-cycle', 'entry-baud',
|
||
'entry-encoding', 'entry-uservar'
|
||
]
|
||
|
||
for config_key in configs_to_save:
|
||
self.save_config(config_key)
|
||
|
||
logger.info("当前配置已保存")
|
||
|
||
except Exception as e:
|
||
logger.error(f"保存当前配置时出错: {e}")
|
||
|
||
def serial_open(self):
|
||
"""串口打开时的UI更新"""
|
||
self.entry_baud.configure(state='disabled')
|
||
self.combobox_port.configure(state='disabled')
|
||
self.btn_onoff.configure(text='关闭串口')
|
||
self.canvas_led.create_oval(4, 4, 19, 19, fill='lightgreen')
|
||
|
||
def serial_close(self):
|
||
"""串口关闭时的UI更新"""
|
||
self.entry_baud.configure(state='normal')
|
||
self.combobox_port.configure(state='normal')
|
||
self.btn_onoff.configure(text='打开串口')
|
||
self.canvas_led.create_oval(4, 4, 19, 19, fill='red')
|
||
|
||
def read_serial_port(self) -> str:
|
||
"""读取选择的串口"""
|
||
return self.combobox_port.get()
|
||
|
||
def read_serial_baud(self) -> int:
|
||
"""读取波特率"""
|
||
try:
|
||
return int(self.entry_baud.var.get())
|
||
except ValueError:
|
||
return 9600
|
||
|
||
def set_serial_port_list(self, port_list: List[Any]):
|
||
"""设置串口列表"""
|
||
current_text = self.combobox_port.get()
|
||
ports = [str(port[0]) for port in port_list]
|
||
|
||
self.combobox_port['values'] = ports
|
||
|
||
if current_text in ports:
|
||
self.combobox_port.set(current_text)
|
||
elif ports:
|
||
self.combobox_port.set(ports[-1])
|
||
|
||
def clear_recv_text(self):
|
||
"""清空接收文本"""
|
||
self.text_recv.delete('1.0', 'end')
|
||
|
||
def save_recv_text(self):
|
||
"""保存接收文本"""
|
||
try:
|
||
filename = filedialog.asksaveasfilename(
|
||
defaultextension='.txt',
|
||
initialfile=f'scommlog-{tsnow()}'
|
||
)
|
||
if filename:
|
||
with open(filename, 'w', encoding='utf-8') as f:
|
||
f.write(self.text_recv.get('1.0', 'end'))
|
||
self.log(f"文件已保存: {filename}")
|
||
except Exception as e:
|
||
logger.error(f"保存文件时出错: {e}")
|
||
self.log(f"保存失败: {e}")
|
||
|
||
def should_send_cycle(self) -> bool:
|
||
"""检查是否应该循环发送"""
|
||
return self.ckbtn_cycle.var.get()
|
||
|
||
def get_cycle_interval(self) -> float:
|
||
"""获取循环发送间隔(秒)"""
|
||
try:
|
||
cycle_text = self.entry_cycle.var.get().replace('ms', '')
|
||
return max(0.01, float(cycle_text) / 1000.0) # 最小间隔10ms
|
||
except ValueError:
|
||
return 1.0
|
||
|
||
def log(self, message: str):
|
||
"""记录状态消息"""
|
||
logger.info(message)
|
||
display_msg = message if len(message) <= 64 else message[:64] + ' ...'
|
||
self.status_var.set(display_msg)
|
||
|
||
def save_config(self, key: str, value: Any = None):
|
||
"""保存配置"""
|
||
try:
|
||
if value is None:
|
||
# 从控件获取值
|
||
value = self.root.get(key).var.get()
|
||
config_key = key.split('-')[-1]
|
||
else:
|
||
config_key, value = key, value
|
||
|
||
if self.root.usercfg.get(config_key) == value:
|
||
return
|
||
|
||
self.root.usercfg[config_key] = value
|
||
|
||
with open('usercfg.json', 'w', encoding='utf-8') as f:
|
||
json.dump(self.root.usercfg, f, indent=4, ensure_ascii=False)
|
||
|
||
except Exception as e:
|
||
logger.error(f"保存配置时出错: {e}")
|
||
|
||
|
||
class SerialCommunicator:
|
||
"""串口通信器"""
|
||
|
||
def __init__(self, app):
|
||
self.ui = UIProcessor(app)
|
||
self.com = serial.Serial()
|
||
self.threads = []
|
||
self.running = threading.Event()
|
||
self.data_ready = threading.Event() # 用于发送线程
|
||
self.shutdown_event = threading.Event()
|
||
|
||
# 统计信息
|
||
self.send_count = 0
|
||
self.recv_count = 0
|
||
|
||
logger.info("串口通信器初始化完成")
|
||
|
||
def detect_serial_ports(self):
|
||
"""检测串口"""
|
||
if not hasattr(self, '_detecting') or not self._detecting:
|
||
self._detecting = True
|
||
thread = threading.Thread(target=self._detect_ports_process, daemon=True)
|
||
thread.start()
|
||
self.threads.append(thread)
|
||
|
||
def _detect_ports_process(self):
|
||
"""检测串口进程"""
|
||
try:
|
||
port_list = list(serial.tools.list_ports.comports())
|
||
if port_list:
|
||
self.ui.log(f'发现 {len(port_list)} 个串口')
|
||
self.ui.set_serial_port_list(port_list)
|
||
except Exception as e:
|
||
logger.error(f"检测串口时出错: {e}")
|
||
finally:
|
||
self._detecting = False
|
||
|
||
def open_close_serial(self):
|
||
"""打开/关闭串口"""
|
||
self.ui.log('串口操作中...')
|
||
thread = threading.Thread(target=self._open_close_process, daemon=True)
|
||
thread.start()
|
||
self.threads.append(thread)
|
||
|
||
def _open_close_process(self):
|
||
"""打开/关闭串口进程"""
|
||
try:
|
||
if self.com.is_open:
|
||
# 关闭串口
|
||
self._stop_communication()
|
||
self.com.close()
|
||
self.ui.serial_close()
|
||
self.ui.log(f'{self.com.port}: 已关闭')
|
||
else:
|
||
# 打开串口
|
||
if self._open_serial():
|
||
self._start_communication()
|
||
self.ui.save_current_config()
|
||
except Exception as e:
|
||
logger.error(f"串口操作出错: {e}")
|
||
self.ui.log(f'串口操作失败: {e}')
|
||
|
||
def _open_serial(self) -> bool:
|
||
"""打开串口"""
|
||
try:
|
||
self.com.port = self.ui.read_serial_port()
|
||
self.com.baudrate = self.ui.read_serial_baud()
|
||
self.com.bytesize = 8
|
||
self.com.parity = 'N'
|
||
self.com.stopbits = 1
|
||
self.com.timeout = 0.1 # 设置超时避免阻塞
|
||
self.com.write_timeout = 1
|
||
|
||
self.com.open()
|
||
self.ui.serial_open()
|
||
self.ui.log(f'{self.com.port}: 打开成功')
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"打开串口失败: {e}")
|
||
self.com.close()
|
||
self.ui.serial_close()
|
||
self.ui.log(f'{self.com.port}: 打开失败 - {e}')
|
||
return False
|
||
|
||
def _start_communication(self):
|
||
"""启动通信线程"""
|
||
self.running.set()
|
||
self.shutdown_event.clear()
|
||
self.data_ready.clear()
|
||
|
||
# 启动接收线程
|
||
recv_thread = threading.Thread(target=self._receive_loop, daemon=True)
|
||
recv_thread.start()
|
||
self.threads.append(recv_thread)
|
||
|
||
# 启动发送线程
|
||
send_thread = threading.Thread(target=self._send_loop, daemon=True)
|
||
send_thread.start()
|
||
self.threads.append(send_thread)
|
||
|
||
def _stop_communication(self):
|
||
"""停止通信线程"""
|
||
self.running.clear()
|
||
self.shutdown_event.set()
|
||
self.data_ready.set() # 唤醒发送线程
|
||
|
||
# 等待线程结束
|
||
for thread in self.threads[:]:
|
||
if thread.is_alive():
|
||
thread.join(timeout=1.0)
|
||
|
||
self.threads.clear()
|
||
|
||
def _receive_loop(self):
|
||
"""接收数据循环"""
|
||
buffer = b''
|
||
last_data_time = time.time() # 记录最后接收数据的时间
|
||
|
||
while self.running.is_set():
|
||
try:
|
||
if self.com.is_open:
|
||
# 使用带超时的读取,避免阻塞
|
||
data = self.com.read(self.com.in_waiting or 1)
|
||
if data:
|
||
buffer += data
|
||
self.recv_count += len(data)
|
||
last_data_time = time.time() # 更新最后接收时间
|
||
|
||
# 检查是否需要分帧
|
||
current_time = time.time()
|
||
time_since_last_data = (current_time - last_data_time) * 1000 # 转换为毫秒
|
||
|
||
# 获取分帧间隔设置
|
||
try:
|
||
split_interval = float(self.ui.entry_split.var.get().replace('ms', ''))
|
||
except ValueError:
|
||
split_interval = 100 # 默认100ms
|
||
|
||
# 如果缓冲区有数据且超过分帧间隔,或者缓冲区数据量很大
|
||
if buffer and (time_since_last_data >= split_interval or len(buffer) > 1024):
|
||
self.ui.dmesg('recv', buffer)
|
||
self.ui.log(f'{self.com.port}: 接收 {len(buffer)} 字节')
|
||
buffer = b'' # 清空缓冲区
|
||
|
||
# 使用 Event.wait 代替 sleep,可以及时响应停止事件
|
||
self.running.wait(0.01) # 等待10ms或直到running被清除
|
||
|
||
except Exception as e:
|
||
logger.error(f"接收数据错误: {e}")
|
||
# 出错时等待100ms,但可以响应停止事件
|
||
self.running.wait(0.1)
|
||
|
||
def _send_loop(self):
|
||
"""发送数据循环"""
|
||
while self.running.is_set():
|
||
try:
|
||
if self.com.is_open and self.ui.should_send_cycle():
|
||
cycle_interval = self.ui.get_cycle_interval()
|
||
|
||
# 使用 Event.wait 代替 sleep,可以及时响应停止事件
|
||
self.data_ready.wait(cycle_interval)
|
||
self._send_data()
|
||
if self.data_ready.is_set():
|
||
self.data_ready.clear()
|
||
else:
|
||
# 等待数据准备好或停止事件,最多等待100ms
|
||
self.data_ready.wait(0.1)
|
||
if self.data_ready.is_set():
|
||
self._send_data()
|
||
self.data_ready.clear()
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送循环错误: {e}")
|
||
self.running.wait(0.1) # 出错时等待,但可以响应停止事件
|
||
|
||
def send_data(self):
|
||
"""发送数据"""
|
||
if not self.com.is_open:
|
||
self.ui.log('串口未打开')
|
||
return
|
||
|
||
# 设置数据准备事件,唤醒发送线程
|
||
self.data_ready.set()
|
||
# 手动发送时不操作dtr/rts
|
||
self.ui.set_send_data(rts=None, dtr=None)
|
||
|
||
def _send_data(self):
|
||
"""实际发送数据"""
|
||
try:
|
||
data_info = self.ui.get_send_data(cache=False)
|
||
|
||
# 设置RTS/DTR
|
||
if data_info['rts'] is not None:
|
||
self.com.rts = data_info['rts']
|
||
self.ui.log(f'{self.com.port}: RTS = {data_info["rts"]}')
|
||
|
||
if data_info['dtr'] is not None:
|
||
self.com.dtr = data_info['dtr']
|
||
self.ui.log(f'{self.com.port}: DTR = {data_info["dtr"]}')
|
||
|
||
# 发送数据
|
||
data = data_info['text']
|
||
if data:
|
||
self.com.write(data)
|
||
self.send_count += len(data)
|
||
self.ui.log(f'{self.com.port}: 发送 {len(data)} 字节')
|
||
self.ui.dmesg('send', data)
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送数据错误: {e}")
|
||
self.ui.log(f'发送失败: {e}')
|
||
|
||
def clear_window(self):
|
||
"""清空窗口"""
|
||
self.ui.clear_recv_text()
|
||
|
||
def save_file(self):
|
||
"""保存文件"""
|
||
self.ui.save_recv_text()
|
||
|
||
def safe_exit(self):
|
||
"""安全退出"""
|
||
logger.info("正在退出应用程序...")
|
||
|
||
# 停止所有线程
|
||
self.running.clear()
|
||
self.shutdown_event.set()
|
||
self.data_ready.set() # 唤醒可能等待的发送线程
|
||
|
||
# 关闭串口
|
||
if self.com.is_open:
|
||
try:
|
||
self.com.close()
|
||
logger.info("串口已关闭")
|
||
except Exception as e:
|
||
logger.error(f"关闭串口时出错: {e}")
|
||
|
||
# 等待一段时间让线程结束
|
||
self.shutdown_event.wait(1.0)
|
||
|
||
# 强制退出
|
||
sys.exit(0)
|
||
|
||
|
||
class TopWindow:
|
||
"""顶层窗口管理器"""
|
||
|
||
def __init__(self, root):
|
||
self.root = root
|
||
self.root.unpack = {}
|
||
self.root.pack = None
|
||
|
||
self.win_data = None
|
||
self.win_unpack = None
|
||
|
||
logger.info("顶层窗口管理器初始化完成")
|
||
|
||
def set_send_data(self, btn_name: str):
|
||
"""设置发送数据"""
|
||
config = self.root.usercfg.get(btn_name, {})
|
||
if not config:
|
||
return
|
||
|
||
try:
|
||
value = config.get('value', '')
|
||
|
||
# 尝试评估表达式
|
||
try:
|
||
user_vars = self.root.get('entry-uservar').var.get().split(',')
|
||
value = eval(value, {"data": user_vars})
|
||
except:
|
||
pass
|
||
|
||
# 准备发送数据包
|
||
if isinstance(value, dict):
|
||
self.root.pack = {
|
||
'text': value.get('text', ''),
|
||
'encoding': value.get('encoding'),
|
||
'hex_flag': value.get('hex'),
|
||
'rts': value.get('rts'),
|
||
'dtr': value.get('dtr')
|
||
}
|
||
else:
|
||
self.root.pack = {
|
||
'text': str(value),
|
||
'hex_flag': config.get('hex')
|
||
}
|
||
|
||
# 触发发送
|
||
self.root.get('btn-send').invoke()
|
||
|
||
except Exception as e:
|
||
logger.error(f"设置发送数据时出错: {e}")
|
||
|
||
def set_unpack(self, btn_name: str):
|
||
"""设置解析脚本"""
|
||
self.root.unpack[btn_name] = (
|
||
self.root.get(btn_name).var.get()
|
||
and self.root.usercfg.get(btn_name)
|
||
or None
|
||
)
|
||
|
||
def save_config(self, btn_name: str, data: Dict[str, Any]):
|
||
"""保存配置"""
|
||
self.root.save_cfg(btn_name, data)
|
||
self.root.get(btn_name).configure(text=data.get('title', btn_name))
|
||
|
||
def show_data_window(self, event):
|
||
"""显示数据配置窗口"""
|
||
if self.win_data:
|
||
self.win_data.destroy()
|
||
|
||
self.win_data = self.root.toplevel('data.ui', title='预置数据')
|
||
self.win_data.configure(bg='#e8e8e8')
|
||
|
||
btn_name = event.widget._name
|
||
config = self.root.usercfg.get(btn_name, {})
|
||
|
||
# 设置控件值
|
||
self.root.entry('entry-dfile').set(config.get('title', btn_name))
|
||
self.root.get('text-dsetting').delete('1.0', 'end')
|
||
self.root.get('text-dsetting').insert('end', config.get('value', ''))
|
||
self.root.checkbox('ckbtn-dhex').set(config.get('hex', 0))
|
||
|
||
# 绑定保存按钮
|
||
self.root.button('btn-dsave',
|
||
cmd=lambda: self._save_data_config(btn_name),
|
||
focus=True)
|
||
|
||
def _save_data_config(self, btn_name: str):
|
||
"""保存数据配置"""
|
||
try:
|
||
data = {
|
||
'title': self.root.get('entry-dfile').var.get(),
|
||
'value': self.root.get('text-dsetting').get('1.0', 'end-1c'),
|
||
'hex': self.root.get('ckbtn-dhex').var.get()
|
||
}
|
||
self.save_config(btn_name, data)
|
||
|
||
if self.win_data:
|
||
self.win_data.destroy()
|
||
self.win_data = None
|
||
|
||
except Exception as e:
|
||
logger.error(f"保存数据配置时出错: {e}")
|
||
|
||
def show_unpack_window(self, event):
|
||
"""显示解析脚本窗口"""
|
||
if self.win_unpack:
|
||
self.win_unpack.destroy()
|
||
|
||
self.win_unpack = self.root.toplevel('unpack.ui', title='解析脚本')
|
||
self.win_unpack.configure(bg='#e8e8e8')
|
||
|
||
btn_name = event.widget._name
|
||
config = self.root.usercfg.get(btn_name, {})
|
||
|
||
# 设置控件值
|
||
self.root.entry('entry-ufile').set(config.get('title', btn_name))
|
||
self.root.get('text-usetting').delete('1.0', 'end')
|
||
self.root.get('text-usetting').insert('end', config.get('value', ''))
|
||
|
||
# 绑定保存按钮
|
||
self.root.button('btn-usave',
|
||
cmd=lambda: self._save_unpack_config(btn_name),
|
||
focus=True)
|
||
|
||
def _save_unpack_config(self, btn_name: str):
|
||
"""保存解析脚本配置"""
|
||
try:
|
||
data = {
|
||
'title': self.root.get('entry-ufile').var.get(),
|
||
'value': self.root.get('text-usetting').get('1.0', 'end-1c')
|
||
}
|
||
self.save_config(btn_name, data)
|
||
self.set_unpack(btn_name)
|
||
|
||
if self.win_unpack:
|
||
self.win_unpack.destroy()
|
||
self.win_unpack = None
|
||
|
||
except Exception as e:
|
||
logger.error(f"保存解析脚本配置时出错: {e}")
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
try:
|
||
# 创建主窗口
|
||
tkinter.ScrolledText = tkinter.scrolledtext.ScrolledText
|
||
root = tkgen.gengui.TkJson('app.ui', title='scomm串口调试助手')
|
||
|
||
# 初始化通信器
|
||
comm = SerialCommunicator(root)
|
||
window_manager = TopWindow(root)
|
||
|
||
# 加载用户配置
|
||
if os.path.isfile('usercfg.json'):
|
||
with open('usercfg.json', 'r', encoding='utf-8') as f:
|
||
root.usercfg = json.load(f)
|
||
else:
|
||
root.usercfg = {}
|
||
|
||
root.save_cfg = comm.ui.save_config
|
||
|
||
# 设置预置数据按钮
|
||
_setup_data_buttons(root, window_manager)
|
||
|
||
# 设置解析脚本按钮
|
||
_setup_unpack_buttons(root, window_manager)
|
||
|
||
# 设置UI控件
|
||
_setup_ui_controls(root, comm)
|
||
|
||
# 设置窗口属性
|
||
root.configure(bg='#e8e8e8')
|
||
root.lift() # 置顶窗口
|
||
|
||
# 设置关闭协议
|
||
root.protocol("WM_DELETE_WINDOW", lambda: comm.safe_exit())
|
||
|
||
# 启动串口检测
|
||
comm.detect_serial_ports()
|
||
|
||
# 启动主循环
|
||
logger.info("应用程序启动完成")
|
||
root.mainloop()
|
||
|
||
except Exception as e:
|
||
logger.error(f"应用程序启动失败: {e}")
|
||
sys.exit(1)
|
||
|
||
|
||
def _setup_data_buttons(root, window_manager):
|
||
"""设置预置数据按钮"""
|
||
for i in range(20):
|
||
btn_name = f'btn-data{i+1:02d}'
|
||
try:
|
||
btn = root.get(btn_name)
|
||
if btn:
|
||
root.button(btn_name, cmd=lambda x=btn_name: window_manager.set_send_data(x))
|
||
btn.bind('<Button-2>', window_manager.show_data_window)
|
||
btn.bind('<Button-3>', window_manager.show_data_window)
|
||
|
||
# 设置按钮文本
|
||
config = root.usercfg.get(btn_name)
|
||
if config:
|
||
btn.configure(text=config.get('title', btn_name))
|
||
|
||
except Exception as e:
|
||
logger.debug(f"设置按钮 {btn_name} 时出错: {e}")
|
||
|
||
|
||
def _setup_unpack_buttons(root, window_manager):
|
||
"""设置解析脚本按钮"""
|
||
var = tkinter.IntVar()
|
||
for i in range(20):
|
||
btn_name = f'btn-unpack{i+1:02d}'
|
||
try:
|
||
btn = root.get(btn_name)
|
||
if btn:
|
||
btn.var = var
|
||
root.button(btn_name, cmd=lambda x=btn_name: window_manager.set_unpack(x))
|
||
btn.bind('<Button-2>', window_manager.show_unpack_window)
|
||
btn.bind('<Button-3>', window_manager.show_unpack_window)
|
||
|
||
# 设置按钮文本
|
||
config = root.usercfg.get(btn_name)
|
||
if config:
|
||
btn.configure(text=config.get('title', btn_name))
|
||
|
||
root.checkbox(btn_name).set(0)
|
||
|
||
except Exception as e:
|
||
logger.debug(f"设置解析按钮 {btn_name} 时出错: {e}")
|
||
|
||
|
||
def _setup_ui_controls(root, comm):
|
||
"""设置UI控件"""
|
||
# 初始化LED
|
||
root.get('canvas-led').create_oval(4, 4, 19, 19, fill='gray')
|
||
|
||
# 设置复选框默认值
|
||
checkboxes = {
|
||
'ckbtn-rhex': 0, 'ckbtn-shex': 0, 'ckbtn-0d': 0, 'ckbtn-0a': 0,
|
||
'ckbtn-split': 1, 'ckbtn-cycle': 0, 'ckbtn-time': 1, 'ckbtn-sendshow': 1
|
||
}
|
||
|
||
for name, value in checkboxes.items():
|
||
root.checkbox(name).set(value)
|
||
|
||
# 设置输入框默认值
|
||
entries = {
|
||
'entry-split': '99ms',
|
||
'entry-cycle': '1024ms',
|
||
'entry-baud': '9600',
|
||
'entry-encoding': 'utf8',
|
||
'entry-uservar': ''
|
||
}
|
||
|
||
for name, default in entries.items():
|
||
root.entry(name).set(root.usercfg.get(name.split('-')[-1], default))
|
||
|
||
# 绑定按钮事件
|
||
root.button('btn-scan', cmd=lambda: comm.detect_serial_ports())
|
||
root.button('btn-onoff', cmd=lambda: comm.open_close_serial())
|
||
root.button('btn-send', cmd=lambda: comm.send_data())
|
||
root.button('btn-clear', cmd=lambda: comm.clear_window())
|
||
root.button('btn-savefile', cmd=lambda: comm.save_file())
|
||
|
||
# 绑定发送文本框回车事件
|
||
root.entry('entry-sendText', key='<Return>', cmd=lambda e: comm.send_data()).set('')
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|