init weather_station repo

This commit is contained in:
2026-01-24 13:02:13 +08:00
commit 38023a9bf5
18 changed files with 3333 additions and 0 deletions

93
src/app.py Normal file
View File

@@ -0,0 +1,93 @@
# ESP8266天气站主程序
# 首先尝试连接已保存的WiFi失败则启动CaptivePortal进行配置
import gc, time, sys, machine
# 使用全局的WiFiManager实例
from wifi_manager import wifi_manager
print("尝试连接已保存的WiFi网络...")
if wifi_manager.connect():
# 连接成功
ip = wifi_manager.get_ip()
print(f"WiFi连接成功IP地址: {ip}")
# 在这里可以添加主应用程序代码
# 例如:启动天气数据获取和显示
print("启动主应用程序...")
# 示例:保持连接
try:
while True:
# 检查连接状态
if not wifi_manager.is_connected():
print("WiFi连接断开")
break
# 每3秒报告一次状态
time.sleep(3)
gc.collect()
print(f"正常运行中IP: {ip}, 可用内存: {gc.mem_free()} bytes")
except KeyboardInterrupt:
print("用户中断")
finally:
wifi_manager.disconnect()
print("应用程序结束")
else:
# 连接失败启动CaptivePortal进行WiFi配置
print("无法连接到WiFi启动CaptivePortal进行配置")
from captive_portal import CaptivePortal
# 启动CaptivePortal
portal = CaptivePortal()
try:
if portal.start():
# clear
del portal
sys.modules.pop('CaptivePortal', None)
sys.modules.pop('captive_dns', None)
sys.modules.pop('captive_http', None)
sys.modules.pop('server_base', None)
gc.collect()
# CaptivePortal成功配置并连接
print("WiFi配置成功并已连接")
# 在这里可以添加主应用程序代码
print("启动主应用程序...")
# 示例:保持连接
try:
while True:
# 检查连接状态
if not wifi_manager.is_connected():
print("WiFi连接断开")
break
# 每3秒报告一次状态
time.sleep(3)
gc.collect()
print(f"正常运行中,可用内存: {gc.mem_free()} bytes")
except KeyboardInterrupt:
print("用户中断")
finally:
wifi_manager.disconnect()
print("应用程序结束")
else:
print("CaptivePortal未能成功建立连接")
except KeyboardInterrupt:
print("用户中断")
finally:
time.sleep(3)
machine.reset()

11
src/main.py Normal file
View File

@@ -0,0 +1,11 @@
import machine, sys, time
import rom.app
try:
app.start()
except Exception as e:
print("Fatal error in main:")
sys.print_exception(e)
time.sleep(3)
machine.reset()

12
src/rom/boot.py Normal file
View File

@@ -0,0 +1,12 @@
# This file is executed on every boot (including wake-boot from deepsleep)
import esp, gc, uos, machine
esp.osdebug(None)
#uos.dupterm(None, 1) # disable REPL on UART(0)
# cpu freq = 160MHz
machine.freq(160000000)
# memory auto collect (<16KB)
gc.threshold(16384)
gc.collect()

75
src/rom/captive_dns.py Normal file
View File

@@ -0,0 +1,75 @@
import gc
import usocket as socket
from server_base import BaseServer
class DNSQuery:
def __init__(self, data):
self.data = data
self.domain = ""
# header is bytes 0-11, so question starts on byte 12
head = 12
# length of this label defined in first byte
length = data[head]
while length != 0:
label = head + 1
# add the label to the requested domain and insert a dot after
self.domain += data[label : label + length].decode("utf-8") + "."
# check if there is another label after this one
head += length + 1
length = data[head]
def answer(self, ip_addr):
# ** create the answer header **
# copy the ID from incoming request
packet = self.data[:2]
# set response flags (assume RD=1 from request)
packet += b"\x81\x80"
# copy over QDCOUNT and set ANCOUNT equal
packet += self.data[4:6] + self.data[4:6]
# set NSCOUNT and ARCOUNT to 0
packet += b"\x00\x00\x00\x00"
# ** create the answer body **
# respond with original domain name question
packet += self.data[12:]
# pointer back to domain name (at byte 12)
packet += b"\xc0\x0c"
# set TYPE and CLASS (A record and IN class)
packet += b"\x00\x01\x00\x01"
# set TTL to 60sec
packet += b"\x00\x00\x00\x3c"
# set response length to 4 bytes (to hold one IPv4 address)
packet += b"\x00\x04"
# now actually send the IP address as 4 bytes (without the "."s)
packet += bytes(map(int, ip_addr.split(".")))
gc.collect()
return packet
class DNSServer(BaseServer):
def __init__(self, poller, ip_addr):
super().__init__(poller, 53, socket.SOCK_DGRAM, "DNS Server")
self.ip_addr = ip_addr
def handle(self, sock, event, others):
# server doesn't spawn other sockets, so only respond to its own socket
if sock is not self.sock:
return
# check the DNS question, and respond with an answer
try:
data, sender = sock.recvfrom(1024)
request = DNSQuery(data)
print("Sending {:s} -> {:s}".format(request.domain, self.ip_addr))
sock.sendto(request.answer(self.ip_addr), sender)
# help MicroPython with memory management
del request
gc.collect()
except Exception as e:
print("DNS server exception:", e)

