# ===== boot.py =====
from utime import sleep
import network
import socket
import smartconfig
import time # 添加这一行来导入time模块
import ujson # 导入ujson库用于JSON操作
from mqtt import mqtt
from mqtt import led
from mqtt import ledstate
import _thread
# File name used to persist the Wi-Fi credentials (read by load_credentials,
# written by save_credentials).
credentials_file = "wifi_credentials.json"
def save_credentials(ssid, password):
    """Write the Wi-Fi credentials to ``credentials_file`` as JSON.

    Args:
        ssid: Value stored under the "ssid" key.
        password: Value stored under the "password" key.
    """
    payload = {"ssid": ssid, "password": password}
    # Overwrite any previous file with the new credentials.
    with open(credentials_file, "w") as handle:
        ujson.dump(payload, handle)
def load_credentials():
    """Load the saved Wi-Fi credentials from ``credentials_file``.

    Returns:
        A ``(ssid, password)`` tuple. Both items are ``None`` when the file is
        missing, cannot be parsed as JSON, or does not contain a JSON object;
        an individual item is ``None`` when its key is absent.
    """
    try:
        with open(credentials_file, "r") as f:
            credentials = ujson.load(f)
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: corrupt or truncated
        # JSON (e.g. power lost mid-write) -- treat both as "nothing saved"
        # instead of crashing the boot sequence.
        return None, None
    if not isinstance(credentials, dict):
        # Valid JSON but not an object: there is nothing to look up.
        return None, None
    return credentials.get("ssid"), credentials.get("password")
def inet_pton(ip_str: str):
    """Convert a dotted IP address string into a byte sequence.

    Each '.'-separated segment is parsed as an integer and emitted as a
    single byte, in the order the segments appear.
    """
    octets = [int(part).to_bytes(1, 'little') for part in ip_str.split('.')]
    return b''.join(octets)
def send_ack(local_ip, local_mac):
    """Broadcast the SmartConfig "done" ACK to the provisioning phone.

    The payload is the SmartConfig token (one byte) followed by the MAC; for
    ESP-Touch the packed local IP is appended and a different port is used.
    The datagram is broadcast 20 times, 0.1 s apart, on a best-effort basis.

    Args:
        local_ip: Dotted IP address string of this station.
        local_mac: MAC address of this station as bytes.
    """
    # Query SmartConfig once so every field comes from the same snapshot.
    sc_info = smartconfig.info()
    is_esptouch = sc_info[2] == smartconfig.TYPE_ESPTOUCH

    data = sc_info[3].to_bytes(1, 'little') + local_mac
    port = 10000  # port used by AirKiss
    if is_esptouch:
        data += inet_pton(local_ip)
        port = 18266  # port used by ESP-Touch

    type_name = 'esptouch' if is_esptouch else 'airkiss'
    print(
        f"- sending ack:\ntype: {type_name}\nport: {port}\n"
        f"data: {data}\nlength: {len(data)}\n"
    )

    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        for _ in range(20):
            sleep(0.1)
            try:
                udp.sendto(data, ('255.255.255.255', port))
            except OSError:
                # Best effort: a dropped broadcast is covered by the repeats.
                pass
    finally:
        # Always release the socket; the original never closed it.
        udp.close()
    print('- ack was sent')
def start_and_configure_wifi():
    """Start and configure the Wi-Fi connection via SmartConfig, then ACK.

    Sequence: print the stored credentials, activate the station interface,
    run SmartConfig for up to 30 s, save the received credentials on success,
    connect with whatever credentials are stored, block until the station is
    connected and has a non-zero IP, send the ACK and set the LED state to 1.
    """
    stored = load_credentials()
    print('- 当前本地存储的账号密码', stored[0], stored[1])

    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    print('- 启动smartconfig...')
    smartconfig.start()

    # Poll every 0.5 s until SmartConfig succeeds or the timeout elapses.
    print('- 等待成功...')
    timeout = 30  # seconds
    began = time.time()
    while True:
        if smartconfig.success():
            break
        if time.time() - began >= timeout:
            break
        sleep(0.5)

    if not smartconfig.success():
        print('- smartconfig失败或超时')
    else:
        print('- 已获取智能配置信息')
        ssid, password, sc_type, token = smartconfig.info()
        print(ssid, password, sc_type, token)
        save_credentials(ssid, password)

    # Re-read from storage so freshly saved credentials are picked up.
    saved_ssid, saved_password = load_credentials()
    print('- 连接到wifi...')
    print('- 账号密码', saved_ssid, saved_password)
    wlan.connect(saved_ssid, saved_password)
    print('- 执行完连接网络')

    while True:
        if wlan.isconnected():
            break
        sleep(0.5)
    print('- wifi连接')

    while not wlan.ifconfig()[0] != '0.0.0.0':
        sleep(0.5)
    print('- 获得ip')
    print(wlan.ifconfig())

    send_ack(wlan.ifconfig()[0], wlan.config('mac'))
    ledstate(1)
