Files
app_rf915_dump/rfid_tester.py
2026-04-15 07:53:51 +08:00

551 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MT6 RFID 读卡器测试程序
基于 USB HID 私有协议
"""
import sys
import os
import time
# 添加 PySimpleGUI 目录到路径
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), 'PySimpleGUI'))
import PySimpleGUI as sg
try:
import hid
except ImportError:
sg.popup_error('请安装 hidapi 库:\npip install hidapi')
sys.exit(1)
# ==================== 协议常量 ====================
REPORT_ID_SET_REQ = 0x01
REPORT_ID_GET_RES = 0x03
REPORT_SIZE = 256
# 设备信息
VENDOR_ID = 0xFFFF # 默认VID
PRODUCT_ID = 0x0035 # 默认PID
# ==================== 协议函数 ====================
def calc_outer_checksum(data_len_bytes: bytes, payload: bytes) -> int:
"""计算外层帧校验XOR"""
result = 0
for b in data_len_bytes + payload:
result ^= b
return result
def build_setreq_frame(data: bytes) -> bytes:
"""
构建 SetReq 帧
:param data: Data 段数据
:return: 完整的 256 字节帧
"""
# Frame Length: 从 0x07 到 Checksum包含校验和不含结束符
# = 2 (constant) + 2 (data length) + len(data) + 1 (checksum)
frame_length = 5 + len(data)
frame_length_bytes = frame_length.to_bytes(2, 'little')
# Data Length
data_length_bytes = len(data).to_bytes(2, 'little')
# Checksum
checksum = calc_outer_checksum(data_length_bytes, data)
# 构建帧
frame = bytearray(REPORT_SIZE)
frame[0] = REPORT_ID_SET_REQ
frame[1:5] = b'\x00\x00\x00\x00' # Fixed
frame[5:7] = frame_length_bytes # Frame Length
frame[7:9] = b'\x00\x02' # Constant
frame[9:11] = data_length_bytes # Data Length
frame[11:11+len(data)] = data # Data
frame[11+len(data)] = checksum # Checksum
frame[12+len(data)] = 0x03 # End Marker
return bytes(frame)
def parse_getres_frame(data: bytes) -> tuple:
"""
解析 GetRes 帧
:param data: 接收到的数据
:return: (status, response_data) 或 None
"""
if len(data) < 12:
return None
# 检查 Report ID
if data[0] != REPORT_ID_GET_RES:
return None
# 解析帧结构
frame_length = int.from_bytes(data[5:7], 'little')
data_length = int.from_bytes(data[9:11], 'little')
# 提取 Data 段
response_data = data[11:11+data_length]
if len(response_data) < 1:
return None
status = response_data[0]
return (status, response_data[1:])
# ==================== 命令函数 ====================
def cmd_get_version() -> bytes:
"""读版本号命令"""
return bytes([0xc0])
def cmd_buzzer(on: bool) -> bytes:
"""蜂鸣器控制命令"""
return bytes([0xcd, 0x01 if on else 0x00])
def cmd_set_power(power: int) -> bytes:
"""设置功率命令 (0-9)"""
power = max(0, min(9, power))
return bytes([0xcc, power])
def cmd_set_mode(mode: int) -> bytes:
"""设置工作模式命令
1: 单标签巡查
2: 被动模式
3: 多标签巡查
"""
return bytes([0x0f, mode])
def cmd_read_epc() -> bytes:
"""读取 EPC 命令"""
return bytes([0xce, 0xbb, 0x00, 0x22, 0x00, 0x00, 0x22, 0x7e])
def cmd_select_card(epc_data: bytes) -> bytes:
"""选中卡命令"""
# 固定前缀 + EPC 数据
payload = bytes([0x01, 0x00, 0x00, 0x00, 0x20, 0x10, 0x00]) + epc_data
# 计算 checksum
card_op_cmd = bytes([0x00, 0x0c])
checksum = 0
for b in card_op_cmd + payload:
checksum = (checksum + b) & 0xFF
internal_len = len(payload).to_bytes(2, 'little')
return bytes([0xce, 0xbb]) + card_op_cmd + internal_len + payload + bytes([checksum, 0x7e])
def cmd_write_epc(old_epc_crc: bytes, new_epc: bytes) -> bytes:
"""写入 EPC 命令"""
epc_len_indicator = (len(new_epc) * 4) // 4
if epc_len_indicator == 0:
epc_len_indicator = 8
word_count = 2 + len(new_epc) // 2
payload = bytes([0x00, 0x00, 0x00, 0x00, 0x01, 0x00]) # Reserved
payload += bytes([0x00, 0x00]) # Reserved
payload += bytes([word_count]) # Word Count
payload += old_epc_crc # Old EPC CRC
payload += bytes([epc_len_indicator, 0x00]) # EPC Len Indicator, Status
payload += new_epc # New EPC Data
# 计算 checksum
card_op_cmd = bytes([0x00, 0x49])
checksum = 0
for b in card_op_cmd + len(payload).to_bytes(2, 'little') + payload:
checksum = (checksum + b) & 0xFF
internal_len = len(payload).to_bytes(2, 'little')
return bytes([0xce, 0xbb]) + card_op_cmd + internal_len + payload + bytes([checksum, 0x7e])
# ==================== 设备管理类 ====================
class RFIDDevice:
def __init__(self):
self.device = None
self.vendor_id = VENDOR_ID
self.product_id = PRODUCT_ID
def list_devices(self):
"""列出所有 HID 设备"""
devices = []
try:
for d in hid.enumerate():
devices.append({
'vendor_id': d['vendor_id'],
'product_id': d['product_id'],
'manufacturer_string': d.get('manufacturer_string', ''),
'product_string': d.get('product_string', ''),
'serial_number': d.get('serial_number', ''),
'interface_number': d.get('interface_number', -1),
'path': d['path']
})
except Exception as e:
print(f"枚举设备失败: {e}")
return devices
def connect(self, vendor_id=None, product_id=None):
"""连接设备"""
if vendor_id:
self.vendor_id = vendor_id
if product_id:
self.product_id = product_id
try:
self.device = hid.device()
self.device.open(self.vendor_id, self.product_id)
return True, f"已连接到设备 (VID: 0x{self.vendor_id:04x}, PID: 0x{self.product_id:04x})"
except Exception as e:
return False, f"连接失败: {e}"
def disconnect(self):
"""断开连接"""
if self.device:
try:
self.device.close()
except:
pass
self.device = None
return "已断开连接"
def is_connected(self):
"""检查是否已连接"""
return self.device is not None
def send_command(self, cmd_data: bytes):
"""发送命令并接收响应"""
if not self.device:
return None, "设备未连接"
try:
# 构建并发送 SetReq
frame = build_setreq_frame(cmd_data)
self.device.send_feature_report(frame)
# 接收 GetRes
time.sleep(0.1)
response = self.device.get_feature_report(REPORT_ID_GET_RES, REPORT_SIZE)
# 解析响应
result = parse_getres_frame(bytes(response))
if result is None:
return None, "响应格式错误"
status, data = result
return data, None
except Exception as e:
return None, f"通信错误: {e}"
# ==================== GUI 界面 ====================
def create_window():
"""创建主窗口"""
sg.theme('LightBlue2')
# 设备连接区域
device_frame = [
[sg.Text('VID (十六进制):'), sg.Input('FFFF', size=(8, 1), key='-VID-'),
sg.Text('PID (十六进制):'), sg.Input('0035', size=(8, 1), key='-PID-'),
sg.Button('枚举设备', key='-ENUM-'),
sg.Button('连接', key='-CONNECT-'),
sg.Button('断开', key='-DISCONNECT-')],
[sg.Text('状态: ', size=(12, 1)), sg.Text('未连接', key='-STATUS-', text_color='red')]
]
# 基本命令区域
basic_frame = [
[sg.Button('读取版本号', key='-VERSION-', size=(15, 1)),
sg.Button('打开蜂鸣器', key='-BUZZER_ON-', size=(15, 1)),
sg.Button('关闭蜂鸣器', key='-BUZZER_OFF-', size=(15, 1))],
[sg.Text('功率 (0-9):'), sg.Slider(range=(0, 9), default_value=8, orientation='h', size=(20, 15), key='-POWER-'),
sg.Button('设置功率', key='-SET_POWER-')],
[sg.Text('工作模式:'),
sg.Radio('单标签巡查', 'mode', key='-MODE1-', default=True),
sg.Radio('被动模式', 'mode', key='-MODE2-'),
sg.Radio('多标签巡查', 'mode', key='-MODE3-'),
sg.Button('设置模式', key='-SET_MODE-')]
]
# EPC 操作区域
epc_frame = [
[sg.Button('读取 EPC', key='-READ_EPC-', size=(15, 1)),
sg.Button('选中卡', key='-SELECT_CARD-', size=(15, 1))],
[sg.Text('EPC 数据 (十六进制):'), sg.Input('', size=(40, 1), key='-EPC_DATA-')],
[sg.Text('新 EPC (十六进制):'), sg.Input('', size=(40, 1), key='-NEW_EPC-')],
[sg.Button('写入 EPC', key='-WRITE_EPC-', size=(15, 1))]
]
# 日志输出区域
log_frame = [
[sg.Multiline('', size=(80, 15), key='-LOG-', autoscroll=True, disabled=True)]
]
layout = [
[sg.Frame('设备连接', device_frame)],
[sg.Frame('基本命令', basic_frame)],
[sg.Frame('EPC 操作', epc_frame)],
[sg.Frame('日志', log_frame)],
[sg.Button('清空日志', key='-CLEAR_LOG-'), sg.Button('退出', key='-EXIT-')]
]
return sg.Window('MT6 RFID 读卡器测试程序', layout, finalize=True)
def log_message(window, message, is_hex=False):
"""输出日志消息"""
timestamp = time.strftime('%H:%M:%S')
if is_hex:
if isinstance(message, bytes):
hex_str = ' '.join(f'{b:02x}' for b in message)
window['-LOG-'].print(f'[{timestamp}] {hex_str}')
else:
window['-LOG-'].print(f'[{timestamp}] {message}')
else:
window['-LOG-'].print(f'[{timestamp}] {message}')
def hex_to_bytes(hex_str: str) -> bytes:
"""十六进制字符串转字节"""
hex_str = hex_str.replace(' ', '').replace(',', '')
return bytes.fromhex(hex_str)
def main():
"""主函数"""
window = create_window()
rfid = RFIDDevice()
device_list = []
while True:
event, values = window.read()
if event in (sg.WIN_CLOSED, '-EXIT-'):
rfid.disconnect()
break
# 枚举设备
if event == '-ENUM-':
device_list = rfid.list_devices()
if device_list:
log_message(window, f"找到 {len(device_list)} 个 HID 设备")
device_strs = []
for i, d in enumerate(device_list):
s = f"[{i}] VID:0x{d['vendor_id']:04x} PID:0x{d['product_id']:04x} " \
f"IF:{d['interface_number']} {d['manufacturer_string']} {d['product_string']}"
device_strs.append(s)
log_message(window, s)
# 创建设备选择窗口
select_layout = [
[sg.Listbox(values=device_strs, size=(80, min(10, len(device_strs))), key='-SELECTED-')],
[sg.Button('使用选中设备'), sg.Button('取消')]
]
select_window = sg.Window('选择设备', select_layout, modal=True)
while True:
sel_event, sel_values = select_window.read()
if sel_event in (sg.WIN_CLOSED, '取消'):
select_window.close()
break
if sel_event == '使用选中设备':
selected_idx = sel_values.get('-SELECTED-', [])
if selected_idx:
idx = device_strs.index(selected_idx[0])
d = device_list[idx]
window['-VID-'].update(f"{d['vendor_id']:04x}")
window['-PID-'].update(f"{d['product_id']:04x}")
log_message(window, f"已选择设备: VID=0x{d['vendor_id']:04x}, PID=0x{d['product_id']:04x}, IF={d['interface_number']}")
select_window.close()
break
else:
log_message(window, "未找到 HID 设备")
# 连接设备
if event == '-CONNECT-':
try:
vid = int(values['-VID-'], 16)
pid = int(values['-PID-'], 16)
success, msg = rfid.connect(vid, pid)
log_message(window, msg)
if success:
window['-STATUS-'].update('已连接', text_color='green')
else:
window['-STATUS-'].update('连接失败', text_color='red')
except ValueError:
log_message(window, "请输入有效的十六进制 VID/PID")
# 断开连接
if event == '-DISCONNECT-':
msg = rfid.disconnect()
log_message(window, msg)
window['-STATUS-'].update('未连接', text_color='red')
# 读取版本号
if event == '-VERSION-':
if not rfid.is_connected():
log_message(window, "错误: 设备未连接")
else:
log_message(window, "发送命令: 读版本号")
data, err = rfid.send_command(cmd_get_version())
if err:
log_message(window, f"错误: {err}")
else:
log_message(window, f"响应 (hex): ", is_hex=True)
log_message(window, data, is_hex=True)
try:
version = data.decode('ascii', errors='ignore')
log_message(window, f"版本: {version}")
except:
pass
# 蜂鸣器控制
if event == '-BUZZER_ON-':
if not rfid.is_connected():
log_message(window, "错误: 设备未连接")
else:
log_message(window, "发送命令: 打开蜂鸣器")
data, err = rfid.send_command(cmd_buzzer(True))
if err:
log_message(window, f"错误: {err}")
else:
log_message(window, f"响应: {data.hex() if data else '无数据'}")
if event == '-BUZZER_OFF-':
if not rfid.is_connected():
log_message(window, "错误: 设备未连接")
else:
log_message(window, "发送命令: 关闭蜂鸣器")
data, err = rfid.send_command(cmd_buzzer(False))
if err:
log_message(window, f"错误: {err}")
else:
log_message(window, f"响应: {data.hex() if data else '无数据'}")
# 设置功率
if event == '-SET_POWER-':
if not rfid.is_connected():
log_message(window, "错误: 设备未连接")
else:
power = int(values['-POWER-'])
log_message(window, f"发送命令: 设置功率 = {power}")
data, err = rfid.send_command(cmd_set_power(power))
if err:
log_message(window, f"错误: {err}")
else:
log_message(window, f"响应: {data.hex() if data else '无数据'}")
# 设置工作模式
if event == '-SET_MODE-':
if not rfid.is_connected():
log_message(window, "错误: 设备未连接")
else:
mode = 1
if values['-MODE2-']:
mode = 2
elif values['-MODE3-']:
mode = 3
mode_names = {1: '单标签巡查', 2: '被动模式', 3: '多标签巡查'}
log_message(window, f"发送命令: 设置模式 = {mode_names.get(mode, mode)}")
data, err = rfid.send_command(cmd_set_mode(mode))
if err:
log_message(window, f"错误: {err}")
else:
log_message(window, f"响应: {data.hex() if data else '无数据'}")
# 读取 EPC
if event == '-READ_EPC-':
if not rfid.is_connected():
log_message(window, "错误: 设备未连接")
else:
log_message(window, "发送命令: 读取 EPC")
data, err = rfid.send_command(cmd_read_epc())
if err:
log_message(window, f"错误: {err}")
else:
log_message(window, f"响应 (hex): ", is_hex=True)
log_message(window, data, is_hex=True)
# 解析 EPC
if data and len(data) > 10:
# 检查 Card Op Resp
if data[2:4] == b'\x02\x22':
# 有卡
rssi = data[6] if len(data) > 6 else 0
epc_len_indicator = data[7] if len(data) > 7 else 0
epc_len = epc_len_indicator // 4 if epc_len_indicator > 0 else 0
epc_data = data[9:9+epc_len] if len(data) > 9 else b''
log_message(window, f"RSSI: 0x{rssi:02x}")
log_message(window, f"EPC: {epc_data.hex()}")
window['-EPC_DATA-'].update(epc_data.hex())
elif data[2:4] == b'\x01\xff':
log_message(window, "无卡或读取失败")
else:
log_message(window, f"未知响应: {data[2:4].hex()}")
# 选中卡
if event == '-SELECT_CARD-':
if not rfid.is_connected():
log_message(window, "错误: 设备未连接")
else:
epc_hex = values['-EPC_DATA-'].strip()
if not epc_hex:
log_message(window, "错误: 请输入 EPC 数据")
else:
try:
epc_data = hex_to_bytes(epc_hex)
log_message(window, f"发送命令: 选中卡 EPC={epc_data.hex()}")
data, err = rfid.send_command(cmd_select_card(epc_data))
if err:
log_message(window, f"错误: {err}")
else:
log_message(window, f"响应 (hex): {data.hex() if data else '无数据'}")
if data and len(data) > 6:
if data[6] == 0x00:
log_message(window, "选中成功")
else:
log_message(window, f"选中失败: 0x{data[6]:02x}")
except ValueError:
log_message(window, "错误: EPC 数据格式错误,请使用十六进制")
# 写入 EPC
if event == '-WRITE_EPC-':
if not rfid.is_connected():
log_message(window, "错误: 设备未连接")
else:
new_epc_hex = values['-NEW_EPC-'].strip()
if not new_epc_hex:
log_message(window, "错误: 请输入新的 EPC 数据")
else:
try:
new_epc = hex_to_bytes(new_epc_hex)
# 这里需要一个旧的 EPC CRC暂时使用示例值
old_epc_crc = bytes([0xca, 0x9e]) # 需要根据实际 EPC 计算
log_message(window, f"发送命令: 写入 EPC={new_epc.hex()}")
data, err = rfid.send_command(cmd_write_epc(old_epc_crc, new_epc))
if err:
log_message(window, f"错误: {err}")
else:
log_message(window, f"响应 (hex): {data.hex() if data else '无数据'}")
except ValueError:
log_message(window, "错误: EPC 数据格式错误,请使用十六进制")
# 清空日志
if event == '-CLEAR_LOG-':
window['-LOG-'].update('')
window.close()
if __name__ == '__main__':
main()