292
src/rom/captive_http.py Normal file
View File

@@ -0,0 +1,292 @@
from collections import namedtuple
import network
import uerrno
import uio
import uselect as select
import usocket as socket
from config import config
from server_base import BaseServer
from wifi_manager import wifi_manager
WriteConn = namedtuple("WriteConn", ["body", "buff", "buffmv", "write_range"])
ReqInfo = namedtuple("ReqInfo", ["type", "path", "params", "host"])
import gc
def unquote(string):
"""stripped down implementation of urllib.parse unquote_to_bytes"""
if not string:
return b""
if isinstance(string, str):
string = string.encode("utf-8")
string = string.replace(b"+", b" ")
# split into substrings on each escape character
bits = string.split(b"%")
if len(bits) == 1:
return string # there was no escape character
res = [bits[0]] # everything before the first escape character
# for each escape character, get the next two digits and convert to
for item in bits[1:]:
code = item[:2]
char = bytes([int(code, 16)]) # convert to utf-8-encoded byte
res.append(char) # append the converted character
res.append(
item[2:]
) # append anything else that occurred before the next escape character
return b"".join(res)
class HTTPServer(BaseServer):
def __init__(self, poller, local_ip):
super().__init__(poller, 80, socket.SOCK_STREAM, "HTTP Server")
if type(local_ip) is bytes:
self.local_ip = local_ip
else:
self.local_ip = local_ip.encode()
self.request = dict()
self.conns = dict()
self.routes = {
b"/": b"/rom/index.html",
b"/login": self.login,
b"/scan": self.scan_networks,
}
self.ssid = None
# queue up to 2 connection requests before refusing (ESP8266 memory optimization)
self.sock.listen(2)
self.sock.setblocking(False)
#@micropython.native
def handle(self, sock, event, others):
if sock is self.sock:
# client connecting on port 80, so spawn off a new
# socket to handle this connection
print("- Accepting new HTTP connection")
self.accept(sock)
elif event & select.POLLIN:
# socket has data to read in
print("- Reading incoming HTTP data")
self.read(sock)
elif event & select.POLLOUT:
# existing connection has space to send more data
print("- Sending outgoing HTTP data")
self.write_to(sock)
def accept(self, server_sock):
"""accept a new client request socket and register it for polling"""
try:
client_sock, addr = server_sock.accept()
except OSError as e:
if e.args[0] == uerrno.EAGAIN:
return
client_sock.setblocking(False)
client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.poller.register(client_sock, select.POLLIN)
def parse_request(self, req):
"""parse a raw HTTP request to get items of interest"""
req_lines = req.split(b"\r\n")
req_type, full_path, http_ver = req_lines[0].split(b" ")
path = full_path.split(b"?")
base_path = path[0]
query = path[1] if len(path) > 1 else None
query_params = (
{
key: val
for key, val in [param.split(b"=") for param in query.split(b"&")]
}
if query
else {}
)
host = [line.split(b": ")[1] for line in req_lines if b"Host:" in line][0]
return ReqInfo(req_type, base_path, query_params, host)
def login(self, params):
# 从URL参数中提取表单数据
ssid = unquote(params.get(b"ssid", None))
password = unquote(params.get(b"password", None))
city = unquote(params.get(b"city", None))
# 使用全局Config实例保存配置
config.set("ssid", ssid)
config.set("password", password)
config.set("city", city)
if config.write():
print("配置保存成功")
else:
print("配置保存失败,数据无效")
# 重定向local_ip
headers = (
b"HTTP/1.1 307 Temporary Redirect\r\nLocation: http://{:s}/\r\n".format(
self.local_ip
)
)
return b"", headers
def scan_networks(self, params):
"""扫描WiFi网络并返回JSON数据"""
try:
# 使用wifi_manager扫描网络
networks = wifi_manager.scan_networks()
import ujson
json_data = ujson.dumps({"networks": networks})
except Exception as e:
print(f"扫描网络时出错: {e}")
json_data = ujson.dumps({"networks": []})
headers = b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nAccess-Control-Allow-Origin: *\r\n"
return json_data.encode(), headers
def get_response(self, req):
"""generate a response body and headers, given a route"""
headers = b"HTTP/1.1 200 OK\r\n"
route = self.routes.get(req.path, None)
if type(route) is bytes:
# expect a filename, so return contents of file
return open(route, "rb"), headers
if callable(route):
# call a function, which may or may not return a response
response = route(req.params)
body = response[0] or b""
headers = response[1] or headers
return uio.BytesIO(body), headers
headers = b"HTTP/1.1 404 Not Found\r\n"
return uio.BytesIO(b""), headers
def is_valid_req(self, req):
if req.host != self.local_ip:
# force a redirect to the MCU's IP address
return False
# redirect if we don't have a route for the requested path
return req.path in self.routes
def read(self, s):
"""read in client request from socket"""
data = s.read()
if not data:
# no data in the TCP stream, so close the socket
self.close(s)
return
# add new data to the full request
sid = id(s)
self.request[sid] = self.request.get(sid, b"") + data
# check if additional data expected
if data[-4:] != b"\r\n\r\n":
# HTTP request is not finished if no blank line at the end
# wait for next read event on this socket instead
return
# get the completed request
req = self.parse_request(self.request.pop(sid))
if not self.is_valid_req(req):
headers = (
b"HTTP/1.1 307 Temporary Redirect\r\nLocation: http://{:s}/\r\n".format(
self.local_ip
)
)
body = uio.BytesIO(b"")
self.prepare_write(s, body, headers)
return
# by this point, we know the request has the correct
# host and a valid route
body, headers = self.get_response(req)
self.prepare_write(s, body, headers)
def prepare_write(self, s, body, headers):
# add newline to headers to signify transition to body
headers += "\r\n"
# TCP/IP MSS is 536 bytes, so create buffer of this size and
# initially populate with header data
buff = bytearray(headers + "\x00" * (536 - len(headers)))
# use memoryview to read directly into the buffer without copying
buffmv = memoryview(buff)
# start reading body data into the memoryview starting after
# the headers, and writing at most the remaining space of the buffer
# return the number of bytes written into the memoryview from the body
bw = body.readinto(buffmv[len(headers) :], 536 - len(headers))
# save place for next write event
c = WriteConn(body, buff, buffmv, [0, len(headers) + bw])
self.conns[id(s)] = c
# let the poller know we want to know when it's OK to write
self.poller.modify(s, select.POLLOUT)
def write_to(self, sock):
"""write the next message to an open socket"""
# get the data that needs to be written to this socket
c = self.conns[id(sock)]
if c:
# write next 536 bytes (max) into the socket
try:
bytes_written = sock.write(
c.buffmv[c.write_range[0] : c.write_range[1]]
)
except OSError:
print("cannot write to a closed socket")
self.close(sock)
return
if not bytes_written or c.write_range[1] < 536:
# either we wrote no bytes, or we wrote < TCP MSS of bytes
# so we're done with this connection
self.close(sock)
else:
# more to write, so read the next portion of the data into
# the memoryview for the next send event
self.buff_advance(c, bytes_written)
def buff_advance(self, c, bytes_written):
"""advance the writer buffer for this connection to next outgoing bytes"""
if bytes_written == c.write_range[1] - c.write_range[0]:
# wrote all the bytes we had buffered into the memoryview
# set next write start on the memoryview to the beginning
c.write_range[0] = 0
# set next write end on the memoryview to length of bytes
# read in from remainder of the body, up to TCP MSS
c.write_range[1] = c.body.readinto(c.buff, 536)
else:
# didn't read in all the bytes that were in the memoryview
# so just set next write start to where we ended the write
c.write_range[0] += bytes_written
def close(self, s):
"""close the socket, unregister from poller, and delete connection"""
s.close()
self.poller.unregister(s)
sid = id(s)
if sid in self.request:
del self.request[sid]
if sid in self.conns:
c = self.conns[sid]
# 检查body是文件对象而不是BytesIO则关闭文件
if hasattr(c.body, "close"):
c.body.close()
del self.conns[sid]
gc.collect()