# Entry point: call the functions that start the Wi-Fi configuration process.
if __name__ == '__main__':
    # Run the LED status loop on its own thread.
    _thread.start_new_thread(led, ())
    # Wi-Fi provisioning
    start_and_configure_wifi()
    # MQTT program
    print('- 启动mqtt')
    mqtt()
# ===== mqtt.py =====
import socket
import time
from umqttsimple import MQTTClient
import machine
from machine import Pin, ADC
import onewire, ds18x20
import ujson
# Global MQTT client instance (assigned in connect_mqtt)
c = None
# Device ID (assigned in connect_mqtt)
client_id = None
# Topic used for both publishing and subscribing
subject = "temperatureSensor"
# Broker address: fill in your own MQTT server
mqurl = " "
# Broker port
port=1883
# Pin object for the LED
led_pin = Pin(2, Pin.OUT)
# Initial LED state (read by led(), written by ledstate())
ked_state = 2
# ------------------ relay variables: start ----------------------
# Pin object for the relay (note: bound to GPIO 13 despite the name p12)
p12 = Pin(13, Pin.OUT)
# Initial relay state
relay_state = 0
# ------------------ relay variables: end ----------------------
# Temperature sensor --------------------------------------------------------------
ds_pin = Pin(14)
ds_sensor = ds18x20.DS18X20(onewire.OneWire(ds_pin))
# Light sensor ------------------------------------------------------------------
# Analog input
ps2_y = ADC(Pin(33))
ps2_y.atten(ADC.ATTN_11DB) # configure the measurement range to 3.3V
# Digital input
p15 = Pin(15, Pin.IN)
def _blink(half_period, times=1):
    """Turn the LED on then off ``times`` times, holding each level for ``half_period`` s."""
    for _ in range(times):
        led_pin.value(1)
        time.sleep(half_period)
        led_pin.value(0)
        time.sleep(half_period)


def led():
    """Drive the status LED forever according to the global ``ked_state``.

    States:
        1: LED held on.
        2: burst of eight blinks with 0.1 s on / 0.1 s off.
        3: one blink with 0.3 s on / 0.3 s off.
        anything else: one blink with 0.5 s on / 0.5 s off.

    Never returns; boot.py runs it on its own thread.
    """
    while True:
        if ked_state == 1:
            led_pin.value(1)
            # Sleep instead of spinning: the original re-wrote the pin in a
            # tight loop with no delay, burning CPU while the LED is steady.
            time.sleep(0.1)
        elif ked_state == 2:
            _blink(0.1, 8)
        elif ked_state == 3:
            _blink(0.3)
        else:
            _blink(0.5)
def ledstate(state=True):
    """Set the global ``ked_state`` value that ``led()`` reads.

    Args:
        state: New LED state. ``led()`` holds the LED on for 1 (the default
            ``True`` compares equal to 1), blinks fast for 2, blinks at 0.3 s
            for 3, and blinks at 0.5 s for any other value.
    """
    global ked_state
    ked_state = state
def sub_cb(topic, msg):
    """Handle an incoming MQTT message and switch the relay when addressed.

    The payload is decoded as UTF-8 JSON and must be an object. It is acted on
    only when "uid" equals this device's ``client_id``, "port" is 2 and "type"
    is 1; then "state" 0 or 1 is written to the relay pin and mirrored in the
    global ``relay_state``. Parse/type errors are printed, not raised.

    Args:
        topic: Topic the message arrived on (unused).
        msg: Raw message payload as bytes.
    """
    global relay_state
    try:
        payload = ujson.loads(msg.decode("utf-8"))
        if not isinstance(payload, dict):
            raise TypeError("消息不是字典类型")

        port = payload.get("port")
        msg_type = payload.get("type")
        uid = payload.get("uid")
        state = payload.get("state")

        # Ignore anything that is not a type-1 command on port 2 for us.
        if uid != client_id or port != 2 or msg_type != 1:
            return

        # Only 0 and 1 are valid relay levels; other values are ignored.
        for level in (0, 1):
            if state == level:
                p12.value(level)
                relay_state = level
                break
    except (ValueError, KeyError, TypeError) as e:
        print(f"消息处理错误: {e}")
