Files
ws2/src/rom/app.py
2026-02-08 09:29:25 +08:00

449 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ESP8266 weather station main program.
# First try to connect to the saved WiFi; if that fails, start the CaptivePortal for configuration.
import asyncio
import gc
import json
import sys
import time
import machine
from config import config
from display import display # 导入液晶屏管理模块
from wifi_manager import wifi_manager
def uuid():
    """Return the value of machine.unique_id() rendered as a hex string."""
    chip_id = machine.unique_id()
    return str(chip_id.hex())
def parse_url_params(url):
    """Parse the query string of *url* into a dict of raw (undecoded) strings.

    Everything after the FIRST "?" is treated as the query string, so a
    literal "?" inside a parameter value is preserved. Pairs without "=" are
    skipped; a value may itself contain "=". When a key repeats, the last
    occurrence wins.

    Args:
        url: request URL or path, with or without a query string.

    Returns:
        dict mapping parameter names to values; empty when there is no query.
    """
    params = {}
    if "?" not in url:
        return params
    # Split only once: a second "?" is legal inside a query and must not
    # truncate the parameters that follow it.
    query_string = url.split("?", 1)[1]
    for pair in query_string.split("&"):
        if "=" in pair:
            key, value = pair.split("=", 1)
            params[key] = value
    return params
async def post_parse(request):
    """Read and decode the body of a POST request.

    Args:
        request: object exposing ``method``, a ``headers`` dict and an
            awaitable ``read(n)``.

    Returns:
        The request body decoded to ``str``.

    Raises:
        Exception: if the method is not POST or no Content-Length header
            is present.
        ValueError: if the Content-Length value is not an integer.
    """
    if request.method != "POST":
        raise Exception("invalid request")

    # HTTP header names are case-insensitive, so do not depend on exact casing.
    content_length = None
    for name, value in request.headers.items():
        if name.lower() == "content-length":
            content_length = int(value)
            break
    if content_length is None:
        raise Exception("missing Content-Length")

    # A stream read may return fewer bytes than requested; keep reading until
    # the whole body has arrived or the peer stops sending.
    body = b""
    while len(body) < content_length:
        chunk = await request.read(content_length - len(body))
        if not chunk:
            break
        body += chunk
    return body.decode()
async def json_response(request, data):
    """Write a 200 response with a JSON content type, followed by *data*.

    *data* is written as-is after the headers; callers pass an already
    serialised JSON string.
    """
    head = (
        "HTTP/1.1 200 OK\r\n",
        "Content-Type: application/json; charset=utf-8\r\n\r\n",
    )
    for line in head:
        await request.write(line)
    await request.write(data)
# (user, password) pair checked by the HTTP Basic-auth decorator below.
# The password is read from the "web_password" config key and falls back to
# "admin"; it is evaluated once, when this module is imported.
CREDENTIALS = ("admin", config.get("web_password", "admin"))
def authenticate(credentials):
    """Build a decorator that protects a request handler with HTTP Basic auth.

    Args:
        credentials: ``(user, password)`` pair the client must present.

    Returns:
        A decorator. The wrapped handler only runs when the request carries a
        matching ``Authorization: Basic ...`` header; otherwise a 401 response
        is written and the handler is not called.
    """

    async def fail(request):
        await request.write("HTTP/1.1 401 Unauthorized\r\n")
        await request.write('WWW-Authenticate: Basic realm="Restricted"\r\n\r\n')
        await request.write("<h1>Unauthorized</h1>")

    def decorator(func):
        async def wrapper(request):
            try:
                from ubinascii import a2b_base64 as base64_decode
            except ImportError:
                # Builds without the u-prefixed alias expose plain binascii.
                from binascii import a2b_base64 as base64_decode

            header = request.headers.get("Authorization", None)
            if header is None:
                return await fail(request)

            # Authorization: Basic XXX
            parts = header.strip().split(" ", 1)
            if len(parts) != 2 or parts[0] != "Basic":
                # Malformed header (e.g. scheme only) or unsupported scheme.
                return await fail(request)

            try:
                decoded = base64_decode(parts[1].strip()).decode("ascii")
            except ValueError:
                # Invalid base64 or undecodable payload: reject with 401
                # instead of letting the exception escape the handler.
                return await fail(request)

            # Split on the first ":" only, since the password may contain ":".
            if list(credentials) != decoded.split(":", 1):
                return await fail(request)

            return await func(request)

        return wrapper

    return decorator
