MicroPython+Python ESP32板子使用wss的(例子


发布于 2024-11-19 / 19 阅读 / 0 评论 /
import ujson import usocket import ubinascii import utime import _thread # 配置WebSocket地址和端口 SERVER = "" PORT = 80 # 根据实际情况可能需要修改 PATH = "/wl/wss" #
import ujson
import usocket
import ubinascii
import utime
import _thread

# 配置WebSocket地址和端口
SERVER = ""
PORT = 80  # 根据实际情况可能需要修改
PATH = "/wl/wss"

# 全局变量:WebSocket连接
ws = None

# 函数:创建WebSocket连接
def create_connection():
    global ws  # 使用全局变量
    addr_info = usocket.getaddrinfo(SERVER, PORT)[0]
    s = usocket.socket(addr_info[0], usocket.SOCK_STREAM)
    s.connect(addr_info[-1])
    s.send(b"GET " + PATH + " HTTP/1.1\r\nHost: " + SERVER.encode() + b"\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\nSec-WebSocket-Version: 13\r\n\r\n")
    ws = s  # 将全局变量设置为WebSocket连接
    return s

# 函数:发送数据
def send_data(socket, data):
    # 将数据转换为JSON字符串
    json_data = ujson.dumps(data)
    # 构建WebSocket帧来发送数据,包括设置MASK位
    frame = bytearray()
    frame.append(0x81)  # FIN = 1, RSV1/2/3 = 0, Opcode = 1 (Text)
    payload = json_data.encode()
    payload_length = len(payload)

    if payload_length <= 125:
        frame.append(payload_length | 0x80)  # 设置MASK位为1
    elif 126 <= payload_length <= 65535:
        frame.append(126 | 0x80)  # 设置MASK位为1
        frame.extend(payload_length.to_bytes(2, 'big'))
    else:
        frame.append(127 | 0x80)  # 设置MASK位为1
        frame.extend(payload_length.to_bytes(8, 'big'))

    # 添加MASK位及掩码值
    mask = b'\x01\x02\x03\x04'  # 4个字节的掩码值
    frame.extend(mask)

    # 对数据应用掩码
    masked_payload = bytearray(payload)
    for i in range(payload_length):
        masked_payload[i] ^= mask[i % 4]

    frame.extend(masked_payload)

    socket.send(frame)

# 函数:接收数据
def receive_data(socket):
    while True:
        try:
            # 读取响应
            response = socket.recv(512)
            if not response:
                print("No data received. Connection might be closed.")
                break  # 退出循环,如果没有接收到数据

            # 找到JSON数据的开始位置
            start = response.find(b'\x81')
            if start != -1:
                # 提取有效载荷长度
                length = response[start+1] & 127
                # 根据长度提取JSON字符串,跳过2字节的基础帧头部,加上长度得到JSON数据的起点
                json_start = start + 2
                json_data = response[json_start:json_start+length]
                # 打印JSON字符串
                print("Received:", json_data.decode())
            else:
                print("No JSON data found")
        except Exception as e:
            print("Error occurred:", str(e))
            break  # 发生错误时退出循环

# 主函数
def start_websocket():
    global ws  # 使用全局变量
    while True:
        try:
            # 创建WebSocket连接
            create_connection()
            # 发送数据
            send_data(ws, {"name": "我是设备"})
             # 转换数据为JSON字符串
          # 启动定时任务线程
            interval_seconds = 60  # 定时任务的时间间隔,以秒为单位
            _thread.start_new_thread(timed_task, (interval_seconds,))

            # 接收并打印响应
            receive_data(ws)
        except Exception as e:
            print("WebSocket connection error:", str(e))

        # 重新连接之前等待一段时间
        utime.sleep(10)  # 例如,每隔10秒重新连接

# 函数:接受WebSocket连接
def accept_websocket(data):
    global ws  # 使用全局变量
    if ws is not None:
        send_data(ws, data)

# 定时任务函数
def timed_task(interval_seconds):
    while True:
        utime.sleep(interval_seconds)
        # 在此处调用定时任务要执行的函数
        accept_websocket({"name": "定时任务触发"})  # 调用 accept_websocket 函数,传递数据



是否对你有帮助?

评论