def _close_quietly(client):
    """Close ``client``'s socket if it has one, ignoring close errors."""
    sock = getattr(client, "sock", None)
    if sock is None:
        return
    try:
        sock.close()
    except OSError:
        pass


def connect_mqtt():
    """(Re)connect to the MQTT broker and subscribe to ``subject``.

    Sets the globals ``client_id`` (the colon-separated hex unique ID of the
    board) and ``c`` (a connected, subscribed ``MQTTClient``). Blocks until
    both connect and subscribe succeed, retrying every 5 s. The LED state is
    set to 1 on success and 4 on failure.
    """
    global client_id, c
    # Local import: only MQTTClient is imported at module level.
    from umqttsimple import MQTTException

    # Release the socket of the client being replaced; otherwise every
    # reconnect leaks one socket.
    if c is not None:
        _close_quietly(c)

    uid = machine.unique_id()
    uid_str = ":".join("{:02X}".format(x) for x in uid)
    print("UID:", uid_str)
    client_id = uid_str

    while True:
        c = MQTTClient(client_id, mqurl, port=port)
        c.set_callback(sub_cb)
        stage = "连接"
        try:
            c.connect()
            ledstate(1)  # LED state after a successful connection
            print("连接成功")
            stage = "订阅"
            c.subscribe(subject)
            ledstate(1)  # LED state after a successful subscription
            print("订阅成功")
            return
        except (OSError, MQTTException) as e:
            # A failed subscribe means the connection is unusable, so start
            # over from connect instead of retrying on the same dead socket.
            ledstate(4)  # LED state on failure
            print(f"{stage}失败: {e}")
            _close_quietly(c)
            time.sleep(5)  # wait before retrying
def _read_temperature():
    """Return the DS18X20 temperature, or 0 when no reading is available.

    Float readings are rounded to 2 decimals. With several sensors on the
    bus the value of the last one scanned is returned.
    """
    reading = 0
    try:
        roms = ds_sensor.scan()
        ds_sensor.convert_temp()
        # The conversion must be given time to finish before reading,
        # otherwise the previous (or power-on default) value is returned.
        time.sleep(0.75)
        for rom in roms:
            temp = ds_sensor.read_temp(rom)
            if isinstance(temp, float):
                temp = round(temp, 2)
            reading = temp
    except Exception as e:
        # The driver signals a missing sensor or CRC failure with generic
        # exceptions; a sensor glitch must not stop the MQTT loop.
        print('温度读取错误:', e)
    return reading


def mqtt():
    """Connect to the broker, then publish sensor state in an endless loop.

    Each cycle (about one second) reads the light input and temperature,
    publishes a JSON status message to ``subject``, and polls for one incoming
    message. A ping is sent every 5 cycles. Any ``OSError`` from the client
    sets LED state 4 and triggers a reconnect via ``connect_mqtt``.
    Never returns.
    """
    connect_mqtt()
    cycles_since_ping = 0
    while True:
        light = p15.value()
        reply_msg = {
            "type": 1,
            "uid": client_id,
            "state": relay_state,  # current relay state
            "temperature": _read_temperature(),
            "night": light,
            "port": 1
        }
        stage = '上传云端错误:'
        try:
            c.publish(subject, ujson.dumps(reply_msg), qos=0)
            stage = '消息检查错误:'
            c.check_msg()
            # Together with the 0.75 s conversion wait this keeps the loop
            # period at roughly one second.
            time.sleep(0.25)
            cycles_since_ping += 1
            if cycles_since_ping >= 5:
                cycles_since_ping = 0
                stage = 'Ping 错误:'
                c.ping()
        except OSError as e:
            print(stage, e)
            ledstate(4)  # LED state on communication failure
            connect_mqtt()  # reconnect to the MQTT server
# Run the MQTT loop only when this file is executed directly. The original
# tested the undefined name `name`, which raised NameError on import.
if __name__ == '__main__':
    mqtt()
# ===== umqttsimple.py =====
import usocket as socket
import ustruct as struct
from ubinascii import hexlify
class MQTTException(Exception):
    """Raised by MQTTClient when the broker rejects a connect or subscribe.

    The single argument is the return code byte taken from the broker's reply.
    """
    pass
