为什么同样的代码可以控制sg90 但是控制不了180度 的MG996R

mg996R 舵机反向转动 比顺时针旋转需要更多的电量

垃圾的micro数据线 提供不了, 可以传输数据的正常的数据线 才能提供足够的电量

mqtt esp32 micropython


import time
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
config['ssid'] = 'honor' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu' 
# 监听消息接收
def msg_call(topic, msg, retained):
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    handle_message(msg_str)
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han

async def main(client):
    await client.connect()
    n = 0
    while True:
        await asyncio.sleep(50)
        print('sta', n)
        # If WiFi is down the following will pause for the duration.
        await client.publish('sta', '{}'.format(n), qos = 1)
        n += 1

def handle_message(message):
    if message == "open":
        print("网页端点击开门 esp32启动舵机")
        # 在这里执行开门操作
        my_servo.write_angle(90)  # 舵机转动到90度位置
        await client.publish('sta', 'opendoor success')
    elif message == "close":
        print("网页端点击关门 esp32回正舵机")
        # 在这里执行关门操作
        my_servo.write_angle(0)  # 舵机转动到0度位置
        await client.publish('sta', 'closedoor success')
    else:
        print("接受的指令", message)
        print("esp32无操作")

# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500) 
# 程序入口
if __name__ == '__main__':
#   wifi_connect('BlackSheep','87654321')  
    MQTTClient.DEBUG = True  # Optional: print diagnostic messages
    client = MQTTClient(config)
    try:
        asyncio.run(main(client))
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors


    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        my_servo.write_angle(0)
        time.sleep(0.5)
        my_servo.write_angle(45)
        time.sleep(0.5)
        my_servo.write_angle(90)
        time.sleep(0.5)
        my_servo.write_angle(135)
        time.sleep(0.5)
        my_servo.write_angle(180)
        time.sleep(0.5)
        # 在这里执行开门操作
        my_servo.write_angle(90)  # 舵机转动到90度位置
        await client.publish('sta', 'opendoor success')
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        # 在这里执行关门操作
        my_servo.write_angle(0)  # 舵机转动到0度位置
        await client.publish('sta', 'closedoor success') 
        time.sleep(0.5)
        my_servo.write_angle(45)
        time.sleep(0.5)
        my_servo.write_angle(90)
        time.sleep(0.5)
        my_servo.write_angle(135)
        time.sleep(0.5)
        my_servo.write_angle(180)
        time.sleep(0.5)
    else:
        print("接受的指令", message)
        print("esp32无操作")

[]

360不等待版本


import time
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo 
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu' 
    
door = 0
# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
#         my_servo.write_angle(0);
        my_servo.write_angle(0); 
        asyncio.sleep(5)
        my_servo.write_angle(360);
        door=1
        client.publish('sta',door)
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
#         my_servo.write_angle(180);  # 舵机转动到0度位置
        my_servo.write_angle(180);
        asyncio.sleep(5)
        my_servo.write_angle(360);
        door=0 
    else: 
        print("接受的指令", msg_str)
        print(door) 
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han

async def main(client):
    await client.connect()
    n = 0
    while True:
#         await asyncio.sleep(1)
        print('sta', n,door)
        # If WiFi is down the following will pause for the duration.
        await client.publish('sta', '{}'.format(door), qos = 1)
        n += 1
        if n>10:
            n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500) 
# 程序入口
if __name__ == '__main__': 
    MQTTClient.DEBUG = True
    client = MQTTClient(config)
    try:
        asyncio.run(main(client))
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors



180度


import time
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo 
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu' 
    
door = 0
# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        my_servo.write_angle(0);
        door=1
        client.publish('sta',door)
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        my_servo.write_angle(180);  # 舵机转动到0度位置
        door=0 
    else: 
        print("接受的指令", msg_str)
        print(door) 
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han

async def main(client):
    await client.connect()
    n = 0
    while True:
#         await asyncio.sleep(1)
        print('sta', n,door)
        # If WiFi is down the following will pause for the duration.
        await client.publish('sta', '{}'.format(door), qos = 1)
        n += 1
        if n>10:
            n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500) 
# 程序入口
if __name__ == '__main__': 
    MQTTClient.DEBUG = True
    client = MQTTClient(config)
    try:
        asyncio.run(main(client))
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors


180 通过订阅sta enter来控制esp32 休眠运行状态


import time
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo 
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu' 
    
door = 0
# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
#         my_servo.write_angle(0);
        my_servo.write_angle(0); 
        asyncio.sleep(5)
        my_servo.write_angle(360);
        door=1
        client.publish('sta',door)
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
#         my_servo.write_angle(180);  # 舵机转动到0度位置
        my_servo.write_angle(180);
        asyncio.sleep(5)
        my_servo.write_angle(360);
        door=0 
    else: 
        print("接受的指令", msg_str)
        print(door) 
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han