133
src/rom/captive_portal.py Normal file
View File

@@ -0,0 +1,133 @@
import gc
import network
import ubinascii as binascii
import uselect as select
import utime as time
from captive_dns import DNSServer
from captive_http import HTTPServer
from config import config
from wifi_manager import wifi_manager
class CaptivePortal:
AP_IP = "192.168.4.1"
MAX_CONN_ATTEMPTS = 10
def __init__(self, essid=None):
self.local_ip = self.AP_IP
self.ap_if = network.WLAN(network.AP_IF)
if essid is None:
essid = b"ws2-%s" % binascii.hexlify(self.ap_if.config("mac")[-3:])
self.essid = essid
self.dns_server = None
self.http_server = None
self.poller = select.poll()
def start_access_point(self):
# sometimes need to turn off AP before it will come up properly
self.ap_if.active(False)
while not self.ap_if.active():
print("Waiting for access point to turn on")
self.ap_if.active(True)
time.sleep(1)
# IP address, netmask, gateway, DNS
self.ap_if.ifconfig(
(self.local_ip, "255.255.255.0", self.local_ip, self.local_ip)
)
self.ap_if.config(essid=self.essid, authmode=network.AUTH_OPEN)
print("AP mode configured:", self.ap_if.ifconfig())
def connect_to_wifi(self):
# 使用全局WiFiManager进行连接
if wifi_manager.connect():
self.local_ip = wifi_manager.get_ip()
return True
return False
def check_valid_wifi(self):
if not wifi_manager.is_connected():
if config.is_valid():
# have credentials to connect, but not yet connected
# return value based on whether the connection was successful
return self.connect_to_wifi()
# not connected, and no credentials to connect yet
return False
return True
def captive_portal(self):
print("Starting captive portal")
self.start_access_point()
if self.http_server is None:
self.http_server = HTTPServer(self.poller, self.local_ip)
print("Configured HTTP server")
if self.dns_server is None:
self.dns_server = DNSServer(self.poller, self.local_ip)
print("Configured DNS server")
try:
while True:
gc.collect()
# check for socket events and handle them
for response in self.poller.ipoll(1000):
sock, event, *others = response
is_handled = self.handle_dns(sock, event, others)
if not is_handled:
self.handle_http(sock, event, others)
if self.check_valid_wifi():
print("Connected to WiFi!")
self.http_server.stop(self.poller)
self.dns_server.stop(self.poller)
break
except KeyboardInterrupt:
print("Captive portal stopped")
self.cleanup()
return wifi_manager.is_connected()
def handle_dns(self, sock, event, others):
if sock is self.dns_server.sock:
# ignore UDP socket hangups
if event == select.POLLHUP:
return True
self.dns_server.handle(sock, event, others)
return True
return False
def handle_http(self, sock, event, others):
self.http_server.handle(sock, event, others)
def cleanup(self):
print("Cleaning up")
if self.ap_if.active():
self.ap_if.active(False)
print("Turned off access point")
if self.dns_server:
self.dns_server.stop(self.poller)
self.dns_server = None
print("Discard portal.dns_server")
if self.http_server:
self.http_server.stop(self.poller)
self.http_server = None
print("Discard portal.http_server")
gc.collect()
def try_connect_from_file(self):
if config.is_valid():
if self.connect_to_wifi():
return True
# WiFi Connection failed but keep credentials for future retries
print("连接失败但保留配置,可以稍后重试")
return False
def start(self):
# turn off station interface to force a reconnect
wifi_manager.disconnect()
if not self.try_connect_from_file():
return self.captive_portal()