# /status: report system status
@authenticate(credentials=CREDENTIALS)
async def sys_status(request):
    """Reply with a JSON object describing clock, tick counter, memory and platform."""
    now = time.localtime()
    status = {
        # Only the first six fields (year .. second) are consumed by the format.
        "time": "{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(*now),
        "uptime": time.ticks_ms(),
        "mem_free": gc.mem_free(),
        "mem_alloc": gc.mem_alloc(),
        "uuid": uuid(),
        "platform": str(sys.platform),
        "version": str(sys.version),
    }
    await json_response(request, json.dumps(status))
# /lcd: report LCD status
@authenticate(credentials=CREDENTIALS)
async def lcd_status(request):
    """Reply with the display's ready flag, brightness, UI type and UI data as JSON."""
    snapshot = {
        "ready": display.is_ready(),
        "brightness": display.brightness(),
        "ui_type": display.ui_type,
        "data": display.ui_data,
    }
    await json_response(request, json.dumps(snapshot))
# /config: return the current configuration
@authenticate(credentials=CREDENTIALS)
async def config_get(request):
    """Reply with every configuration entry (config.config_data) as JSON."""
    payload = json.dumps(config.config_data)
    await json_response(request, payload)
# /lcd/set: change LCD settings
@authenticate(credentials=CREDENTIALS)
async def lcd_set(request):
    """Apply LCD settings from a JSON POST body and store them in config.

    "brightness" is applied through display.brightness() and "ui_type" is
    assigned to display.ui_type. The reply is always a JSON object with
    "status" ("success" or "error") and, on error, a "message".
    """
    ack = {"status": "success"}
    try:
        post_data = await post_parse(request)
        for k, v in json.loads(post_data).items():
            if k == "brightness":
                display.brightness(int(v))
            elif k == "ui_type":
                display.ui_type = v
            # NOTE(review): as nested here, every key (recognised or not) is
            # stored with its raw value - confirm this matches the intent.
            config.set(k, v)
        # presumably persists the updated configuration - confirm in config module
        config.write()
    except Exception as e:
        ack["status"] = "error"
        ack["message"] = str(e)
    finally:
        # The acknowledgement is sent whether or not the update succeeded.
        await json_response(request, json.dumps(ack))
# /config/set: update configuration entries
# curl -H "Content-Type: application/json" -X POST -d '{"city":"xxx","who":"ami"}' 'http://<url>/config/set'
@authenticate(credentials=CREDENTIALS)
async def config_update(request):
    """Store every key/value pair of the JSON POST body in config.

    Always replies with a JSON acknowledgement: {"status": "success"} or
    {"status": "error", "message": ...}.
    """
    ack = {"status": "success"}
    try:
        body = await post_parse(request)
        updates = json.loads(body)
        for name, value in updates.items():
            config.set(name, value)
        config.write()
    except Exception as err:
        ack["status"] = "error"
        ack["message"] = str(err)
    finally:
        reply = json.dumps(ack)
        await json_response(request, reply)