async def main(client):
    await client.connect()
    n = 0
    while True:
#         await asyncio.sleep(1)
        print('sta', n,door)
        # If WiFi is down the following will pause for the duration.
        await client.publish('sta', '{}'.format(door), qos = 1)
        n += 1
        if n>10:
            n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500) 
# 程序入口
if __name__ == '__main__': 
    MQTTClient.DEBUG = True
    client = MQTTClient(config)
    try:
        asyncio.run(main(client))
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors



360 订阅状态


import time
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo 
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu' 
    
door = 0
enter = 0
# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    global enter
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        my_servo.write_angle(0); 
        asyncio.sleep(5)
        my_servo.write_angle(360);
        door=1
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        my_servo.write_angle(180);
        asyncio.sleep(5)
        my_servo.write_angle(360);
        door=0  
    elif msg_str == "enterd":
        print("接收进入网页 快速传递状态")
        enter=10
    else: 
        print("接受的指令", msg_str)
        client.publish('sta',door)
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han

async def main(client):
    await client.connect()
    global enter
    n = 0
    while True:
        if enter>1:
            print('进入网页 快速传递状态')
            await client.publish('staesp',"fast")
#             await asyncio.sleep(1)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            enter -= 1
            print(enter)
            n += 2
            if n>10:
                n=0
        else:
            print('网页离线 esp32进入慢休眠')
            await client.publish('staesp',"sleep")
            await asyncio.sleep(5)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            n += 1
            if n>10:
                n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500) 
# 程序入口
if __name__ == '__main__': 
    MQTTClient.DEBUG = True
    client = MQTTClient(config)
    try:
        asyncio.run(main(client))
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors


多线程加入内网 原生不能异步多线程

HTTP服务器部分被放在一个名为start_server的函数中,并且使用_thread.start_new_thread来启动一个新的线程来运行HTTP服务器。这样,当程序执行到main()时,HTTP服务器将会在单独的线程中运行,而你可以在main()函数中执行其他需要的代码。

import usocket as socket
import _thread
import time
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo 
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'  
door = 0
enter = 0
def handle_request(client_socket):
    # 处理HTTP请求
    request_data = client_socket.recv(1024)
    request_lines = request_data.split(b"\r\n")
    if len(request_lines) > 0:
        request_line = request_lines[0].decode("utf-8")
        method, path, protocol = request_line.split(" ")
        if method == "GET" and path == "/":
            with open("public/index.html", "rb") as f:
                response_body = f.read()
            response = (
                "HTTP/1.1 200 OK\r\n"
                "Content-Type: text/html\r\n"
                "Content-Length: {}\r\n"
                "\r\n"
            ).format(len(response_body))
            client_socket.send(response.encode("utf-8"))
            client_socket.send(response_body)
        elif path == "/xuanzhuan1":
            # 调用 xuanzhuan1 方法
            xuanzhuan1()
            response = "HTTP/1.1 200 OK\r\n\r\n"
            client_socket.send(response.encode("utf-8"))
        elif path == "/xuanzhuan2":
            # 调用 xuanzhuan2 方法
            xuanzhuan2()
            response = "HTTP/1.1 200 OK\r\n\r\n"
            client_socket.send(response.encode("utf-8"))

    client_socket.close()

def start_server():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('0.0.0.0', 80))
    server_socket.listen(5)
    print("Server is listening on port 80...") 
    while True:
        client_socket, addr = server_socket.accept()
        print("Connection from:", addr)
        _thread.start_new_thread(handle_request, (client_socket,))
def xuanzhuan1():
    global door
    # 实现 xuanzhuan1 方法的代码
    print("内网点击开门 esp32启动舵机")
    my_servo.write_angle(0);  
    door=1  
def xuanzhuan2():
    global door
    print("内网点击关门 esp32回正舵机")
    my_servo.write_angle(180); 
    door=0  
# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    global enter
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        my_servo.write_angle(0);  
        door=1
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        my_servo.write_angle(180); 
        door=0  
    elif msg_str == "enterd":
        print("接收进入网页 快速传递状态")
        enter=10
    else: 
        print("接受的指令", msg_str)
        client.publish('sta',door)
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han

async def main(client):
    await client.connect()
    global enter
    _thread.start_new_thread(start_server, ())
    n = 0
    while True:
        if enter>1:
            print('进入网页 快速传递状态')
            await client.publish('staesp',"fast")
