diff --git a/src/rom/app.py b/src/rom/app.py
index cdef1aa..f1b0048 100644
--- a/src/rom/app.py
+++ b/src/rom/app.py
@@ -41,6 +41,124 @@ async def json_response(request, data):
await request.write("Content-Type: application/json; charset=utf-8\r\n\r\n")
await request.write(data)
+CREDENTIALS = ('admin', config.get('web_password', 'admin'))
+def authenticate(credentials):
+ async def fail(request):
+ await request.write("HTTP/1.1 401 Unauthorized\r\n")
+ await request.write('WWW-Authenticate: Basic realm="Restricted"\r\n\r\n')
+ await request.write("
Unauthorized
")
+
+ def decorator(func):
+ async def wrapper(request):
+ from ubinascii import a2b_base64 as base64_decode
+ header = request.headers.get('Authorization', None)
+ if header is None:
+ return await fail(request)
+
+ # Authorization: Basic XXX
+ kind, authorization = header.strip().split(' ', 1)
+ if kind != "Basic":
+ return await fail(request)
+
+ authorization = base64_decode(authorization.strip()) \
+ .decode('ascii') \
+ .split(':')
+
+ if list(credentials) != list(authorization):
+ return await fail(request)
+
+ return await func(request)
+ return wrapper
+ return decorator
+
+# /status: 获取系统状态
+@authenticate(credentials=CREDENTIALS)
+async def sys_status(request):
+ await json_response(request,
+ json.dumps({
+ "time": "{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(
+ *time.localtime()
+ ),
+ "uptime": str(f"{time.ticks_ms() // 1000} sec"),
+ "memory": str(f"{gc.mem_free() // 1000} KB"),
+ "uuid": uuid(),
+ "platform": str(sys.platform),
+ "version": str(sys.version),
+ })
+ )
+
+# /lcd: 获取LCD状态
+@authenticate(credentials=CREDENTIALS)
+async def lcd_status(request):
+ # 返回LCD状态
+ await json_response(request,
+ json.dumps({
+ "ready": display.is_ready(),
+ "brightness": display.brightness(),
+ "ui_type": display.ui_type,
+ "data": display.ui_data,
+ })
+ )
+
+# /config: 获取当前配置
+@authenticate(credentials=CREDENTIALS)
+async def config_get(request):
+ # 返回所有配置项
+ await json_response(request, json.dumps(config.config_data))
+
+# /lcd/set: 设置LCD状态
+@authenticate(credentials=CREDENTIALS)
+async def lcd_set(request):
+ ack = {"status": "success"}
+ try:
+ post_data = await post_parse(request)
+
+ for k, v in json.loads(post_data).items():
+ if k == "brightness":
+ display.brightness(int(v))
+ except Exception as e:
+ ack["status"] = "error"
+ ack["message"] = str(e)
+ finally:
+ await json_response(request, json.dumps(ack))
+
+# /config/set: 更新配置
+# curl -H "Content-Type: application/json" -X POST -d '{"city":"xxx","who":"ami"}' 'http:///config/set'
+@authenticate(credentials=CREDENTIALS)
+async def config_update(request):
+ ack = {"status": "success"}
+ try:
+ post_data = await post_parse(request)
+
+ for k, v in json.loads(post_data).items():
+ config.set(k, v)
+ config.write()
+ except Exception as e:
+ ack["status"] = "error"
+ ack["message"] = str(e)
+ finally:
+ await json_response(request, json.dumps(ack))
+
+# /exec: 执行命令并返回
+# {"cmd":"import network;R=network.WLAN().config(\"mac\").hex()", "token":"xxx"}
+@authenticate(credentials=CREDENTIALS)
+async def eval_cmd(request):
+ ack = {"status": "success"}
+ try:
+ post_data = await post_parse(request)
+
+ cmd = json.loads(post_data).get("cmd")
+ token = json.loads(post_data).get("token")
+ if cmd and token == uuid:
+ _NS = {}
+ exec(cmd, _NS)
+ ack["result"] = str(_NS.get("R"))
+ except Exception as e:
+ ack["status"] = "error"
+ ack["message"] = str(e)
+ finally:
+ await json_response(request, json.dumps(ack))
+
# ntp时钟同步
def sync_ntp_time():
import ntptime
@@ -58,11 +176,7 @@ def sync_ntp_time():
# 简化的天气数据获取函数
async def fetch_weather_data(city=None):
- """获取天气数据,返回JSON格式数据
- Args:
- city: 城市名称,如果为None则从配置文件获取
- """
-
+ # 获取天气数据,返回JSON格式数据
try:
import aiohttp
@@ -230,93 +344,15 @@ def start():
# website top directory
naw.STATIC_DIR = "/rom/www"
- # /status
- @naw.route("/status")
- async def sys_status(request):
- await json_response(request,
- json.dumps({
- "time": "{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(
- *time.localtime()
- ),
- "uptime": str(f"{time.ticks_ms() // 1000} sec"),
- "memory": str(f"{gc.mem_free() // 1000} KB"),
- "uuid": uuid(),
- "platform": str(sys.platform),
- "version": str(sys.version),
- })
- )
-
- # /lcd: 获取LCD状态
- @naw.route("/lcd")
- async def lcd_status(request):
- # 返回LCD状态
- await json_response(request,
- json.dumps({
- "ready": display.is_ready(),
- "brightness": display.brightness(),
- "ui_type": display.ui_type,
- "data": display.ui_data,
- })
- )
-
- # /config: 获取当前配置
- @naw.route("/config")
- async def config_get(request):
- # 返回所有配置项
- await json_response(request, json.dumps(config.config_data))
-
- # /lcd/set: 设置LCD状态
- @naw.route("/lcd/set")
- async def lcd_set(request):
- ack = {"status": "success"}
- try:
- post_data = await post_parse(request)
-
- for k, v in json.loads(post_data).items():
- if k == "brightness":
- display.brightness(int(v))
- except Exception as e:
- ack["status"] = "error"
- ack["message"] = str(e)
- finally:
- await json_response(request, json.dumps(ack))
-
- # /config/set: 更新配置
- # curl -H "Content-Type: application/json" -X POST -d '{"city":"xxx","who":"ami"}' 'http:///config/set'
- @naw.route("/config/set")
- async def config_update(request):
- ack = {"status": "success"}
- try:
- post_data = await post_parse(request)
-
- for k, v in json.loads(post_data).items():
- config.set(k, v)
- config.write()
- except Exception as e:
- ack["status"] = "error"
- ack["message"] = str(e)
- finally:
- await json_response(request, json.dumps(ack))
-
- # /exec: 执行命令并返回
- # {"cmd":"import network;R=network.WLAN().config(\"mac\").hex()", "token":"xxx"}
- @naw.route("/exec")
- async def eval_cmd(request):
- ack = {"status": "success"}
- try:
- post_data = await post_parse(request)
-
- cmd = json.loads(post_data).get("cmd")
- token = json.loads(post_data).get("token")
- if cmd and token == uuid:
- _NS = {}
- exec(cmd, _NS)
- ack["result"] = str(_NS.get("R"))
- except Exception as e:
- ack["status"] = "error"
- ack["message"] = str(e)
- finally:
- await json_response(request, json.dumps(ack))
+ # Declare route from a dict
+ naw.routes = {
+ '/lcd': lcd_status,
+ '/config': config_get,
+ '/status': sys_status,
+ '/exec': eval_cmd,
+ '/lcd/set': lcd_set,
+ '/config/set': config_update,
+ }
# create task
loop = asyncio.get_event_loop()
diff --git a/src/rom/display.py b/src/rom/display.py
index 88cc926..ae283c7 100644
--- a/src/rom/display.py
+++ b/src/rom/display.py
@@ -68,7 +68,7 @@ class Display:
self.tft.init()
self.tft.fill(0)
self.show_jpg(self.bootimg, 80, 80)
- self.message("WS2 v1.2.2 (20260201)")
+ self.message("WS2 v1.3.0 (20260201)")
_print_mem()
return True