91
src/rom/config.py Normal file
View File

@@ -0,0 +1,91 @@
import ujson
import uos
class Config:
"""通用配置类,用于保存各种系统配置项"""
CONFIG_FILE = "/config.json"
_instance = None
_initialized = False
def __new__(cls):
"""单一实例模式实现"""
if cls._instance is None:
cls._instance = super(Config, cls).__new__(cls)
return cls._instance
def __init__(self, ssid=None, password=None, city=None, **kwargs):
"""初始化配置,只在第一次调用时执行"""
if not self._initialized:
self.config_data = {"ssid": ssid, "password": password, "city": city}
# 添加其他可能的自定义配置项
self.config_data.update(kwargs)
self._initialized = True
# 自动加载配置文件
self.load()
def write(self):
"""将配置写入JSON格式的配置文件"""
if self.is_valid():
# 只将非None的值保存到文件
save_data = {k: v for k, v in self.config_data.items() if v is not None}
with open(self.CONFIG_FILE, "w") as f:
ujson.dump(save_data, f)
print(f"写入配置到 {self.CONFIG_FILE}")
# 写入后重新加载配置
return self.load()
return False
def load(self):
"""从配置文件加载配置"""
try:
with open(self.CONFIG_FILE, "r") as f:
loaded_data = ujson.load(f)
self.config_data.update(loaded_data)
print(f"{self.CONFIG_FILE} 加载配置")
# 如果核心配置不完整,可能需要清除文件
if not self.is_valid():
print("配置不完整,清除配置文件")
self.remove()
except (OSError, ValueError):
pass
return self
def get(self, key, default=None):
"""获取配置项"""
return self.config_data.get(key, default)
def set(self, key, value):
"""设置配置项"""
self.config_data[key] = value
def remove(self):
"""删除配置文件并重置配置"""
try:
uos.remove(self.CONFIG_FILE)
except OSError:
pass
# 保留默认的关键配置项
self.config_data = {"ssid": None, "password": None, "city": None}
def is_valid(self):
"""检查核心配置项是否有效"""
ssid = self.config_data.get("ssid")
password = self.config_data.get("password")
# 确保SSID和密码都是字符串类型且不为空
if not ssid:
return False
if not password:
return False
return True
# 全局配置实例
config = Config()