#             await asyncio.sleep(1)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            enter -= 1
            print(enter)
            n += 2
            if n>10:
                n=0
        else:
            print('网页离线 esp32进入慢休眠')
            await client.publish('staesp',"sleep")
            await asyncio.sleep(5)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            n += 1
            if n>10:
                n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500) 
# 程序入口
if __name__ == '__main__': 
    MQTTClient.DEBUG = True
    client = MQTTClient(config) 
    try:
        asyncio.run(main(client))
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors



microdot 异步运行搭建的web框架

from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo 
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'  
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
    return send_file('public/index.html')

# 设置一个get请求 如果
@app.get('/on')
def index(request):
    global door
    # 实现 xuanzhuan1 方法的代码
    print("内网点击开门 esp32启动舵机")
    my_servo.write_angle(0);  
    door=1  
    # 如果收到get请求on就开灯
    print("Open")
    return "开灯了"
@app.get('/off')
def index(request):
    global door
    print("内网点击关门 esp32回正舵机")
    my_servo.write_angle(180); 
    door=0  
    # 如果收到get请求off就关灯
    print("Close")
    return "关灯了" 
def xuanzhuan1():
    global door
    # 实现 xuanzhuan1 方法的代码
    print("内网点击开门 esp32启动舵机")
    my_servo.write_angle(0);  
    door=1  
def xuanzhuan2():
    global door
    print("内网点击关门 esp32回正舵机")
    my_servo.write_angle(180); 
    door=0  
# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    global enter
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        my_servo.write_angle(0);  
        door=1
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        my_servo.write_angle(180); 
        door=0  
    elif msg_str == "enterd":
        print("接收进入网页 快速传递状态")
        enter=10
    else: 
        print("接受的指令", msg_str)
        client.publish('sta',door)
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def run_microdot_app():
    app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
    await client.connect()
    global enter
    _thread.start_new_thread(run_microdot_app, ())
    n = 0
    while True:
        if enter>1:
#             print('进入网页 快速传递状态')
            await client.publish('staesp',"fast")
#             await asyncio.sleep(1)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            enter -= 1
            print(enter)
            n += 2
            if n>10:
                n=0
        else:
            print('网页离线 esp32进入慢休眠')
            await client.publish('staesp',"sleep")
            await asyncio.sleep(5)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            n += 1
            if n>10:
                n=0
        

# 程序入口
if __name__ == '__main__':
    # 定义舵机控制对象
    my_servo = Servo(Pin(13), max_us=2500) 
    MQTTClient.DEBUG = True
    client = MQTTClient(config) 
    try:
        asyncio.run(main(client)) 
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors



MG996R micropython 控制 360舵机 thonmy

控制sg90 正常但是控制mg996r 需求注意停断

转另一个方向时,需要先停后进

0正传 180逆 90停

import time
from machine import Pin
from servo import Servo
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
# 程序入口
if __name__ == '__main__':
    while True:
#         正
        my_servo.write_angle(0)
        time.sleep(1.5)
        
#           //停 回周期
        my_servo.write_angle(90)
        time.sleep(0.1)
#         逆
        my_servo.write_angle(180)
        time.sleep(1.5)
#         //停 回回周期
        my_servo.write_angle(90)
        time.sleep(0.1)  


MG996R micropython 控制 360舵机 thonmy

from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo 
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'  
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
    return send_file('public/index.html')

# 设置一个get请求 如果
@app.get('/on')
def index(request):
    global door
    # 实现 xuanzhuan1 方法的代码
    print("内网点击开门 esp32启动舵机")
    time.sleep(0.1)
    my_servo.write_angle(90)
    time.sleep(0.1)
    my_servo.write_angle(0); 
    time.sleep(1.2) 
    my_servo.write_angle(90)
    time.sleep(5) 
    my_servo.write_angle(180);
    time.sleep(1.2) 
    my_servo.write_angle(90)
    time.sleep(0.1)
    door=1
    # 如果收到get请求on就开灯
    print("Open")
    return "开灯了"
@app.get('/off')
def index(request):
    global door
    print("内网点击关门 esp32回正舵机")
    time.sleep(0.1)
    my_servo.write_angle(90)
    time.sleep(0.1)
    my_servo.write_angle(180);
    time.sleep(1.2) 
    my_servo.write_angle(90)
    time.sleep(0.1)
    door=0  
    # 如果收到get请求off就关灯
    print("Close")
    return "关灯了" 

# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    global enter
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(0); 
        time.sleep(1.2) 
        my_servo.write_angle(90)
        time.sleep(0.1)
        door=1
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(180);
        time.sleep(1.2) 
        my_servo.write_angle(90)
        time.sleep(0.1)
        door=0  
    elif msg_str == "enterd":
        print("接收进入网页 快速传递状态")
        enter=50
    else: 
        print("接受的指令", msg_str)
        client.publish('sta',door)
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def run_microdot_app():
    app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
    await client.connect()
    global enter
    _thread.start_new_thread(run_microdot_app, ())
    n = 0
    while True:
        if enter>1:
#             print('进入网页 快速传递状态')
            await client.publish('staesp',"fast")
#             await asyncio.sleep(1)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            enter -= 1
            print(enter)
            n += 2
            if n>10:
                n=0
        else:
            print('网页离线 esp32进入慢休眠')
            await client.publish('staesp',"sleep")
            await asyncio.sleep(5)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            n += 1
            if n>10:
                n=0
        

# 程序入口
if __name__ == '__main__':
    # 定义舵机控制对象
    my_servo = Servo(Pin(13), max_us=2500) 
    MQTTClient.DEBUG = True
    client = MQTTClient(config) 
    try:
        asyncio.run(main(client)) 
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors

检测内存+wifi版本1

from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo
import gc  
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'  
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
    return send_file('public/index.html')

# 设置一个get请求 如果
@app.get('/on')
def index(request):
    global door
    # 实现 xuanzhuan1 方法的代码
    print("内网点击开门 esp32启动舵机")
    time.sleep(0.1)
    my_servo.write_angle(90)
    time.sleep(0.1)
    my_servo.write_angle(0); 
    time.sleep(2) 
    my_servo.write_angle(90)
    time.sleep(0.1)
    door=1
    # 如果收到get请求on就开灯
    print("Open")
    return "开灯了"
@app.get('/off')
def index(request):
    global door
    print("内网点击关门 esp32回正舵机")
    time.sleep(0.1)
    my_servo.write_angle(90)
    time.sleep(0.1)
    my_servo.write_angle(180);
    time.sleep(2) 
    my_servo.write_angle(90)
    time.sleep(0.1)
    door=0  
    # 如果收到get请求off就关灯
    print("Close")
    return "关灯了" 

# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    global enter
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(0); 
        time.sleep(2) 
        my_servo.write_angle(90)
        time.sleep(0.1)
        door=1
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(180);
        time.sleep(2) 
        my_servo.write_angle(90)
        time.sleep(0.1)
        door=0  
    elif msg_str == "enterd":
        print("接收进入网页 快速传递状态")
        enter=50
    else: 
        print("接受的指令", msg_str)
        client.publish('sta',door)
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def run_microdot_app():
    app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
    await client.connect() 
    global enter
    _thread.start_new_thread(run_microdot_app, ())
    n = 0
    while True:
        if client._sta_if.isconnected():
            print("WiFi 已连接") 
        else:
            print("WiFi 未连接")  
            machine.reset()
        # 获取当前使用的内存
        used_memory = gc.mem_alloc()  
        print("当前使用的内存:", used_memory, "bytes") 
         
        if enter>1:
#             print('进入网页 快速传递状态') 
            await client.publish('staesp', '{}'.format(used_memory), qos = 1)
#             await asyncio.sleep(1)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            enter -= 1
            print(enter)
            n += 2
            if n>10:
                n=0
        else:
            print('网页离线 esp32进入慢休眠')
            await client.publish('staesp', '{}'.format(used_memory), qos = 1)
            await asyncio.sleep(1)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            n += 1
            if n>10:
                n=0
        

# 程序入口
if __name__ == '__main__':
    # 定义舵机控制对象
    my_servo = Servo(Pin(13), max_us=2500) 
    MQTTClient.DEBUG = True
    client = MQTTClient(config) 
    try:
        asyncio.run(main(client)) 
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors

不要内网 控制360

import time
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo 
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'  
door = 0
enter = 0
# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    global enter
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(0); 
        time.sleep(1.2) 
        my_servo.write_angle(90)
        time.sleep(0.1)
        door=1
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(180);
        time.sleep(1.2) 
        my_servo.write_angle(90)
        time.sleep(0.1)
        door=0  
    elif msg_str == "enterd":
        print("接收进入网页 快速传递状态")
        enter=99
    else: 
        print("接受的指令", msg_str)
        client.publish('sta',door)
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client): 
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han

async def main(client):
    await client.connect()
    global enter
    n = 0
    while True:
        if enter>1:
#             print('进入网页 快速传递状态')
            await client.publish('staesp',"fast")
            await asyncio.sleep(0.1)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            enter -= 1
            print(enter)
            n += 2
            if n>10:
                n=0
        else:
            print('网页离线 esp32进入慢休眠')
            await client.publish('staesp',"sleep")
            await asyncio.sleep(5)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            n += 1
            if n>10:
                n=0
        

# 程序入口
if __name__ == '__main__':
    # 定义舵机控制对象
    my_servo = Servo(Pin(13), max_us=2500) 
    MQTTClient.DEBUG = True
    client = MQTTClient(config) 
    try:
        asyncio.run(main(client)) 
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors

服务器断链 本地网络搭建mqtt

from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
import webrepl
import machine
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo
import gc  
# config['ssid'] = 'BlackSheep'
config['ssid'] = 'honor'
config['wifi_pw'] = '87654321'
# config['server'] = 'mqt.kepao.icu'
config['server'] = '192.168.118.202'
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
    return send_file('public/index.html')

# 设置一个get请求 如果
@app.get('/on')
def index(request):
    global door
    # 实现 xuanzhuan1 方法的代码
    print("内网点击开门 esp32启动舵机")
    time.sleep(0.1)
    my_servo.write_angle(90)
    time.sleep(0.1)
    my_servo.write_angle(0); 
    time.sleep(2) 
    my_servo.write_angle(90)
    time.sleep(0.1)
    door=1
    # 如果收到get请求on就开灯
    print("Open")
    return "开灯了"
@app.get('/off')
def index(request):
    global door
    print("内网点击关门 esp32回正舵机")
    time.sleep(0.1)
    my_servo.write_angle(90)
    time.sleep(0.1)
    my_servo.write_angle(180);
    time.sleep(2) 
    my_servo.write_angle(90)
    time.sleep(0.1)
    door=0  
    # 如果收到get请求off就关灯
    print("Close")
    return "关灯了" 

# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    global enter
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(0); 
        time.sleep(2) 
        my_servo.write_angle(90)
        time.sleep(0.1)
        door=1
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(180);
        time.sleep(2) 
        my_servo.write_angle(90)
        time.sleep(0.1)
        door=0  
    elif msg_str == "enterd":
        print("接收进入网页 快速传递状态")
        enter=50
    else: 
        print("接受的指令", msg_str)
        client.publish('sta',door)
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def check_running(timer):
    print("程序正在运行")

# 创建一个定时器,每秒触发一次回调函数
tim = machine.Timer(-1)
tim.init(period=5000, mode=tim.PERIODIC, callback=check_running)
def run_microdot_app():
    app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
    print("connect config")
    await client.connect()
    if client._sta_if.isconnected():
            print("WiFi 已连接") 
    else:
            print("WiFi 未连接")
    global enter
    _thread.start_new_thread(run_microdot_app, ())
    n = 0
    webrepl.start()
    while True:
        # 获取当前使用的内存
        used_memory = gc.mem_alloc()  
        print("当前使用的内存:", used_memory, "bytes") 
        if enter>1:
#             print('进入网页 快速传递状态') 
            await client.publish('staesp', '{}'.format(used_memory), qos = 1) 
            print('sta', n,door) 
            await client.publish('sta', '{}'.format(door), qos = 1)
            enter -= 1
            print(enter)
            n += 2
            if n>10:
                n=0
        else:
            print('网页离线 esp32进入慢休眠')
            await client.publish('staesp', '{}'.format(used_memory), qos = 1)
            await asyncio.sleep(1)
            print('sta', n,door) 
            await client.publish('sta', '{}'.format(door), qos = 1)
            n += 1
            if n>10:
                n=0
        

# 程序入口
if __name__ == '__main__':
    # 定义舵机控制对象
    my_servo = Servo(machine.Pin(13), max_us=2500) 
    MQTTClient.DEBUG = True
    print("running config")
    client = MQTTClient(config) 
    try:
        asyncio.run(main(client)) 
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors

wifi固件可用版本1.21+

开关门 2024 0610

from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
import webrepl
import machine 
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from lib.servo import Servo
import gc  
config['ssid'] = 'BlackSheep'
# config['ssid'] = 'honor'
config['wifi_pw'] = '87654321'
config['server'] = 'broker.emqx.io'
# config['server'] = '192.168.1.150'
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
    return send_file('public/index.html')

# 设置一个get请求 如果
@app.get('/on')
def index(request):
    global door
    # 实现 xuanzhuan1 方法的代码
    print("内网点击开门 esp32启动舵机")
    time.sleep(0.1)
    my_servo.write_angle(90)
    time.sleep(0.1)
    my_servo.write_angle(0); 
    time.sleep(2) 
    my_servo.write_angle(90)
    time.sleep(0.1)
    door=1
    # 如果收到get请求on就开灯
    print("Open")
    return "开灯了"
@app.get('/off')
def index(request):
    global door
    print("内网点击关门 esp32回正舵机")
    time.sleep(0.1)
    my_servo.write_angle(90)
    time.sleep(0.1)
    my_servo.write_angle(180);
    time.sleep(2) 
    my_servo.write_angle(90)
    time.sleep(0.1)
    door=0  
    # 如果收到get请求off就关灯
    print("Close")
    return "关灯了" 

# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    global enter
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        door=1
        print("网页端点击开门 esp32启动舵机") 
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(0); 
        time.sleep(1.3) 
        my_servo.write_angle(90)
        time.sleep(0.9)
        my_servo.write_angle(180);
        time.sleep(1.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        
    elif msg_str == "close":
        door=0
        print("网页端点击关门 esp32回正舵机")
        time.sleep(0.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
        my_servo.write_angle(0); 
        time.sleep(1.3) 
        my_servo.write_angle(90)
        time.sleep(0.9)
        my_servo.write_angle(180);
        time.sleep(1.1)
        my_servo.write_angle(90)
        time.sleep(0.1)
          
    elif msg_str == "enterd":
        print("接收进入网页 快速传递状态")
        enter=50
    else: 
        print("接受的指令", msg_str)
        client.publish('sta',door)
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client): 
    await client.subscribe('631mydoor', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def check_running(timer):
    print("程序正在运行")

# 创建一个定时器,每秒触发一次回调函数
tim = machine.Timer(-1)
tim.init(period=5000, mode=tim.PERIODIC, callback=check_running)
def run_microdot_app():
    app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client): 
    print("connect config") 
    await client.connect()
    if client._sta_if.isconnected():
            print("WiFi 已连接") 
    else:
            print("WiFi 未连接")
    global enter
    _thread.start_new_thread(run_microdot_app, ())
    n = 0
    webrepl.start()
    while True:
        # 获取当前使用的内存
        used_memory = gc.mem_alloc()  
        print("当前使用的内存:", used_memory, "bytes") 
        if enter>1:
#             print('进入网页 快速传递状态') 
            await client.publish('631mystaesp', '{}'.format(used_memory), qos = 1) 
            print('sta', n,door) 
            await client.publish('631mysta', '{}'.format(door), qos = 1)
            enter -= 1
            print(enter)
            n += 2
            if n>10:
                n=0
        else:
            print('网页离线 esp32进入慢休眠')
            await client.publish('631mystaesp', '{}'.format(used_memory), qos = 1)
            await asyncio.sleep(1)
            print('sta', n,door) 
            await client.publish('631mysta', '{}'.format(door), qos = 1)
            n += 1
            if n>10:
                n=0
        

# 程序入口
if __name__ == '__main__':
    # 定义舵机控制对象
    my_servo = Servo(machine.Pin(13), max_us=2500) 
    MQTTClient.DEBUG = True
    print("running config")
    client = MQTTClient(config) 
    try:
        asyncio.run(main(client)) 
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors




蓝牙通信 固件可用版本1.18 1.19

from machine import Pin
from machine import Timer
from neopixel import NeoPixel
import ubluetooth


#创建像素序列对象
#服务于板载WS2812B
np = NeoPixel(Pin(15), 1)
np[0]=(128,0,0)
np.write()

#接收消息设置颜色的方法
def processMsg(colorStr):
    cl=colorStr.split(",")
    np[0]=(int(cl[0]),int(cl[1]),int(cl[2]))
    np.write()


class ESP32_BLE():
    def __init__(self, name):
        #用于显示蓝牙连接状态的指示灯
        self.led = Pin(13, Pin.OUT)
        #完成未连接状态指示灯闪烁任务的定时器
        self.timer1 = Timer(0)
        #蓝牙设备名称
        self.name = name
        #创建BLE类对象
        self.ble = ubluetooth.BLE()
        #激活,使用任何其他相关操作前必须先激活
        self.ble.active(True)
        #设置未连接状态定时器闪烁任务
        self.disconnected()
        #设置蓝牙状态发生变化时的中断回调方法
        self.ble.irq(self.ble_irq)
        #注册蓝牙
        self.register()
        #广播蓝牙
        self.advertiser()
        #接收的消息字符串
        self.bleMsg = ""

    #状态切换为连接的方法
    def connected(self):
        #指示灯常量
        self.led.value(1)
        #执行指示灯闪烁的定时器关闭
        self.timer1.deinit()

    #在未连接状态设置定时器 #以完成指示灯闪烁的任务
    def disconnected(self):        
        self.timer1.init(period=100, mode=Timer.PERIODIC, callback=lambda t: self.led.value(not self.led.value()))

    #中断回调方法
    def ble_irq(self, event, data):
        #连接成功事件
        if event == 1:
            #关闭闪烁定时器
            self.connected()
        #断开事件
        elif event == 2: #_IRQ_CENTRAL_DISCONNECT:
            #广播蓝牙
            self.advertiser()
            #指示灯闪烁
            self.disconnected()
        #收到数据
        elif event == 3: 
            #读取数据字节序列    
            buffer = self.ble.gatts_read(self.rx)
            #转换为字符串
            self.bleMsg = buffer.decode('UTF-8').strip()
            #打印
            print(self.bleMsg)
    
    #注册服务
    def register(self):        
        # Nordic UART Service (NUS)
        NUS_UUID = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
        RX_UUID = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
        TX_UUID = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
            
        BLE_NUS = ubluetooth.UUID(NUS_UUID)
        BLE_RX = (ubluetooth.UUID(RX_UUID), ubluetooth.FLAG_WRITE)
        BLE_TX = (ubluetooth.UUID(TX_UUID), ubluetooth.FLAG_NOTIFY)
            
        BLE_UART = (BLE_NUS, (BLE_TX, BLE_RX,))
        SERVICES = (BLE_UART, )
        ((self.tx, self.rx,), ) = self.ble.gatts_register_services(SERVICES)
    #发送数据
    def send(self, data):
        self.ble.gatts_notify(0, self.tx, data + '\n')
    #广播蓝牙
    def advertiser(self):
        name = bytes(self.name, 'UTF-8')
        adv_data = bytearray('\x02\x01\x02') + bytearray((len(name) + 1, 0x09)) + name
        self.ble.gap_advertise(100, adv_data)
        print(adv_data)
        print("\r\n")

ble = ESP32_BLE("ESP32BLE")
print("connected ....")

while True:
    if(len(ble.bleMsg)>0):
        processMsg(ble.bleMsg)
        #返回消息
        ble.send("from esp32\r\n")
        print("send ....")
        #清空消息
        ble.bleMsg=""

蓝牙舵机 2024 0607

from machine import Pin
from machine import Timer
from neopixel import NeoPixel
import ubluetooth 
from servo import Servo
import time
import _thread
import gc
global door
enter = 0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)  
#接收消息设置颜色的方法
def processMsg(colorStr):
    print(colorStr)
    cl=colorStr.split(",")
    print(cl) 