class MQTTClient:
    """Minimal MQTT client (protocol level 4) over a MicroPython stream socket.

    Supports connect with optional user/password, keepalive and last will,
    publish with QoS 0/1, subscribe, ping, and receiving published messages
    through a user callback. QoS 2 is not implemented.

    String arguments (client id, user, password, topics, payloads) may be
    ``str`` or bytes-like; ``str`` values are UTF-8 encoded before their length
    is computed, so non-ASCII text is framed correctly.
    """

    def __init__(
        self,
        client_id,
        server,
        port=0,
        user=None,
        password=None,
        keepalive=0,
        ssl=False,
        ssl_params=None,
    ):
        """Store connection parameters; no network activity happens here.

        Args:
            client_id: MQTT client identifier.
            server: Broker host name or address.
            port: Broker port; 0 selects 8883 with ``ssl`` and 1883 otherwise.
            user: Optional user name; ``None`` disables authentication.
            password: Password sent together with ``user``.
            keepalive: Keepalive interval in seconds (0 disables it).
            ssl: Wrap the socket with ``ussl`` when true.
            ssl_params: Keyword arguments for ``ussl.wrap_socket``; ``None``
                means no extra arguments.
        """
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.sock = None
        self.server = server
        self.port = port
        self.ssl = ssl
        # Fresh dict per instance: a shared ``{}`` default would leak
        # mutations from one client into every other one.
        self.ssl_params = {} if ssl_params is None else ssl_params
        self.pid = 0
        self.cb = None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False

    @staticmethod
    def _to_bytes(value):
        """Return ``value`` UTF-8 encoded if it is a ``str``, else unchanged."""
        if isinstance(value, str):
            return value.encode("utf-8")
        return value

    def _send_str(self, s):
        """Send ``s`` prefixed with its 16-bit big-endian byte length."""
        s = self._to_bytes(s)
        self.sock.write(struct.pack("!H", len(s)))
        self.sock.write(s)

    def _recv_len(self):
        """Read and decode an MQTT variable-length "remaining length" field."""
        n = 0
        sh = 0
        while 1:
            b = self.sock.read(1)[0]
            n |= (b & 0x7F) << sh
            if not b & 0x80:
                return n
            sh += 7

    def set_callback(self, f):
        """Register ``f(topic, msg)`` to be called for each received message."""
        self.cb = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
        """Configure the last-will message sent with the next ``connect()``."""
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain

    def connect(self, clean_session=True):
        """Open the socket and perform the MQTT CONNECT handshake.

        Args:
            clean_session: Value of the clean-session flag.

        Returns:
            Bit 0 of the third CONNACK byte (the session-present flag).

        Raises:
            MQTTException: The broker returned a non-zero CONNACK code.
            OSError: Socket-level failure.

        On any failure the socket is closed before the exception propagates.
        """
        # Encode once so every length below is a byte count.
        client_id = self._to_bytes(self.client_id)
        user = self._to_bytes(self.user)
        pswd = self._to_bytes(self.pswd)
        lw_topic = self._to_bytes(self.lw_topic)
        lw_msg = self._to_bytes(self.lw_msg)

        self.sock = socket.socket()
        try:
            addr = socket.getaddrinfo(self.server, self.port)[0][-1]
            self.sock.connect(addr)
            if self.ssl:
                import ussl

                self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
            premsg = bytearray(b"\x10\0\0\0\0\0")
            msg = bytearray(b"\x04MQTT\x04\x02\0\0")

            sz = 10 + 2 + len(client_id)
            msg[6] = clean_session << 1
            if user is not None:
                sz += 2 + len(user) + 2 + len(pswd)
                msg[6] |= 0xC0
            if self.keepalive:
                assert self.keepalive < 65536
                msg[7] |= self.keepalive >> 8
                msg[8] |= self.keepalive & 0x00FF
            if lw_topic:
                sz += 2 + len(lw_topic) + 2 + len(lw_msg)
                msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
                msg[6] |= self.lw_retain << 5

            # Encode the remaining length as a variable-length integer.
            i = 1
            while sz > 0x7F:
                premsg[i] = (sz & 0x7F) | 0x80
                sz >>= 7
                i += 1
            premsg[i] = sz

            # i + 2 bytes: the fixed header plus one zero byte, which is the
            # high byte of the protocol-name length preceding "\x04MQTT".
            self.sock.write(premsg, i + 2)
            self.sock.write(msg)
            self._send_str(client_id)
            if lw_topic:
                self._send_str(lw_topic)
                self._send_str(lw_msg)
            if user is not None:
                self._send_str(user)
                self._send_str(pswd)
            resp = self.sock.read(4)
            assert resp[0] == 0x20 and resp[1] == 0x02
            if resp[3] != 0:
                raise MQTTException(resp[3])
            return resp[2] & 1
        except Exception:
            # Do not leak the socket when the handshake fails; callers that
            # retry connect() would otherwise exhaust the socket pool.
            self.sock.close()
            raise

    def disconnect(self):
        """Send DISCONNECT and close the socket."""
        self.sock.write(b"\xe0\0")
        self.sock.close()

    def ping(self):
        """Send a PINGREQ; the reply is consumed by ``wait_msg``."""
        self.sock.write(b"\xc0\0")

    def publish(self, topic, msg, retain=False, qos=0):
        """Publish ``msg`` on ``topic``.

        Args:
            topic: Topic name (``str`` or bytes-like).
            msg: Payload (``str`` or bytes-like).
            retain: Set the retain flag.
            qos: 0 or 1. With 1 this blocks until the matching PUBACK arrives.
                QoS 2 is not supported.
        """
        topic = self._to_bytes(topic)
        msg = self._to_bytes(msg)
        pkt = bytearray(b"\x30\0\0\0")
        pkt[0] |= qos << 1 | retain
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        assert sz < 2097152
        i = 1
        while sz > 0x7F:
            pkt[i] = (sz & 0x7F) | 0x80
            sz >>= 7
            i += 1
        pkt[i] = sz
        self.sock.write(pkt, i + 1)
        self._send_str(topic)
        if qos > 0:
            self.pid += 1
            pid = self.pid
            # Reuse pkt as scratch space for the 2-byte packet id.
            struct.pack_into("!H", pkt, 0, pid)
            self.sock.write(pkt, 2)
        self.sock.write(msg)
        if qos == 1:
            while 1:
                op = self.wait_msg()
                if op == 0x40:
                    sz = self.sock.read(1)
                    assert sz == b"\x02"
                    rcv_pid = self.sock.read(2)
                    rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                    if pid == rcv_pid:
                        return
        elif qos == 2:
            assert 0

    def subscribe(self, topic, qos=0):
        """Subscribe to ``topic`` and block until the SUBACK arrives.

        A callback must have been set with ``set_callback`` first.

        NOTE: the remaining length is packed into a single byte, so this
        framing is only valid for topics of at most 122 bytes.

        Raises:
            MQTTException: The broker answered with failure code 0x80.
        """
        assert self.cb is not None, "Subscribe callback is not set"
        topic = self._to_bytes(topic)
        pkt = bytearray(b"\x82\0\0\0")
        self.pid += 1
        struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
        self.sock.write(pkt)
        self._send_str(topic)
        self.sock.write(qos.to_bytes(1, "little"))
        while 1:
            op = self.wait_msg()
            if op == 0x90:
                resp = self.sock.read(4)
                # The SUBACK must echo the packet id that was just sent.
                assert resp[1] == pkt[2] and resp[2] == pkt[3]
                if resp[3] == 0x80:
                    raise MQTTException(resp[3])
                return

    def wait_msg(self):
        """Wait for a single incoming MQTT message and process it.

        Subscribed messages are delivered to the callback previously set by
        ``set_callback``; a PINGRESP is consumed internally.

        Returns:
            ``None`` when nothing was read, after a PINGRESP, and after a
            delivered publish; otherwise the first header byte of the
            (non-publish) packet, whose remainder the caller must read.

        Raises:
            OSError: The socket returned an empty read.
        """
        res = self.sock.read(1)
        # Restore blocking mode (check_msg switches it off before calling).
        self.sock.setblocking(True)
        if res is None:
            return None
        if res == b"":
            raise OSError(-1)
        if res == b"\xd0":  # PINGRESP
            sz = self.sock.read(1)[0]
            assert sz == 0
            return None
        op = res[0]
        if op & 0xF0 != 0x30:
            return op
        sz = self._recv_len()
        topic_len = self.sock.read(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = self.sock.read(topic_len)
        sz -= topic_len + 2
        if op & 6:
            # QoS > 0 publishes carry a 2-byte packet id before the payload.
            pid = self.sock.read(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2
        msg = self.sock.read(sz)
        self.cb(topic, msg)
        if op & 6 == 2:
            # QoS 1: acknowledge with PUBACK.
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            self.sock.write(pkt)
        elif op & 6 == 4:
            assert 0

    def check_msg(self):
        """Process a pending message from the server, if any.

        Switches the socket to non-blocking mode, then does the same
        processing as ``wait_msg``; returns ``None`` immediately when no data
        is available.
        """
        self.sock.setblocking(False)
        return self.wait_msg()