import ujson
import usocket
import ubinascii
import utime
import _thread
# 配置WebSocket地址和端口
SERVER = ""
PORT = 80 # 根据实际情况可能需要修改
PATH = "/wl/wss"
#
import ujson
import usocket
import ubinascii
import utime
import _thread
# 配置WebSocket地址和端口
SERVER = ""
PORT = 80 # 根据实际情况可能需要修改
PATH = "/wl/wss"
# 全局变量:WebSocket连接
ws = None
# 函数:创建WebSocket连接
def create_connection():
global ws # 使用全局变量
addr_info = usocket.getaddrinfo(SERVER, PORT)[0]
s = usocket.socket(addr_info[0], usocket.SOCK_STREAM)
s.connect(addr_info[-1])
s.send(b"GET " + PATH + " HTTP/1.1\r\nHost: " + SERVER.encode() + b"\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\nSec-WebSocket-Version: 13\r\n\r\n")
ws = s # 将全局变量设置为WebSocket连接
return s
# 函数:发送数据
def send_data(socket, data):
# 将数据转换为JSON字符串
json_data = ujson.dumps(data)
# 构建WebSocket帧来发送数据,包括设置MASK位
frame = bytearray()
frame.append(0x81) # FIN = 1, RSV1/2/3 = 0, Opcode = 1 (Text)
payload = json_data.encode()
payload_length = len(payload)
if payload_length <= 125:
frame.append(payload_length | 0x80) # 设置MASK位为1
elif 126 <= payload_length <= 65535:
frame.append(126 | 0x80) # 设置MASK位为1
frame.extend(payload_length.to_bytes(2, 'big'))
else:
frame.append(127 | 0x80) # 设置MASK位为1
frame.extend(payload_length.to_bytes(8, 'big'))
# 添加MASK位及掩码值
mask = b'\x01\x02\x03\x04' # 4个字节的掩码值
frame.extend(mask)
# 对数据应用掩码
masked_payload = bytearray(payload)
for i in range(payload_length):
masked_payload[i] ^= mask[i % 4]
frame.extend(masked_payload)
socket.send(frame)
# 函数:接收数据
def receive_data(socket):
while True:
try:
# 读取响应
response = socket.recv(512)
if not response:
print("No data received. Connection might be closed.")
break # 退出循环,如果没有接收到数据
# 找到JSON数据的开始位置
start = response.find(b'\x81')
if start != -1:
# 提取有效载荷长度
length = response[start+1] & 127
# 根据长度提取JSON字符串,跳过2字节的基础帧头部,加上长度得到JSON数据的起点
json_start = start + 2
json_data = response[json_start:json_start+length]
# 打印JSON字符串
print("Received:", json_data.decode())
else:
print("No JSON data found")
except Exception as e:
print("Error occurred:", str(e))
break # 发生错误时退出循环
# 主函数
def start_websocket():
global ws # 使用全局变量
while True:
try:
# 创建WebSocket连接
create_connection()
# 发送数据
send_data(ws, {"name": "我是设备"})
# 转换数据为JSON字符串
# 启动定时任务线程
interval_seconds = 60 # 定时任务的时间间隔,以秒为单位
_thread.start_new_thread(timed_task, (interval_seconds,))
# 接收并打印响应
receive_data(ws)
except Exception as e:
print("WebSocket connection error:", str(e))
# 重新连接之前等待一段时间
utime.sleep(10) # 例如,每隔10秒重新连接
# 函数:接受WebSocket连接
def accept_websocket(data):
global ws # 使用全局变量
if ws is not None:
send_data(ws, data)
# 定时任务函数
def timed_task(interval_seconds):
while True:
utime.sleep(interval_seconds)
# 在此处调用定时任务要执行的函数
accept_websocket({"name": "定时任务触发"}) # 调用 accept_websocket 函数,传递数据