class ESP32_BLE():
    def __init__(self, name):
        #用于显示蓝牙连接状态的指示灯
#         self.led = Pin(14, Pin.OUT) 
        #完成未连接状态指示灯闪烁任务的定时器
        self.timer1 = Timer(0)
        #蓝牙设备名称
        self.name = name
        #创建BLE类对象
        self.ble = ubluetooth.BLE()
        #激活,使用任何其他相关操作前必须先激活
        self.ble.active(True)
        #设置未连接状态定时器闪烁任务
        self.disconnected()
        #设置蓝牙状态发生变化时的中断回调方法
        self.ble.irq(self.ble_irq)
        #注册蓝牙
        self.register()
        #广播蓝牙
        self.advertiser()
        #接收的消息字符串
        self.bleMsg = ""

    #状态切换为连接的方法
    def connected(self):
        #指示灯常量
#         self.led.value(1)
        print("状态连接")
        #执行指示灯闪烁的定时器关闭
        self.timer1.deinit() 
    #在未连接状态设置定时器
    #以完成指示灯闪烁的任务
    def disconnected(self):        
    # 使用 lambda 形式定义回调函数,使代码更简洁
        self.timer1.init(period=5000, mode=Timer.PERIODIC, callback=lambda t: (print("没有连接"), print("程序正在运行") ))
#         self.timer1.init(period=100, mode=Timer.PERIODIC, callback=lambda t: self.led.value(not self.led.value()))

    #中断回调方法
    def ble_irq(self, event, data):
        #连接成功事件
        if event == 1:
            #关闭闪烁定时器
            self.connected()
        #断开事件
        elif event == 2: #_IRQ_CENTRAL_DISCONNECT:
            #广播蓝牙
            self.advertiser()
            #指示灯闪烁
            self.disconnected()
        #收到数据
        elif event == 3: 
            #读取数据字节序列    
            buffer = self.ble.gatts_read(self.rx)
            #转换为字符串
            self.bleMsg = buffer.decode('UTF-8').strip()
            #打印
            print(self.bleMsg)
            if self.bleMsg == "k":
                print("点击开门 esp32启动舵机")
                time.sleep(0.1)
                my_servo.write_angle(90)
                time.sleep(0.1)
                my_servo.write_angle(0); 
                time.sleep(1.5) 
                my_servo.write_angle(90)
                time.sleep(1)
                my_servo.write_angle(180);
                time.sleep(1.2)
                my_servo.write_angle(90)
                time.sleep(0.1)
                door=1
            elif self.bleMsg == "kg":
                print("点击关门 esp32回正舵机")
                time.sleep(0.1)
                my_servo.write_angle(90)
                time.sleep(0.1)
                my_servo.write_angle(180);
                time.sleep(1.2) 
                my_servo.write_angle(90)
                time.sleep(0.1)
                door=0  
            else: 
                print("接受的指令", self.bleMsg) 
                print("esp32无操作") 
    
    #注册服务
    def register(self):        
        # Nordic UART Service (NUS)
        NUS_UUID = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
        RX_UUID = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
        TX_UUID = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
            
        BLE_NUS = ubluetooth.UUID(NUS_UUID)
        BLE_RX = (ubluetooth.UUID(RX_UUID), ubluetooth.FLAG_WRITE)
        BLE_TX = (ubluetooth.UUID(TX_UUID), ubluetooth.FLAG_NOTIFY)
            
        BLE_UART = (BLE_NUS, (BLE_TX, BLE_RX,))
        SERVICES = (BLE_UART, )
        ((self.tx, self.rx,), ) = self.ble.gatts_register_services(SERVICES)
    #发送数据
    def send(self, data):
        self.ble.gatts_notify(0, self.tx, data + '\n')
    #广播蓝牙
    def advertiser(self):
        name = bytes(self.name, 'UTF-8')
        adv_data = bytearray('\x02\x01\x02') + bytearray((len(name) + 1, 0x09)) + name
        self.ble.gap_advertise(100, adv_data)
        print(adv_data)
        print("\r\n")

