935 lines
35 KiB
Python
935 lines
35 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
MT6 RFID 读卡器测试程序
|
||
基于 USB HID 私有协议
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
|
||
# 添加 PySimpleGUI 目录到路径
|
||
sys.path.insert(
|
||
0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "PySimpleGUI")
|
||
)
|
||
|
||
import PySimpleGUI as sg
|
||
|
||
try:
|
||
import hid
|
||
except ImportError:
|
||
sg.popup_error("请安装 hidapi 库:\npip install hidapi")
|
||
sys.exit(1)
|
||
|
||
|
||
# ==================== 协议常量 ====================
|
||
REPORT_ID_SET_REQ = 0x01
|
||
REPORT_ID_GET_RES = 0x03
|
||
REPORT_SIZE = 256
|
||
|
||
# 设备信息
|
||
VENDOR_ID = 0xFFFF # 默认VID
|
||
PRODUCT_ID = 0x0035 # 默认PID
|
||
|
||
# ==================== 协议函数 ====================
|
||
|
||
|
||
def calc_outer_checksum(data_len_bytes: bytes, payload: bytes) -> int:
|
||
"""计算外层帧校验(XOR)"""
|
||
result = 0
|
||
for b in data_len_bytes + payload:
|
||
result ^= b
|
||
return result
|
||
|
||
|
||
def calc_epc_crc(epc_len_indicator: int, epc_status: int, epc_data: bytes) -> int:
|
||
data = bytes([epc_len_indicator, epc_status]) + epc_data
|
||
crc = 0xFFFF
|
||
for byte in data:
|
||
crc ^= byte << 8
|
||
for _ in range(8):
|
||
if crc & 0x8000:
|
||
crc = (crc << 1) ^ 0x1021
|
||
else:
|
||
crc = crc << 1
|
||
crc &= 0xFFFF
|
||
return crc ^ 0xFFFF
|
||
|
||
|
||
def build_setreq_frame(data: bytes) -> bytes:
|
||
"""
|
||
构建 SetReq 帧
|
||
:param data: Data 段数据
|
||
:return: 完整的 256 字节帧
|
||
"""
|
||
# Frame Length: 从 0x07 到 Checksum(包含校验和,不含结束符)
|
||
# = 2 (constant) + 2 (data length) + len(data) + 1 (checksum)
|
||
frame_length = 5 + len(data)
|
||
frame_length_bytes = frame_length.to_bytes(2, "big")
|
||
|
||
# Data Length
|
||
data_length_bytes = len(data).to_bytes(2, "big")
|
||
|
||
# Checksum
|
||
checksum = calc_outer_checksum(data_length_bytes, data)
|
||
|
||
# 构建帧
|
||
frame = bytearray(REPORT_SIZE)
|
||
frame[0] = REPORT_ID_SET_REQ
|
||
frame[1:5] = b"\x00\x00\x00\x00" # Fixed
|
||
frame[5:7] = frame_length_bytes # Frame Length
|
||
frame[7:9] = b"\x00\x02" # Constant
|
||
frame[9:11] = data_length_bytes # Data Length
|
||
frame[11 : 11 + len(data)] = data # Data
|
||
frame[11 + len(data)] = checksum # Checksum
|
||
frame[12 + len(data)] = 0x03 # End Marker
|
||
|
||
return bytes(frame)
|
||
|
||
|
||
def parse_getres_frame(data: bytes) -> tuple:
|
||
"""
|
||
解析 GetRes 帧(带Checksum和End Marker验证)
|
||
:param data: 接收到的数据
|
||
:return: (status, response_data) 或 None,校验失败返回None并打印错误
|
||
"""
|
||
if len(data) < 14: # 最小帧长度需要包含Checksum和End Marker
|
||
print(f"[DEBUG] 数据长度不足: {len(data)} < 14")
|
||
return None
|
||
|
||
# 检查 Report ID
|
||
if data[0] != REPORT_ID_GET_RES:
|
||
print(f"[DEBUG] Report ID错误: 0x{data[0]:02x} (应为 0x03)")
|
||
return None
|
||
|
||
# 解析帧结构(大端序)
|
||
frame_length = int.from_bytes(data[5:7], "big")
|
||
data_length = int.from_bytes(data[9:11], "big")
|
||
|
||
# 检查数据长度是否合理
|
||
expected_total_len = 11 + data_length + 2 # header + data + checksum + end_marker
|
||
if len(data) < expected_total_len:
|
||
print(f"[DEBUG] 数据长度不匹配: 实际{len(data)}, 期望{expected_total_len}")
|
||
return None
|
||
|
||
# 提取 Data 段
|
||
response_data = data[11 : 11 + data_length]
|
||
|
||
# 提取 Checksum 和 End Marker
|
||
received_checksum = data[11 + data_length]
|
||
received_end_marker = data[12 + data_length]
|
||
|
||
# 验证 End Marker(固定为 0x03)
|
||
if received_end_marker != 0x03:
|
||
print(f"[DEBUG] End Marker错误: 0x{received_end_marker:02x} (应为 0x03)")
|
||
return None
|
||
|
||
# 计算并验证 Checksum(XOR算法:Data Length两字节 + Data全部字节)
|
||
calculated_checksum = calc_outer_checksum(data[9:11], response_data)
|
||
if calculated_checksum != received_checksum:
|
||
print(
|
||
f"[DEBUG] Checksum错误: 计算=0x{calculated_checksum:02x}, 接收=0x{received_checksum:02x}"
|
||
)
|
||
return None
|
||
|
||
# print(f"[DEBUG] 帧校验通过: Checksum=0x{received_checksum:02x}, EndMarker=0x{received_end_marker:02x}")
|
||
|
||
if len(response_data) < 1:
|
||
return None
|
||
|
||
status = response_data[0]
|
||
return (status, response_data[1:])
|
||
|
||
|
||
# ==================== 命令函数 ====================
|
||
|
||
|
||
def cmd_get_version() -> bytes:
|
||
"""读版本号命令"""
|
||
return bytes([0xC0])
|
||
|
||
|
||
def cmd_factory_reset() -> bytes:
|
||
"""恢复出厂设置命令"""
|
||
return bytes([0xCF])
|
||
|
||
|
||
def cmd_read_format() -> bytes:
|
||
"""读取格式命令"""
|
||
return bytes([0x83])
|
||
|
||
|
||
def cmd_set_format() -> bytes:
|
||
"""设置格式命令"""
|
||
return bytes([0x82, 0x0F, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x00])
|
||
|
||
|
||
def cmd_buzzer(on: bool) -> bytes:
|
||
"""蜂鸣器控制命令"""
|
||
return bytes([0xCD, 0x01 if on else 0x00])
|
||
|
||
|
||
def cmd_rf_power(on: bool) -> bytes:
|
||
"""射频电源控制命令"""
|
||
return bytes([0x90, 0x01 if on else 0x00])
|
||
|
||
|
||
def cmd_set_power(power: int) -> bytes:
|
||
"""设置功率命令 (0-9)"""
|
||
power = max(0, min(9, power))
|
||
return bytes([0xCC, power])
|
||
|
||
|
||
def cmd_set_mode(mode: int) -> bytes:
|
||
"""设置工作模式命令
|
||
1: 单标签巡查
|
||
2: 被动模式
|
||
3: 多标签巡查
|
||
"""
|
||
return bytes([0x0F, mode])
|
||
|
||
|
||
def cmd_read_epc() -> bytes:
|
||
"""读取 EPC 命令"""
|
||
return bytes([0xCE, 0xBB, 0x00, 0x22, 0x00, 0x00, 0x22, 0x7E])
|
||
|
||
|
||
def cmd_select_card(epc_data: bytes) -> bytes:
|
||
"""选中卡命令"""
|
||
# 固定前缀 + EPC 数据
|
||
payload = bytes([0x01, 0x00, 0x00, 0x00, 0x20, 0x10, 0x00]) + epc_data
|
||
|
||
# 计算 checksum(包含 Card Op Command + Internal Length + Payload)
|
||
card_op_cmd = bytes([0x00, 0x0C])
|
||
internal_len = len(payload).to_bytes(2, "big")
|
||
|
||
checksum = 0
|
||
for b in card_op_cmd + internal_len + payload:
|
||
checksum = (checksum + b) & 0xFF
|
||
|
||
return (
|
||
bytes([0xCE, 0xBB])
|
||
+ card_op_cmd
|
||
+ internal_len
|
||
+ payload
|
||
+ bytes([checksum, 0x7E])
|
||
)
|
||
|
||
|
||
def cmd_write_epc(old_epc_crc: bytes, new_epc: bytes) -> bytes:
|
||
"""写入 EPC 命令"""
|
||
# EPC Len Indicator: EPC字节数 = 值 ÷ 4,所以值 = EPC字节数 * 4
|
||
epc_len_indicator = len(new_epc) * 4
|
||
if epc_len_indicator == 0:
|
||
epc_len_indicator = 8 # 默认最小值
|
||
|
||
word_count = 2 + len(new_epc) // 2
|
||
|
||
payload = bytes(
|
||
[0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00]
|
||
) # Reserved (8 bytes)
|
||
payload += bytes([word_count]) # Word Count
|
||
payload += old_epc_crc # Old EPC CRC (大端序)
|
||
payload += bytes([epc_len_indicator, 0x00]) # EPC Len Indicator, Status
|
||
payload += new_epc # New EPC Data
|
||
|
||
# 计算 checksum(包含 Card Op Command + Internal Length + Payload)
|
||
card_op_cmd = bytes([0x00, 0x49])
|
||
internal_len = len(payload).to_bytes(2, "big")
|
||
|
||
checksum = 0
|
||
for b in card_op_cmd + internal_len + payload:
|
||
checksum = (checksum + b) & 0xFF
|
||
|
||
return (
|
||
bytes([0xCE, 0xBB])
|
||
+ card_op_cmd
|
||
+ internal_len
|
||
+ payload
|
||
+ bytes([checksum, 0x7E])
|
||
)
|
||
|
||
|
||
# ==================== 设备管理类 ====================
|
||
|
||
|
||
class RFIDDevice:
|
||
def __init__(self):
|
||
self.device = None
|
||
self.vendor_id = VENDOR_ID
|
||
self.product_id = PRODUCT_ID
|
||
self.current_path = None # 当前连接的设备路径
|
||
self.current_interface = -1 # 当前连接的接口号
|
||
self.current_path = None # 当前连接的设备路径
|
||
self.current_interface = -1 # 当前连接的接口号
|
||
|
||
def list_devices(self):
|
||
"""列出所有 HID 设备"""
|
||
devices = []
|
||
try:
|
||
for d in hid.enumerate():
|
||
devices.append(
|
||
{
|
||
"vendor_id": d["vendor_id"],
|
||
"product_id": d["product_id"],
|
||
"manufacturer_string": d.get("manufacturer_string", ""),
|
||
"product_string": d.get("product_string", ""),
|
||
"serial_number": d.get("serial_number", ""),
|
||
"interface_number": d.get("interface_number", -1),
|
||
"path": d["path"],
|
||
}
|
||
)
|
||
except Exception as e:
|
||
print(f"枚举设备失败: {e}")
|
||
return devices
|
||
|
||
def connect(self, vendor_id=None, product_id=None):
|
||
"""通过VID/PID连接设备(会打开第一个匹配的设备)"""
|
||
if vendor_id:
|
||
self.vendor_id = vendor_id
|
||
if product_id:
|
||
self.product_id = product_id
|
||
|
||
try:
|
||
self.device = hid.device()
|
||
self.device.open(self.vendor_id, self.product_id)
|
||
self.current_path = None
|
||
self.current_interface = -1
|
||
print(
|
||
f"[DEBUG] 通过VID/PID连接: VID=0x{self.vendor_id:04x}, PID=0x{self.product_id:04x}"
|
||
)
|
||
return (
|
||
True,
|
||
f"已连接到设备 (VID: 0x{self.vendor_id:04x}, PID: 0x{self.product_id:04x}) - 注意:可能连接到任意接口",
|
||
)
|
||
except Exception as e:
|
||
print(f"[DEBUG] VID/PID连接失败: {e}")
|
||
return False, f"连接失败: {e}"
|
||
|
||
def connect_by_path(self, path, interface_number=-1):
|
||
"""通过路径连接设备(可以精确指定接口)"""
|
||
try:
|
||
self.device = hid.device()
|
||
# path可能是bytes类型,需要处理
|
||
if isinstance(path, bytes):
|
||
path_str = path
|
||
else:
|
||
path_str = path.encode("utf-8") if isinstance(path, str) else path
|
||
|
||
self.device.open_path(path_str)
|
||
self.current_path = path
|
||
self.current_interface = interface_number
|
||
|
||
# 获取设备信息
|
||
info = self.device.get_manufacturer_string() or "未知"
|
||
product = self.device.get_product_string() or "未知"
|
||
|
||
print(f"[DEBUG] 通过路径连接成功:")
|
||
print(f"[DEBUG] 路径: {path}")
|
||
print(f"[DEBUG] 接口号: {interface_number}")
|
||
print(f"[DEBUG] 制造商: {info}")
|
||
print(f"[DEBUG] 产品: {product}")
|
||
|
||
path_display = (
|
||
path.decode("utf-8", errors="ignore")
|
||
if isinstance(path, bytes)
|
||
else str(path)
|
||
)
|
||
return (
|
||
True,
|
||
f"已连接到设备 (IF={interface_number}, 路径: ...{path_display[-30:]})",
|
||
)
|
||
except Exception as e:
|
||
print(f"[DEBUG] 路径连接失败: {e}")
|
||
return False, f"连接失败: {e}"
|
||
|
||
def disconnect(self):
|
||
"""断开连接"""
|
||
if self.device:
|
||
try:
|
||
print(f"[DEBUG] 断开设备连接 (接口: {self.current_interface})")
|
||
self.device.close()
|
||
except:
|
||
pass
|
||
self.device = None
|
||
self.current_path = None
|
||
self.current_interface = -1
|
||
return "已断开连接"
|
||
|
||
def is_connected(self):
|
||
"""检查是否已连接"""
|
||
if self.device:
|
||
# print(f"[DEBUG] 设备已连接 - 接口: {self.current_interface}, 路径: {self.current_path}")
|
||
return True
|
||
return False
|
||
|
||
def get_connection_info(self):
|
||
"""获取当前连接信息"""
|
||
return {
|
||
"interface": self.current_interface,
|
||
"path": self.current_path,
|
||
"vendor_id": self.vendor_id,
|
||
"product_id": self.product_id,
|
||
}
|
||
|
||
def send_command(self, cmd_data: bytes):
|
||
"""发送命令并接收响应"""
|
||
if not self.device:
|
||
return None, "设备未连接"
|
||
|
||
try:
|
||
# 构建并发送 SetReq
|
||
frame = build_setreq_frame(cmd_data)
|
||
self.device.send_feature_report(frame)
|
||
|
||
# 接收 GetRes
|
||
time.sleep(0.1)
|
||
response = self.device.get_feature_report(REPORT_ID_GET_RES, REPORT_SIZE)
|
||
|
||
# 解析响应
|
||
result = parse_getres_frame(bytes(response))
|
||
if result is None:
|
||
return None, "响应格式错误"
|
||
|
||
status, data = result
|
||
return data, None
|
||
except Exception as e:
|
||
return None, f"通信错误: {e}"
|
||
|
||
|
||
# ==================== GUI 界面 ====================
|
||
|
||
|
||
def create_window():
|
||
"""创建主窗口"""
|
||
sg.theme("LightBlue2")
|
||
|
||
# 设备连接区域
|
||
device_frame = [
|
||
[
|
||
sg.Text("VID (十六进制):"),
|
||
sg.Input("FFFF", size=(8, 1), key="-VID-"),
|
||
sg.Text("PID (十六进制):"),
|
||
sg.Input("0035", size=(8, 1), key="-PID-"),
|
||
sg.Button("连接", key="-CONNECT-"),
|
||
sg.Button("断开", key="-DISCONNECT-"),
|
||
sg.Button("枚举设备", key="-ENUM-"),
|
||
],
|
||
[
|
||
sg.Text("状态: ", size=(12, 1)),
|
||
sg.Text("未连接", key="-STATUS-", text_color="red"),
|
||
],
|
||
]
|
||
|
||
# 基本命令区域
|
||
basic_frame = [
|
||
[
|
||
sg.Button("读取版本号", key="-VERSION-", size=(12, 1)),
|
||
sg.Button("恢复出厂", key="-FACTORY_RESET-", size=(12, 1)),
|
||
],
|
||
[
|
||
sg.Button("打开蜂鸣器", key="-BUZZER_ON-", size=(12, 1)),
|
||
sg.Button("关闭蜂鸣器", key="-BUZZER_OFF-", size=(12, 1)),
|
||
],
|
||
[
|
||
sg.Text("功率 (0-9):"),
|
||
sg.Slider(
|
||
range=(0, 9),
|
||
default_value=3,
|
||
orientation="h",
|
||
size=(20, 15),
|
||
key="-POWER-",
|
||
),
|
||
sg.Button("设置功率", key="-SET_POWER-"),
|
||
],
|
||
[
|
||
sg.Text("工作模式:"),
|
||
sg.Radio("单标签巡查", "mode", key="-MODE1-", default=True),
|
||
sg.Radio("被动模式", "mode", key="-MODE2-"),
|
||
sg.Radio("多标签巡查", "mode", key="-MODE3-"),
|
||
sg.Button("设置模式", key="-SET_MODE-"),
|
||
],
|
||
]
|
||
|
||
# EPC 操作区域
|
||
epc_frame = [
|
||
[
|
||
sg.Button("读取 EPC", key="-READ_EPC-", size=(15, 1)),
|
||
sg.Text("", key="-READ_EPC_STATUS-", size=(40, 1), text_color="black"),
|
||
], # 选中状态提示
|
||
[
|
||
sg.Text("原 EPC 数据(十六进制):"),
|
||
sg.Input("", size=(40, 1), key="-EPC_DATA-"),
|
||
sg.Button("选中卡", key="-SELECT_CARD-", size=(15, 1)),
|
||
],
|
||
[
|
||
sg.Text("新 EPC 数据(十六进制):"),
|
||
sg.Input("", size=(40, 1), key="-NEW_EPC-"),
|
||
],
|
||
[
|
||
sg.Button("写入 EPC", key="-WRITE_EPC-", size=(15, 1)),
|
||
sg.Text("", key="-WRITE_EPC_STATUS-", size=(40, 1), text_color="black"),
|
||
], # 写入状态提示
|
||
]
|
||
|
||
# 日志输出区域
|
||
log_frame = [
|
||
[sg.Multiline("", size=(80, 15), key="-LOG-", autoscroll=True, disabled=True)]
|
||
]
|
||
|
||
layout = [
|
||
[sg.Frame("设备连接", device_frame)],
|
||
[sg.Frame("基本命令", basic_frame)],
|
||
[sg.Frame("EPC 操作", epc_frame)],
|
||
[sg.Frame("日志", log_frame)],
|
||
[sg.Button("清空日志", key="-CLEAR_LOG-"), sg.Button("退出", key="-EXIT-")],
|
||
]
|
||
|
||
return sg.Window("MT6 RFID 读卡器测试程序", layout, finalize=True)
|
||
|
||
|
||
def log_message(window, message, is_hex=False):
|
||
"""输出日志消息"""
|
||
timestamp = time.strftime("%H:%M:%S")
|
||
if is_hex:
|
||
if isinstance(message, bytes):
|
||
hex_str = " ".join(f"{b:02x}" for b in message)
|
||
window["-LOG-"].print(f"[{timestamp}] {hex_str}")
|
||
else:
|
||
window["-LOG-"].print(f"[{timestamp}] {message}")
|
||
else:
|
||
window["-LOG-"].print(f"[{timestamp}] {message}")
|
||
|
||
|
||
def hex_to_bytes(hex_str: str) -> bytes:
|
||
"""十六进制字符串转字节"""
|
||
hex_str = hex_str.replace(" ", "").replace(",", "")
|
||
return bytes.fromhex(hex_str)
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
window = create_window()
|
||
rfid = RFIDDevice()
|
||
device_list = []
|
||
|
||
while True:
|
||
event, values = window.read()
|
||
|
||
if event in (sg.WIN_CLOSED, "-EXIT-"):
|
||
rfid.disconnect()
|
||
break
|
||
|
||
# 枚举设备
|
||
if event == "-ENUM-":
|
||
device_list = rfid.list_devices()
|
||
if device_list:
|
||
log_message(window, f"找到 {len(device_list)} 个 HID 设备")
|
||
device_strs = []
|
||
for i, d in enumerate(device_list):
|
||
# 显示路径信息,帮助区分相同VID/PID的设备
|
||
path = d["path"]
|
||
path_str = (
|
||
path.decode("utf-8", errors="ignore")
|
||
if isinstance(path, bytes)
|
||
else str(path)
|
||
)
|
||
s = (
|
||
f"[{i}] VID:0x{d['vendor_id']:04x} PID:0x{d['product_id']:04x} "
|
||
f"IF:{d['interface_number']} {d['manufacturer_string']} {d['product_string']} "
|
||
f"Path:...{path_str[-20:]}"
|
||
)
|
||
device_strs.append(s)
|
||
log_message(window, s)
|
||
# 详细日志显示完整路径
|
||
log_message(window, f" -> 完整路径: {path_str}")
|
||
|
||
# 创建设备选择窗口
|
||
select_layout = [
|
||
[
|
||
sg.Listbox(
|
||
values=device_strs,
|
||
size=(80, min(10, len(device_strs))),
|
||
key="-SELECTED-",
|
||
)
|
||
],
|
||
[sg.Button("使用选中设备"), sg.Button("取消")],
|
||
]
|
||
select_window = sg.Window("选择设备", select_layout, modal=True)
|
||
while True:
|
||
sel_event, sel_values = select_window.read()
|
||
if sel_event in (sg.WIN_CLOSED, "取消"):
|
||
select_window.close()
|
||
break
|
||
if sel_event == "使用选中设备":
|
||
selected_idx = sel_values.get("-SELECTED-", [])
|
||
if selected_idx:
|
||
idx = device_strs.index(selected_idx[0])
|
||
d = device_list[idx]
|
||
window["-VID-"].update(f"{d['vendor_id']:04x}")
|
||
window["-PID-"].update(f"{d['product_id']:04x}")
|
||
|
||
# 使用路径连接设备
|
||
path = d["path"]
|
||
interface_number = d["interface_number"]
|
||
log_message(window, f"正在连接设备...")
|
||
log_message(
|
||
window,
|
||
f" VID: 0x{d['vendor_id']:04x}, PID: 0x{d['product_id']:04x}",
|
||
)
|
||
log_message(window, f" 接口号: {interface_number}")
|
||
path_str = (
|
||
path.decode("utf-8", errors="ignore")
|
||
if isinstance(path, bytes)
|
||
else str(path)
|
||
)
|
||
log_message(window, f" 路径: {path_str}")
|
||
|
||
success, msg = rfid.connect_by_path(path, interface_number)
|
||
log_message(window, msg)
|
||
if success:
|
||
window["-STATUS-"].update("已连接", text_color="green")
|
||
log_message(
|
||
window, f"当前使用接口: IF{rfid.current_interface}"
|
||
)
|
||
else:
|
||
window["-STATUS-"].update("连接失败", text_color="red")
|
||
select_window.close()
|
||
break
|
||
else:
|
||
log_message(window, "未找到 HID 设备")
|
||
|
||
# 连接设备
|
||
if event == "-CONNECT-":
|
||
try:
|
||
vid = int(values["-VID-"], 16)
|
||
pid = int(values["-PID-"], 16)
|
||
success, msg = rfid.connect(vid, pid)
|
||
log_message(window, msg)
|
||
if success:
|
||
window["-STATUS-"].update("已连接", text_color="green")
|
||
else:
|
||
window["-STATUS-"].update("连接失败", text_color="red")
|
||
except ValueError:
|
||
log_message(window, "请输入有效的十六进制 VID/PID")
|
||
|
||
# 断开连接
|
||
if event == "-DISCONNECT-":
|
||
msg = rfid.disconnect()
|
||
log_message(window, msg)
|
||
window["-STATUS-"].update("未连接", text_color="red")
|
||
|
||
# 读取版本号
|
||
if event == "-VERSION-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
log_message(window, "发送命令: 读版本号")
|
||
data, err = rfid.send_command(cmd_get_version())
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, data, is_hex=True)
|
||
try:
|
||
version = data.decode("ascii", errors="ignore")
|
||
log_message(window, f"版本: {version}")
|
||
except:
|
||
pass
|
||
|
||
# 恢复出厂设置
|
||
if event == "-FACTORY_RESET-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
log_message(window, "发送命令: 恢复出厂设置")
|
||
data, err = rfid.send_command(cmd_factory_reset())
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, data, is_hex=True)
|
||
|
||
# 读取格式
|
||
if event == "-READ_FORMAT-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
log_message(window, "发送命令: 读取格式")
|
||
data, err = rfid.send_command(cmd_read_format())
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, data, is_hex=True)
|
||
|
||
# 设置格式
|
||
if event == "-SET_FORMAT-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
log_message(window, "发送命令: 设置格式")
|
||
data, err = rfid.send_command(cmd_set_format())
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, data, is_hex=True)
|
||
|
||
# 射频电源控制
|
||
if event == "-RF_ON-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
log_message(window, "发送命令: 打开射频")
|
||
data, err = rfid.send_command(cmd_rf_power(True))
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, data, is_hex=True)
|
||
|
||
if event == "-RF_OFF-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
log_message(window, "发送命令: 关闭射频")
|
||
data, err = rfid.send_command(cmd_rf_power(False))
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, data, is_hex=True)
|
||
|
||
# 蜂鸣器控制
|
||
if event == "-BUZZER_ON-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
log_message(window, "发送命令: 打开蜂鸣器")
|
||
data, err = rfid.send_command(cmd_buzzer(True))
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, f"响应: {data.hex() if data else '无数据'}")
|
||
|
||
if event == "-BUZZER_OFF-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
log_message(window, "发送命令: 关闭蜂鸣器")
|
||
data, err = rfid.send_command(cmd_buzzer(False))
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, f"响应: {data.hex() if data else '无数据'}")
|
||
|
||
# 设置功率
|
||
if event == "-SET_POWER-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
power = int(values["-POWER-"])
|
||
log_message(window, f"发送命令: 设置功率 = {power}")
|
||
data, err = rfid.send_command(cmd_set_power(power))
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, f"响应: {data.hex() if data else '无数据'}")
|
||
|
||
# 设置工作模式
|
||
if event == "-SET_MODE-":
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
mode = 1
|
||
if values["-MODE2-"]:
|
||
mode = 2
|
||
elif values["-MODE3-"]:
|
||
mode = 3
|
||
mode_names = {1: "单标签巡查", 2: "被动模式", 3: "多标签巡查"}
|
||
log_message(
|
||
window, f"发送命令: 设置模式 = {mode_names.get(mode, mode)}"
|
||
)
|
||
data, err = rfid.send_command(cmd_set_mode(mode))
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(window, f"响应: {data.hex() if data else '无数据'}")
|
||
|
||
# 读取 EPC
|
||
if event == "-READ_EPC-":
|
||
# 清空状态提示
|
||
window["-READ_EPC_STATUS-"].update("", text_color="black")
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
window["-READ_EPC_STATUS-"].update("✗ 设备未连接", text_color="red")
|
||
else:
|
||
log_message(window, "发送命令: 读取 EPC")
|
||
data, err = rfid.send_command(cmd_read_epc())
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
window["-READ_EPC_STATUS-"].update(f"✗ {err}", text_color="red")
|
||
else:
|
||
log_message(window, f"响应 (hex): ", is_hex=True)
|
||
log_message(window, data, is_hex=True)
|
||
# 解析 EPC
|
||
# 注意: parse_getres_frame返回的data是从bb开始的(status已被去除)
|
||
# 原始格式: 00 bb 02 22 00 0b d3 18 00 [EPC] ...
|
||
# 去掉status后: bb 02 22 00 0b d3 18 00 [EPC] ...
|
||
# 所以: data[0]=bb, data[1:3]=Card Op Resp, data[5]=RSSI, data[6]=EPC Len, data[8:]=EPC
|
||
if data and len(data) >= 8:
|
||
# 检查 Magic 和 Card Op Resp
|
||
if (
|
||
data[0:1] == b"\xbb"
|
||
and data[1:3] == b"\x02\x22"
|
||
and len(data) >= 12
|
||
):
|
||
# 有卡,解析 EPC
|
||
rssi = data[5] if len(data) > 5 else 0
|
||
epc_len_indicator = data[6] if len(data) > 6 else 0
|
||
epc_len = (
|
||
epc_len_indicator // 4 if epc_len_indicator > 0 else 0
|
||
)
|
||
epc_data = data[8 : 8 + epc_len] if len(data) > 8 else b""
|
||
log_message(window, f"Magic: 0x{data[0]:02x} (应为 bb)")
|
||
log_message(
|
||
window, f"Card Op Resp: {data[1:3].hex()} (02 22=有卡)"
|
||
)
|
||
log_message(window, f"RSSI: 0x{rssi:02x}")
|
||
log_message(
|
||
window,
|
||
f"EPC Len Indicator: 0x{epc_len_indicator:02x} (字节数={epc_len})",
|
||
)
|
||
log_message(window, f"EPC: {epc_data.hex()}")
|
||
window["-EPC_DATA-"].update(epc_data.hex())
|
||
# 成功状态提示
|
||
window["-READ_EPC_STATUS-"].update(
|
||
f"✓ 读取成功: {epc_data.hex()}", text_color="green"
|
||
)
|
||
elif data[0:1] == b"\xbb" and data[1:3] == b"\x01\xff":
|
||
log_message(window, "无卡或读取失败 (Card Op Resp: 01 ff)")
|
||
# 无卡状态提示
|
||
window["-READ_EPC_STATUS-"].update(
|
||
"✗ 读取失败: 无卡", text_color="red"
|
||
)
|
||
else:
|
||
log_message(window, f"未知响应格式:")
|
||
log_message(window, f" Magic: {data[0:1].hex()} (应为 bb)")
|
||
log_message(window, f" Card Op Resp: {data[1:3].hex()}")
|
||
# 未知响应状态提示
|
||
window["-READ_EPC_STATUS-"].update(
|
||
"✗ 未知响应格式", text_color="red"
|
||
)
|
||
|
||
# 选中卡
|
||
if event == "-SELECT_CARD-":
|
||
# 清空状态提示
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
else:
|
||
epc_hex = values["-EPC_DATA-"].strip()
|
||
if not epc_hex:
|
||
log_message(window, "错误: 请输入 EPC 数据")
|
||
else:
|
||
try:
|
||
epc_data = hex_to_bytes(epc_hex)
|
||
log_message(window, f"发送命令: 选中卡 EPC={epc_data.hex()}")
|
||
data, err = rfid.send_command(cmd_select_card(epc_data))
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
else:
|
||
log_message(
|
||
window,
|
||
f"响应 (hex): {data.hex() if data else '无数据'}",
|
||
)
|
||
# 注意: parse_getres_frame返回的data是从bb开始的(status已被去除)
|
||
# 原始格式: 00 bb 01 0c 00 01 00 0e 7e
|
||
# 去掉status后: bb 01 0c 00 01 00 0e 7e
|
||
# data[0]=bb, data[1:3]=01 0c(Card Op Resp), data[3:5]=00 01(Internal Length), data[5]=00(Payload-选中成功标志)
|
||
if data and len(data) > 5:
|
||
if data[5] == 0x00:
|
||
log_message(window, "选中成功")
|
||
window["-READ_EPC_STATUS-"].update(
|
||
f"✓ 选中当前卡", text_color="green"
|
||
)
|
||
else:
|
||
log_message(window, f"选中失败: 0x{data[5]:02x}")
|
||
except ValueError:
|
||
log_message(window, "错误: EPC 数据格式错误,请使用十六进制")
|
||
|
||
# 写入 EPC
|
||
if event == "-WRITE_EPC-":
|
||
# 清空状态提示
|
||
window["-WRITE_EPC_STATUS-"].update("", text_color="black")
|
||
if not rfid.is_connected():
|
||
log_message(window, "错误: 设备未连接")
|
||
window["-WRITE_EPC_STATUS-"].update("✗ 设备未连接", text_color="red")
|
||
else:
|
||
new_epc_hex = values["-NEW_EPC-"].strip()
|
||
old_epc_hex = values["-EPC_DATA-"].strip()
|
||
if not new_epc_hex:
|
||
log_message(window, "错误: 请输入新的 EPC 数据")
|
||
window["-WRITE_EPC_STATUS-"].update(
|
||
"✗ 请输入新EPC", text_color="red"
|
||
)
|
||
elif not old_epc_hex:
|
||
log_message(window, "错误: 请先读取 EPC 数据")
|
||
window["-WRITE_EPC_STATUS-"].update("✗ 请先读卡", text_color="red")
|
||
else:
|
||
try:
|
||
new_epc = hex_to_bytes(new_epc_hex)
|
||
old_epc = hex_to_bytes(old_epc_hex)
|
||
# 需要根据实际 EPC 计算
|
||
old_epc_crc = calc_epc_crc(
|
||
len(old_epc) * 4, 0, old_epc
|
||
).to_bytes(2, "big")
|
||
log_message(window, f"发送命令: 写入 EPC={new_epc.hex()}")
|
||
data, err = rfid.send_command(
|
||
cmd_write_epc(old_epc_crc, new_epc)
|
||
)
|
||
if err:
|
||
log_message(window, f"错误: {err}")
|
||
window["-WRITE_EPC_STATUS-"].update(
|
||
f"✗ {err}", text_color="red"
|
||
)
|
||
else:
|
||
log_message(
|
||
window,
|
||
f"响应 (hex): {data.hex() if data else '无数据'}",
|
||
)
|
||
# 验证写入是否成功
|
||
if data and len(data) >= 3:
|
||
# 检查 Magic 和 Card Op Resp
|
||
if data[0] == 0xBB and data[1:3] == b"\x01\x49":
|
||
# 成功状态提示
|
||
window["-WRITE_EPC_STATUS-"].update(
|
||
f"✓ 写入成功: {new_epc.hex()}",
|
||
text_color="green",
|
||
)
|
||
else:
|
||
# 写入失败
|
||
log_message(
|
||
window,
|
||
f"写入失败: Card Op Resp = {data[1:3].hex()}",
|
||
)
|
||
window["-WRITE_EPC_STATUS-"].update(
|
||
"✗ 写入失败", text_color="red"
|
||
)
|
||
else:
|
||
# 无效响应
|
||
log_message(window, f"无效响应: 数据长度不足")
|
||
window["-WRITE_EPC_STATUS-"].update(
|
||
"✗ 无效响应", text_color="red"
|
||
)
|
||
except ValueError:
|
||
log_message(window, "错误: EPC 数据格式错误,请使用十六进制")
|
||
window["-WRITE_EPC_STATUS-"].update(
|
||
"✗ EPC格式错误", text_color="red"
|
||
)
|
||
|
||
# 清空日志
|
||
if event == "-CLEAR_LOG-":
|
||
window["-LOG-"].update("")
|
||
|
||
window.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|