317
src/rom/index.html Normal file
View File

@@ -0,0 +1,317 @@
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>WiFi认证</title>
<style>
body {
font-family: sans-serif;
background: #3498db;
width: 100%;
text-align: center;
margin: 20px 0;
position: relative;
}
p {
font-size: 12px;
text-decoration: none;
color: #fff;
}
h1 {
font-size: 1.5em;
color: #525252;
}
.box {
background: white;
width: 40ch;
border-radius: 6px;
margin: 0 auto;
padding: 10px 0;
position: relative;
}
input[type="text"],
input[type="password"] {
background: #ecf0f1;
border: #ccc 1px solid;
border-bottom: #ccc 2px solid;
padding: 8px;
width: 80%;
color: #000;
margin-top: 10px;
font-size: 1em;
border-radius: 4px;
}
.btn {
background: #2ecc71;
width: 80%;
padding: 8px 0;
color: white;
border-radius: 4px;
border: #27ae60 1px solid;
margin: 20 auto;
font-weight: 800;
font-size: 0.9em;
cursor: pointer;
}
.btn:hover {
background: #27ae60;
}
.config-title {
font-size: 1em;
color: #525252;
margin-top: 15px;
}
.wifi-list {
width: 80%;
max-height: 200px;
overflow-y: auto;
margin: 10px auto;
text-align: left;
border: 1px solid #eee;
background: #f8f8f8;
border-radius: 4px;
padding: 5px;
}
.wifi-item {
padding: 5px;
cursor: pointer;
border-bottom: 1px solid #eee;
display: flex;
align-items: center;
justify-content: space-between;
}
.wifi-item:hover {
background: #e0e0e0;
}
.wifi-name {
flex-grow: 1;
}
.wifi-signal {
font-size: 12px;
color: #888;
}
.refresh-btn {
position: absolute;
top: 10px;
right: 10px;
background: none;
border: none;
cursor: pointer;
font-size: 20px;
color: #7f8c8d;
width: 30px;
height: 30px;
display: flex;
align-items: center;
justify-content: center;
border-radius: 50%;
}
.refresh-btn:hover {
background: #ecf0f1;
}
.refresh-btn:disabled {
color: #bdc3c7;
cursor: not-allowed;
}
.wifi-icon {
position: relative;
display: inline-block;
width: 18px;
height: 12px;
margin-right: 5px;
}
.wifi-icon-bar {
position: absolute;
bottom: 0;
background: #7f8c8d;
border-radius: 1px;
}
.wifi-icon-bar:nth-child(1) {
left: 0;
width: 3px;
height: 3px;
}
.wifi-icon-bar:nth-child(2) {
left: 5px;
width: 3px;
height: 6px;
}
.wifi-icon-bar:nth-child(3) {
left: 10px;
width: 3px;
height: 9px;
}
.wifi-icon-bar:nth-child(4) {
left: 15px;
width: 3px;
height: 12px;
}
.wifi-signal-4 .wifi-icon-bar {
background: #2ecc71;
}
.wifi-signal-3 .wifi-icon-bar:nth-child(1),
.wifi-signal-3 .wifi-icon-bar:nth-child(2),
.wifi-signal-3 .wifi-icon-bar:nth-child(3) {
background: #f1c40f;
}
.wifi-signal-2 .wifi-icon-bar:nth-child(1),
.wifi-signal-2 .wifi-icon-bar:nth-child(2) {
background: #e67e22;
}
.wifi-signal-1 .wifi-icon-bar:nth-child(1) {
background: #e74c3c;
}
/* 遮罩层样式 */
.overlay {
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0, 0, 0, 0.7);
display: none;
z-index: 1000;
justify-content: center;
align-items: center;
}
.overlay-content {
background: white;
padding: 30px;
border-radius: 8px;
text-align: center;
max-width: 80%;
}
.spinner-small {
width: 30px;
height: 30px;
border: 3px solid #f3f3f3;
border-top: 3px solid #3498db;
border-radius: 50%;
margin: 15px auto;
animation: spin 1.5s linear infinite;
}
</style>
</head>
<body>
<form action="/login" method="get" class="box">
<h1>WiFi 配置</h1>
<button
type="button"
class="refresh-btn"
onclick="refreshList()"
title="刷新WiFi列表"
>
</button>
<div id="wifiList" class="wifi-list"></div>
<input
type="text"
id="ssid"
placeholder="WiFi名称"
name="ssid"
required
/><br />
<input
type="password"
placeholder="WiFi密码"
name="password"
required
/><br />
<input
type="text"
placeholder="城市名称"
name="city"
value=""
/><br />
<button type="submit" class="btn">保存配置</button>
</form>
<!-- 遮罩层 -->
<div class="overlay" id="connectingOverlay">
<div class="overlay-content">
<h2>正在连接到WiFi</h2>
<div class="spinner-small"></div>
<p>设备正在尝试连接,请稍候...</p>
</div>
</div>
<script>
window.onload = function () {
fetchWifiList();
// 添加表单提交事件监听
document
.querySelector(".box")
.addEventListener("submit", function (e) {
// 显示遮罩层
document.getElementById(
"connectingOverlay",
).style.display = "flex";
});
};
function fetchWifiList() {
const listContainer = document.getElementById("wifiList");
listContainer.innerHTML = "Loading...";
fetch("/scan")
.then((response) => response.json())
.then((data) => {
if (data.networks) {
data.networks.sort((a, b) => b.rssi - a.rssi);
displayWifiList(data.networks);
}
})
.catch((error) => {
console.error("Error fetching WiFi list:", error);
});
}
function getSignalLevel(rssi) {
if (rssi >= -50) return 4;
if (rssi >= -60) return 3;
if (rssi >= -70) return 2;
return 1;
}
function createSignalIcon(signalLevel) {
const icon = document.createElement("span");
icon.className = `wifi-icon wifi-signal-${signalLevel}`;
for (let i = 0; i < 4; i++) {
const bar = document.createElement("span");
bar.className = "wifi-icon-bar";
icon.appendChild(bar);
}
return icon;
}
function displayWifiList(networks) {
const listContainer = document.getElementById("wifiList");
listContainer.innerHTML = "";
for (const network of networks) {
const item = document.createElement("div");
item.className = "wifi-item";
const nameContainer = document.createElement("div");
nameContainer.className = "wifi-name";
const signalLevel = getSignalLevel(network.rssi);
nameContainer.appendChild(createSignalIcon(signalLevel));
const nameText = document.createTextNode(network.ssid);
nameContainer.appendChild(nameText);
const signalContainer = document.createElement("div");
signalContainer.className = "wifi-signal";
signalContainer.textContent = `${network.rssi} dBm`;
item.appendChild(nameContainer);
item.appendChild(signalContainer);
item.onclick = function () {
document.getElementById("ssid").value = network.ssid;
};
listContainer.appendChild(item);
}
}
function refreshList() {
const btn = document.querySelector(".refresh-btn");
btn.innerHTML = "⟳";
btn.disabled = true;
fetchWifiList();
setTimeout(() => {
btn.innerHTML = "↻";
btn.disabled = false;
}, 1000);
}
</script>
</body>
</html>