# /exec: run a command and return its result
# {"cmd":"import network;R=network.WLAN().config(\"mac\").hex()", "token":"xxx"}
@authenticate(credentials=CREDENTIALS)
async def eval_cmd(request):
    """Execute the "cmd" string from a JSON POST body and report variable R.

    The body must carry "token" equal to uuid() and a non-empty "cmd". The
    command runs in a fresh namespace; whatever it binds to the name ``R`` is
    returned, stringified, as "result". Always replies with a JSON
    acknowledgement containing "status" and, on error, "message".
    """
    ack = {"status": "success"}
    try:
        post_data = await post_parse(request)
        _json = json.loads(post_data)
        cmd = _json.get("cmd")
        token = _json.get("token")
        if token != uuid():
            raise Exception("invalid token")
        if not cmd:
            # Previously reported as "invalid token", which hid the real cause.
            raise Exception("missing cmd")
        # SECURITY: this executes arbitrary code taken from the request body.
        # It is guarded only by Basic auth plus the uuid() token, which
        # /status also returns. Review before exposing the device to an
        # untrusted network.
        _NS = {}
        exec(cmd, _NS)
        ack["result"] = str(_NS.get("R"))
    except Exception as e:
        ack["status"] = "error"
        ack["message"] = str(e)
    finally:
        await json_response(request, json.dumps(ack))
def _hhmm_to_minutes(value):
    """Convert an "H:MM" / "HH:MM" string to minutes since midnight, or None if malformed."""
    try:
        hours, minutes = str(value).split(":")[:2]
        return int(hours) * 60 + int(minutes)
    except (ValueError, TypeError):
        return None


# Scheduled screen standby; returns True when the screen is off
def standby_control():
    """Switch the backlight off/on according to the configured standby window.

    Reads "standby_time" and "wakeup_time" (HH:MM) and "brightness" from
    config. Inside the window a lit screen is turned off; outside it a dark
    screen is restored to the configured brightness. Windows that cross
    midnight (standby later than wakeup) are supported. When either time is
    missing or malformed the display is left untouched.

    Returns:
        True if the screen is off after this call, otherwise False.
    """
    now = time.localtime()
    current = now[3] * 60 + now[4]

    # Compare times numerically so values such as "7:30" sort correctly;
    # comparing them as strings only works for zero-padded times.
    standby = _hhmm_to_minutes(config.get("standby_time"))
    wakeup = _hhmm_to_minutes(config.get("wakeup_time"))
    if standby is None or wakeup is None:
        return not display.brightness()

    # The stored brightness may be a string (e.g. written via the web API);
    # coerce it so the numeric comparison below cannot raise TypeError.
    try:
        configured = int(config.get("brightness", 10))
    except (ValueError, TypeError):
        configured = 10

    if standby <= wakeup:
        in_standby = standby <= current < wakeup
    else:
        # Window wraps past midnight.
        in_standby = current >= standby or current < wakeup
    target = 0 if in_standby else configured

    lit = bool(display.brightness())
    if lit:
        if target == 0:
            display.brightness(0)
            lit = False
    elif target > 0:
        display.brightness(configured)  # restore brightness
        lit = True
    return not lit
# NTP clock synchronisation
def sync_ntp_time(utc_offset_hours=8):
    """Set the RTC from an NTP server, shifted by a fixed UTC offset.

    The server is taken from the "ntphost" config key (default
    "ntp1.aliyun.com").

    Args:
        utc_offset_hours: offset added to the NTP time before it is written
            to the RTC. Defaults to 8, the value that used to be hard-coded.
    """
    import ntptime

    ntptime.host = config.get("ntphost", "ntp1.aliyun.com")
    t = ntptime.time() + int(3600 * utc_offset_hours)
    rtc = machine.RTC()
    tm = time.gmtime(t)
    # RTC.datetime() expects (year, month, day, weekday, hour, minute, second,
    # subseconds), so gmtime's weekday (index 6) moves to position 3.
    rtc.datetime((tm[0], tm[1], tm[2], tm[6], tm[3], tm[4], tm[5], 0))
    # The UI picks up the new time on its next scheduled refresh.
    print(f"时间同步:{rtc.datetime()}")