ble = ESP32_BLE("ESP32BLE")
print("connected ....")

while True: 
    if(len(ble.bleMsg)>0):
        processMsg(ble.bleMsg)
        #返回消息
        ble.send("answer from esp32")
        print("send ....")
        #清空消息
        ble.bleMsg=""
        used_memory = gc.mem_alloc()
        print("当前使用的内存:", used_memory, "bytes")
        ble.send("当前使用的内存"+str(used_memory)) 

MG996R micropython 控制 180舵机 thonmy

from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
from machine import Pin
from servo import Servo  
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio 
from servo import Servo 
config['ssid'] = 'BlackSheep' 
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'  
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
    return send_file('public/index.html')

# 设置一个get请求 如果
@app.get('/on')
def index(request):
    global door
    # 实现 xuanzhuan1 方法的代码
    print("内网点击开门 esp32启动舵机")
    time.sleep(0.1)
    my_servo.write_angle(180) 
    door=1
    # 如果收到get请求on就开灯
    print("Open")
    return "开灯了"
@app.get('/off')
def index(request):
    global door
    print("内网点击关门 esp32回正舵机")
    time.sleep(0.1)
    my_servo.write_angle(0) 
    time.sleep(0.1)
    door=0  
    # 如果收到get请求off就关灯
    print("Close")
    return "关灯了" 

# 监听消息接收
def msg_call(topic, msg, retained):
    global door  # 声明door是全局变量
    global enter
    print("接收到消息")
    topic_str=topic.decode()#订阅主题的名称
    msg_str=msg.decode()#订阅 的消息
    print("主题的名称",topic_str)
    print("消息",msg_str)
    if msg_str == "open":
        print("网页端点击开门 esp32启动舵机")
        time.sleep(0.1)
        my_servo.write_angle(180) 
        time.sleep(0.1)
        door=1
    elif msg_str == "close":
        print("网页端点击关门 esp32回正舵机")
        time.sleep(0.1)
        my_servo.write_angle(0) 
        time.sleep(0.1)
        door=0  
    elif msg_str == "enterd":
        print("接收进入网页 快速传递状态")
        enter=50
    else: 
        print("接受的指令", msg_str)
        client.publish('sta',door)
        print("esp32无操作") 
# 设置订阅主题
async def conn_han(client):
    await client.subscribe('web', 1)
    await client.subscribe('door', 1)

config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def run_microdot_app():
    app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
    await client.connect()
    global enter
    _thread.start_new_thread(run_microdot_app, ())
    n = 0
    while True:
        if enter>1:
#             print('进入网页 快速传递状态')
            await client.publish('staesp',"fast")
#             await asyncio.sleep(1)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            enter -= 1
            print(enter)
            n += 2
            if n>10:
                n=0
        else:
            print('网页离线 esp32进入慢休眠')
            await client.publish('staesp',"sleep")
            await asyncio.sleep(5)
            print('sta', n,door)
            # If WiFi is down the following will pause for the duration.
            await client.publish('sta', '{}'.format(door), qos = 1)
            n += 1
            if n>10:
                n=0
        

# 程序入口
if __name__ == '__main__':
    # 定义舵机控制对象
    my_servo = Servo(Pin(13), max_us=2500) 
    MQTTClient.DEBUG = True
    client = MQTTClient(config) 
    try:
        asyncio.run(main(client)) 
    finally:
        client.close()  # Prevent LmacRxBlk:1 errors