196
src/rom/nanoweb.py Normal file
View File

@@ -0,0 +1,196 @@
import uasyncio as asyncio
import uerrno
__version__ = '1.0.0'
class HttpError(Exception):
pass
class Request:
url = ""
method = ""
headers = {}
route = ""
read = None
write = None
close = None
def __init__(self):
self.url = ""
self.method = ""
self.headers = {}
self.route = ""
self.read = None
self.write = None
self.close = None
async def write(request, data):
await request.write(
data.encode('ISO-8859-1') if type(data) == str else data
)
async def error(request, code, reason):
await request.write("HTTP/1.1 %s %s\r\n\r\n" % (code, reason))
await request.write("<h1>%s</h1>" % (reason))
async def send_file(request, filename, segment=64, binary=False):
try:
with open(filename, 'rb' if binary else 'r') as f:
while True:
data = f.read(segment)
if not data:
break
await request.write(data)
except OSError as e:
if e.args[0] != uerrno.ENOENT:
raise
raise HttpError(request, 404, "File Not Found")
class Nanoweb:
extract_headers = ('Authorization', 'Content-Length', 'Content-Type')
headers = {}
routes = {}
assets_extensions = ('html', 'css', 'js')
callback_request = None
callback_error = staticmethod(error)
STATIC_DIR = ''
INDEX_FILE = STATIC_DIR + '/index.html'
def __init__(self, port=80, address='0.0.0.0'):
self.port = port
self.address = address
def route(self, route):
"""Route decorator"""
def decorator(func):
self.routes[route] = func
return func
return decorator
async def generate_output(self, request, handler):
"""Generate output from handler
`handler` can be :
* dict representing the template context
* string, considered as a path to a file
* tuple where the first item is filename and the second
is the template context
* callable, the output of which is sent to the client
"""
while True:
if isinstance(handler, dict):
handler = (request.url, handler)
if isinstance(handler, str):
await write(request, "HTTP/1.1 200 OK\r\n\r\n")
await send_file(request, handler)
elif isinstance(handler, tuple):
await write(request, "HTTP/1.1 200 OK\r\n\r\n")
filename, context = handler
context = context() if callable(context) else context
try:
with open(filename, "r") as f:
for l in f:
await write(request, l.format(**context))
except OSError as e:
if e.args[0] != uerrno.ENOENT:
raise
raise HttpError(request, 404, "File Not Found")
else:
handler = await handler(request)
if handler:
# handler can returns data that can be fed back
# to the input of the function
continue
break
async def handle(self, reader, writer):
items = await reader.readline()
items = items.decode('ascii').split()
if len(items) != 3:
return
request = Request()
request.read = reader.read
request.write = writer.awrite
request.close = writer.aclose
request.method, request.url, version = items
try:
try:
if version not in ("HTTP/1.0", "HTTP/1.1"):
raise HttpError(request, 505, "Version Not Supported")
while True:
items = await reader.readline()
items = items.decode('ascii').split(":", 1)
if len(items) == 2:
header, value = items
value = value.strip()
if header in self.extract_headers:
request.headers[header] = value
elif len(items) == 1:
break
if self.callback_request:
self.callback_request(request)
if request.url in self.routes:
# 1. If current url exists in routes
request.route = request.url
await self.generate_output(request,
self.routes[request.url])
else:
# 2. Search url in routes with wildcard
for route, handler in self.routes.items():
if route == request.url \
or (route[-1] == '*' and
request.url.startswith(route[:-1])):
request.route = route
await self.generate_output(request, handler)
break
else:
# 3. Try to load index file
if request.url in ('', '/'):
await send_file(request, self.INDEX_FILE)
else:
# 4. Current url have an assets extension ?
for extension in self.assets_extensions:
if request.url.endswith('.' + extension):
await send_file(
request,
'%s%s' % (
self.STATIC_DIR,
request.url,
),
binary=True,
)
break
else:
raise HttpError(request, 404, "File Not Found")
except HttpError as e:
request, code, message = e.args
await self.callback_error(request, code, message)
except OSError as e:
# Skip ECONNRESET error (client abort request)
if e.args[0] != uerrno.ECONNRESET:
raise
finally:
await writer.aclose()
async def run(self):
return await asyncio.start_server(self.handle, self.address, self.port)