# Simplified weather-data fetch
async def fetch_weather_data(city=None):
    """Fetch weather data over HTTP and hand it to the display.

    Builds the request URL from the "weather_api_url" config key plus the
    device uuid and city, issues a GET, and on HTTP 200 passes selected
    fields of the JSON reply to display.update_ui(). Nothing is returned;
    failures are printed and swallowed.

    Args:
        city: city to query; when falsy, the "cityid" config value is used,
            falling back to "北京".
    """
    try:
        import aiohttp
        # Use the configured city when none was supplied
        if not city:
            city = config.get("cityid") or "北京"
        # Read the API URL; it may already carry extra query parameters, e.g.
        # http://esp.foresh.com/api/ws2/?appid=xxx&appsecert=yyy
        url = config.get("weather_api_url", "http://esp.foresh.com/api/ws2/")
        # NOTE(review): uuid and city are interpolated without URL-encoding.
        full_url = f"{url}{'&' if '?' in url else '?'}uuid={uuid()}&city={city}"
        print(f"GET> {full_url}")
        # Send the GET request
        async with aiohttp.ClientSession() as session:
            async with session.get(full_url) as response:
                # Check the response status
                if response.status == 200:
                    # Parse the JSON payload
                    wdata = await response.json()
                    # Pull out only the fields that are used, to limit memory use
                    city = wdata.get("city", "N/A")
                    weather = wdata.get("weather", 0)
                    t = wdata.get("temperature", "-")
                    rh = wdata.get("humidity", "-")
                    pm = wdata.get("pm25", "-")
                    ap = wdata.get("atmosp", None)
                    co2 = wdata.get("co2", None)
                    aqi = wdata.get("aqi", 0)
                    advice = wdata.get("advice", [])
                    lunar = wdata.get("lunar", None)
                    if city == "N/A":
                        advice.append("城市配置错误")
                        # NOTE(review): assumed to belong to this branch (show the
                        # device IP alongside the city-config error) - confirm nesting.
                        advice.append(wifi_manager.get_ip())
                    display.update_ui(
                        city,
                        weather,
                        advice,
                        aqi,
                        lunar,
                        envdat={"t": t, "rh": rh, "co2": co2, "pm": pm, "ap": ap},
                    )
                else:
                    print(f"获取天气数据失败,状态码: {response.status}")
    except Exception as e:
        print(f"获取天气数据出错: {e}")
# Periodic scheduler (not time-critical; the jobs are staggered to save memory)
async def sysinfo_update_task():
    """Background loop that refreshes weather data and re-syncs NTP time.

    At most one job runs per pass, and weather takes priority. After each
    successful run the job's period is re-read from config ("weather_ts",
    default 600 s; "ntptime_ts", default 3600 s plus 13 s).
    """
    weather_period = 2  # seconds until the first weather refresh
    ntp_period = 1  # seconds until the first NTP sync
    last_weather = last_ntp = time.ticks_ms()

    while True:
        try:
            now = time.ticks_ms()
            # ticks_diff copes with tick-counter wrap-around
            since_weather = time.ticks_diff(now, last_weather)
            since_ntp = time.ticks_diff(now, last_ntp)

            if since_weather >= weather_period * 1000:
                # Refresh the weather data
                gc.collect()
                await fetch_weather_data()
                last_weather = now
                weather_period = int(config.get("weather_ts", 600))  # 10 min
            elif since_ntp >= ntp_period * 1000:
                # Re-sync the clock
                gc.collect()
                sync_ntp_time()
                last_ntp = now
                ntp_period = int(config.get("ntptime_ts", 3600)) + 13  # 1 hour
        except Exception as e:
            print(f"定时任务更新错误: {e}")

        # Sleep 1..30 s before checking again: a tenth of the shorter period
        pause = 1 + min(weather_period, ntp_period) // 10
        if pause > 30:
            pause = 30
        await asyncio.sleep(pause)
