From 9415bde6b95e93f6c54cfb3a2dc1db298d849482 Mon Sep 17 00:00:00 2001 From: kicer Date: Sun, 1 Feb 2026 15:56:24 +0800 Subject: [PATCH] add web auth --- src/rom/app.py | 220 ++++++++++++++++++++++++++------------------- src/rom/display.py | 2 +- 2 files changed, 129 insertions(+), 93 deletions(-) diff --git a/src/rom/app.py b/src/rom/app.py index cdef1aa..f1b0048 100644 --- a/src/rom/app.py +++ b/src/rom/app.py @@ -41,6 +41,124 @@ async def json_response(request, data): await request.write("Content-Type: application/json; charset=utf-8\r\n\r\n") await request.write(data) +CREDENTIALS = ('admin', config.get('web_password', 'admin')) +def authenticate(credentials): + async def fail(request): + await request.write("HTTP/1.1 401 Unauthorized\r\n") + await request.write('WWW-Authenticate: Basic realm="Restricted"\r\n\r\n') + await request.write("

Unauthorized

") + + def decorator(func): + async def wrapper(request): + from ubinascii import a2b_base64 as base64_decode + header = request.headers.get('Authorization', None) + if header is None: + return await fail(request) + + # Authorization: Basic XXX + kind, authorization = header.strip().split(' ', 1) + if kind != "Basic": + return await fail(request) + + authorization = base64_decode(authorization.strip()) \ + .decode('ascii') \ + .split(':') + + if list(credentials) != list(authorization): + return await fail(request) + + return await func(request) + return wrapper + return decorator + +# /status: 获取系统状态 +@authenticate(credentials=CREDENTIALS) +async def sys_status(request): + await json_response(request, + json.dumps({ + "time": "{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format( + *time.localtime() + ), + "uptime": str(f"{time.ticks_ms() // 1000} sec"), + "memory": str(f"{gc.mem_free() // 1000} KB"), + "uuid": uuid(), + "platform": str(sys.platform), + "version": str(sys.version), + }) + ) + +# /lcd: 获取LCD状态 +@authenticate(credentials=CREDENTIALS) +async def lcd_status(request): + # 返回LCD状态 + await json_response(request, + json.dumps({ + "ready": display.is_ready(), + "brightness": display.brightness(), + "ui_type": display.ui_type, + "data": display.ui_data, + }) + ) + +# /config: 获取当前配置 +@authenticate(credentials=CREDENTIALS) +async def config_get(request): + # 返回所有配置项 + await json_response(request, json.dumps(config.config_data)) + +# /lcd/set: 设置LCD状态 +@authenticate(credentials=CREDENTIALS) +async def lcd_set(request): + ack = {"status": "success"} + try: + post_data = await post_parse(request) + + for k, v in json.loads(post_data).items(): + if k == "brightness": + display.brightness(int(v)) + except Exception as e: + ack["status"] = "error" + ack["message"] = str(e) + finally: + await json_response(request, json.dumps(ack)) + +# /config/set: 更新配置 +# curl -H "Content-Type: application/json" -X POST -d '{"city":"xxx","who":"ami"}' 'http:///config/set' +@authenticate(credentials=CREDENTIALS) +async def config_update(request): + ack = {"status": "success"} + try: + post_data = await post_parse(request) + + for k, v in json.loads(post_data).items(): + config.set(k, v) + config.write() + except Exception as e: + ack["status"] = "error" + ack["message"] = str(e) + finally: + await json_response(request, json.dumps(ack)) + +# /exec: 执行命令并返回 +# {"cmd":"import network;R=network.WLAN().config(\"mac\").hex()", "token":"xxx"} +@authenticate(credentials=CREDENTIALS) +async def eval_cmd(request): + ack = {"status": "success"} + try: + post_data = await post_parse(request) + + cmd = json.loads(post_data).get("cmd") + token = json.loads(post_data).get("token") + if cmd and token == uuid: + _NS = {} + exec(cmd, _NS) + ack["result"] = str(_NS.get("R")) + except Exception as e: + ack["status"] = "error" + ack["message"] = str(e) + finally: + await json_response(request, json.dumps(ack)) + # ntp时钟同步 def sync_ntp_time(): import ntptime @@ -58,11 +176,7 @@ def sync_ntp_time(): # 简化的天气数据获取函数 async def fetch_weather_data(city=None): - """获取天气数据,返回JSON格式数据 - Args: - city: 城市名称,如果为None则从配置文件获取 - """ - + # 获取天气数据,返回JSON格式数据 try: import aiohttp @@ -230,93 +344,15 @@ def start(): # website top directory naw.STATIC_DIR = "/rom/www" - # /status - @naw.route("/status") - async def sys_status(request): - await json_response(request, - json.dumps({ - "time": "{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format( - *time.localtime() - ), - "uptime": str(f"{time.ticks_ms() // 1000} sec"), - "memory": str(f"{gc.mem_free() // 1000} KB"), - "uuid": uuid(), - "platform": str(sys.platform), - "version": str(sys.version), - }) - ) - - # /lcd: 获取LCD状态 - @naw.route("/lcd") - async def lcd_status(request): - # 返回LCD状态 - await json_response(request, - json.dumps({ - "ready": display.is_ready(), - "brightness": display.brightness(), - "ui_type": display.ui_type, - "data": display.ui_data, - }) - ) - - # /config: 获取当前配置 - @naw.route("/config") - async def config_get(request): - # 返回所有配置项 - await json_response(request, json.dumps(config.config_data)) - - # /lcd/set: 设置LCD状态 - @naw.route("/lcd/set") - async def lcd_set(request): - ack = {"status": "success"} - try: - post_data = await post_parse(request) - - for k, v in json.loads(post_data).items(): - if k == "brightness": - display.brightness(int(v)) - except Exception as e: - ack["status"] = "error" - ack["message"] = str(e) - finally: - await json_response(request, json.dumps(ack)) - - # /config/set: 更新配置 - # curl -H "Content-Type: application/json" -X POST -d '{"city":"xxx","who":"ami"}' 'http:///config/set' - @naw.route("/config/set") - async def config_update(request): - ack = {"status": "success"} - try: - post_data = await post_parse(request) - - for k, v in json.loads(post_data).items(): - config.set(k, v) - config.write() - except Exception as e: - ack["status"] = "error" - ack["message"] = str(e) - finally: - await json_response(request, json.dumps(ack)) - - # /exec: 执行命令并返回 - # {"cmd":"import network;R=network.WLAN().config(\"mac\").hex()", "token":"xxx"} - @naw.route("/exec") - async def eval_cmd(request): - ack = {"status": "success"} - try: - post_data = await post_parse(request) - - cmd = json.loads(post_data).get("cmd") - token = json.loads(post_data).get("token") - if cmd and token == uuid: - _NS = {} - exec(cmd, _NS) - ack["result"] = str(_NS.get("R")) - except Exception as e: - ack["status"] = "error" - ack["message"] = str(e) - finally: - await json_response(request, json.dumps(ack)) + # Declare route from a dict + naw.routes = { + '/lcd': lcd_status, + '/config': config_get, + '/status': sys_status, + '/exec': eval_cmd, + '/lcd/set': lcd_set, + '/config/set': config_update, + } # create task loop = asyncio.get_event_loop() diff --git a/src/rom/display.py b/src/rom/display.py index 88cc926..ae283c7 100644 --- a/src/rom/display.py +++ b/src/rom/display.py @@ -68,7 +68,7 @@ class Display: self.tft.init() self.tft.fill(0) self.show_jpg(self.bootimg, 80, 80) - self.message("WS2 v1.2.2 (20260201)") + self.message("WS2 v1.3.0 (20260201)") _print_mem() return True