27
src/rom/server_base.py Normal file
View File

@@ -0,0 +1,27 @@
import select
import socket
class BaseServer:
"""基础服务器类为HTTP和DNS服务器提供通用功能"""
def __init__(self, poller, port, sock_type, name):
self.name = name
# create socket with correct type: stream (TCP) or datagram (UDP)
self.sock = socket.socket(socket.AF_INET, sock_type)
# register to get event updates for this socket
self.poller = poller
self.poller.register(self.sock, select.POLLIN)
addr = socket.getaddrinfo("0.0.0.0", port)[0][-1]
# allow new requests while still sending last response
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(addr)
print(self.name, "listening on", addr)
def stop(self, poller):
poller.unregister(self.sock)
self.sock.close()
print(self.name, "stopped")

164
src/rom/wifi_manager.py Normal file
View File

@@ -0,0 +1,164 @@
import time
import network
from config import config
class WiFiManager:
"""WiFi连接管理类负责处理WiFi连接相关功能"""
MAX_CONN_ATTEMPTS = 12
def __init__(self):
self.sta_if = network.WLAN(network.STA_IF)
self.config = config
self._interface_initialized = False
def _ensure_interface_active(self):
"""确保WLAN接口处于活动状态"""
try:
if not self.sta_if.active():
self.sta_if.active(True)
time.sleep(1) # 等待接口激活
self._interface_initialized = True
return True
except Exception as e:
print(f"激活WiFi接口失败: {e}")
return False
def _safe_connect_check(self):
"""安全地检查连接状态"""
try:
return self.sta_if.isconnected()
except:
return False
def is_connected(self):
"""检查是否已连接到WiFi"""
if not self._interface_initialized:
return False
return self._safe_connect_check()
def get_ip(self):
"""获取当前IP地址"""
if self.is_connected():
try:
return self.sta_if.ifconfig()[0]
except:
return None
return None
def connect(self):
"""尝试连接到WiFi"""
# 加载配置
if not self.config.load().is_valid():
print("没有有效的WiFi配置")
return False
ssid = self.config.get("ssid")
password = self.config.get("password")
if not ssid or not password:
print("SSID或密码为空")
return False
print(f"正在尝试连接到SSID: {ssid}")
try:
# 确保接口处于活动状态
if not self._ensure_interface_active():
return False
# 如果已经连接,先断开
if self._safe_connect_check():
self.sta_if.disconnect()
time.sleep(1)
# 执行连接
self.sta_if.connect(ssid, password)
# 等待连接完成
attempts = 1
while attempts <= self.MAX_CONN_ATTEMPTS:
if self._safe_connect_check():
ip = self.get_ip()
if ip:
print(f"连接成功! IP: {ip}")
return True
print(f"连接尝试 {attempts}/{self.MAX_CONN_ATTEMPTS}...")
time.sleep(2)
attempts += 1
# 连接失败
print(f"连接失败: {ssid}")
self.clear_config()
try:
print(f"WLAN状态: {self.sta_if.status()}")
except:
pass
return False
except Exception as e:
print(f"连接过程中发生错误: {e}")
return False
def disconnect(self):
"""断开WiFi连接"""
try:
if self._safe_connect_check():
self.sta_if.disconnect()
time.sleep(1)
print("已断开WiFi连接")
except Exception as e:
print(f"断开连接时出错: {e}")
def scan_networks(self):
"""扫描可用的WiFi网络"""
try:
# 确保接口处于活动状态
if not self._ensure_interface_active():
return []
# 如果已连接,先断开
if self._safe_connect_check():
self.sta_if.disconnect()
time.sleep(1)
# 执行扫描
networks = self.sta_if.scan()
# 处理结果
result = []
for net in networks:
try:
ssid = net[0].decode("utf-8", "ignore")
if ssid: # 只添加非隐藏网络
result.append({"ssid": ssid, "rssi": net[3]})
except:
continue # 跳过有问题的网络
return result
except Exception as e:
print(f"扫描WiFi网络时出错: {e}")
return []
def reset(self):
"""重置WiFi配置"""
try:
self.config.remove()
print("WiFi配置已重置")
except Exception as e:
print(f"重置配置时出错: {e}")
def clear_config(self):
"""清除WiFi配置可选操作"""
try:
self.config.set("ssid", None)
print("WiFi配置已临时清空")
except Exception as e:
print(f"清除配置时出错: {e}")
# 全局WiFi管理器实例
wifi_manager = WiFiManager()