# LCD UI display task
async def ui_task():
    """Background task that plays the JPG animation and refreshes the UI.

    Exits immediately when the display is not ready. Otherwise it loops
    forever, drawing one frame per pass and, on every 100th frame, applying
    the standby schedule and refreshing the UI if the screen is lit.
    """
    # Nothing to draw on if the LCD was never initialised
    if not display.is_ready():
        print("液晶屏未初始化,跳过动画任务")
        return

    try:
        # Wait 2 s before the first update
        await asyncio.sleep_ms(2 * 1000)
        frame_no = 0
        while True:
            try:
                started = time.ticks_ms()

                # Cycle through frames T0.jpg .. T9.jpg
                frame_idx = frame_no % 10
                display.show_jpg(f"/rom/images/T{frame_idx}.jpg", 160, 160)
                frame_no += 1

                # Every 100th frame: apply the standby schedule, then refresh
                if frame_no % 100 == 0:
                    standby_control()
                    if display.brightness():
                        # Only redraw while the screen is lit
                        display.update_ui()
                    else:
                        gc.collect()
                        print(f'LCD.idle.mem: {gc.mem_free()}')

                # Free memory once per pass
                gc.collect()

                # Frame pacing: aim for 100 ms per frame, never sleep under 10 ms
                elapsed = time.ticks_diff(time.ticks_ms(), started)
                if elapsed < 90:
                    delay = 100 - elapsed
                else:
                    delay = 10
                await asyncio.sleep_ms(delay)
            except Exception as e:
                print(f"动画帧错误: {e}")
                # Back off for 1 s after a failed frame, then carry on
                await asyncio.sleep_ms(1000)
    except Exception as e:
        print(f"动画任务初始化失败: {e}")
def cb_progress(data):
    """Progress callback: show *data* on the display and echo it to the console.

    bytes values b"/" and b"/login" select a portal status line, str values
    are drawn as a message at (19, 204), and any truthy value is printed.
    """
    if isinstance(data, bytes):
        portal_lines = {
            b"/": "load iwconfig page ",
            b"/login": "WiFi connecting ... ",
        }
        line = portal_lines.get(data)
        if line is not None:
            display.portal_info(line)
    elif isinstance(data, str):
        display.message(data, 19, 204)

    if not data:
        return
    print(f"progress: {str(data)}")
def start():
    """Boot the weather station and run its event loop.

    Sequence: optionally detach the REPL, initialise the display, connect to
    WiFi (falling back to the captive portal and then resetting the board),
    load the UI, register the web routes, schedule the background tasks and
    run the asyncio loop forever.
    """
    # Disable the REPL
    if config.get('console') == "disable":
        import os; os.dupterm(None, 1)
    # Initialise the LCD
    display.init_display(config.get("bl_mode") == "pwm", 7000)
    display.brightness(int(config.get("brightness", 10)))
    cb_progress("WiFi connect ...")
    if not wifi_manager.connect(cb_progress):
        # Could not connect: run the captive portal for configuration
        gc.collect()
        from captive_portal import CaptivePortal
        portal = CaptivePortal()
        display.portal_win(portal.essid.decode("ascii"))
        portal.start(cb_progress)
        # just reboot
        machine.reset()
    gc.collect()
    display.load_ui(config.get('ui_type', 'default'))
    # init web server
    from rom.nanoweb import Nanoweb
    naw = Nanoweb()
    # website top directory
    naw.STATIC_DIR = "/rom/www"
    naw.INDEX_FILE = "/rom/www/index.html"
    # Declare route from a dict
    naw.routes = {
        '/lcd': lcd_status,
        '/config': config_get,
        '/status': sys_status,
        '/exec': eval_cmd,
        '/lcd/set': lcd_set,
        '/config/set': config_update,
    }
    # create task
    loop = asyncio.get_event_loop()
    loop.create_task(naw.run())
    # Start the periodic update task
    loop.create_task(sysinfo_update_task())
    # Start the animation/UI task
    loop.create_task(ui_task())
    gc.collect()
    print(f"success: {gc.mem_free()}...")
    # run!
    loop.run_forever()