为什么同样的代码可以控制sg90 但是控制不了180度 的MG996R
mg996R 舵机反向转动 比顺时针旋转需要更多的电量
垃圾的micro数据线 提供不了, 可以传输数据的正常的数据线 才能提供足够的电量
mqtt esp32 micropython
import time
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
config['ssid'] = 'honor'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
# 监听消息接收
def msg_call(topic, msg, retained):
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
handle_message(msg_str)
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
async def main(client):
await client.connect()
n = 0
while True:
await asyncio.sleep(50)
print('sta', n)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(n), qos = 1)
n += 1
def handle_message(message):
if message == "open":
print("网页端点击开门 esp32启动舵机")
# 在这里执行开门操作
my_servo.write_angle(90) # 舵机转动到90度位置
await client.publish('sta', 'opendoor success')
elif message == "close":
print("网页端点击关门 esp32回正舵机")
# 在这里执行关门操作
my_servo.write_angle(0) # 舵机转动到0度位置
await client.publish('sta', 'closedoor success')
else:
print("接受的指令", message)
print("esp32无操作")
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
# 程序入口
if __name__ == '__main__':
# wifi_connect('BlackSheep','87654321')
MQTTClient.DEBUG = True # Optional: print diagnostic messages
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
my_servo.write_angle(0)
time.sleep(0.5)
my_servo.write_angle(45)
time.sleep(0.5)
my_servo.write_angle(90)
time.sleep(0.5)
my_servo.write_angle(135)
time.sleep(0.5)
my_servo.write_angle(180)
time.sleep(0.5)
# 在这里执行开门操作
my_servo.write_angle(90) # 舵机转动到90度位置
await client.publish('sta', 'opendoor success')
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
# 在这里执行关门操作
my_servo.write_angle(0) # 舵机转动到0度位置
await client.publish('sta', 'closedoor success')
time.sleep(0.5)
my_servo.write_angle(45)
time.sleep(0.5)
my_servo.write_angle(90)
time.sleep(0.5)
my_servo.write_angle(135)
time.sleep(0.5)
my_servo.write_angle(180)
time.sleep(0.5)
else:
print("接受的指令", message)
print("esp32无操作")
[]
360不等待版本
import time
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
# my_servo.write_angle(0);
my_servo.write_angle(0);
asyncio.sleep(5)
my_servo.write_angle(360);
door=1
client.publish('sta',door)
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
# my_servo.write_angle(180); # 舵机转动到0度位置
my_servo.write_angle(180);
asyncio.sleep(5)
my_servo.write_angle(360);
door=0
else:
print("接受的指令", msg_str)
print(door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
async def main(client):
await client.connect()
n = 0
while True:
# await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
# 程序入口
if __name__ == '__main__':
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
180度
import time
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
my_servo.write_angle(0);
door=1
client.publish('sta',door)
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
my_servo.write_angle(180); # 舵机转动到0度位置
door=0
else:
print("接受的指令", msg_str)
print(door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
async def main(client):
await client.connect()
n = 0
while True:
# await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
# 程序入口
if __name__ == '__main__':
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
180 通过订阅sta enter来控制esp32 休眠运行状态
import time
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
# my_servo.write_angle(0);
my_servo.write_angle(0);
asyncio.sleep(5)
my_servo.write_angle(360);
door=1
client.publish('sta',door)
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
# my_servo.write_angle(180); # 舵机转动到0度位置
my_servo.write_angle(180);
asyncio.sleep(5)
my_servo.write_angle(360);
door=0
else:
print("接受的指令", msg_str)
print(door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
async def main(client):
await client.connect()
n = 0
while True:
# await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
# 程序入口
if __name__ == '__main__':
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
360 订阅状态
import time
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
enter = 0
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
global enter
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
my_servo.write_angle(0);
asyncio.sleep(5)
my_servo.write_angle(360);
door=1
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
my_servo.write_angle(180);
asyncio.sleep(5)
my_servo.write_angle(360);
door=0
elif msg_str == "enterd":
print("接收进入网页 快速传递状态")
enter=10
else:
print("接受的指令", msg_str)
client.publish('sta',door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
async def main(client):
await client.connect()
global enter
n = 0
while True:
if enter>1:
print('进入网页 快速传递状态')
await client.publish('staesp',"fast")
# await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
enter -= 1
print(enter)
n += 2
if n>10:
n=0
else:
print('网页离线 esp32进入慢休眠')
await client.publish('staesp',"sleep")
await asyncio.sleep(5)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
# 程序入口
if __name__ == '__main__':
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
多线程加入内网 原生不能异步多线程
HTTP服务器部分被放在一个名为start_server的函数中,并且使用_thread.start_new_thread来启动一个新的线程来运行HTTP服务器。这样,当程序执行到main()时,HTTP服务器将会在单独的线程中运行,而你可以在main()函数中执行其他需要的代码。
import usocket as socket
import _thread
import time
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
enter = 0
def handle_request(client_socket):
# 处理HTTP请求
request_data = client_socket.recv(1024)
request_lines = request_data.split(b"\r\n")
if len(request_lines) > 0:
request_line = request_lines[0].decode("utf-8")
method, path, protocol = request_line.split(" ")
if method == "GET" and path == "/":
with open("public/index.html", "rb") as f:
response_body = f.read()
response = (
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html\r\n"
"Content-Length: {}\r\n"
"\r\n"
).format(len(response_body))
client_socket.send(response.encode("utf-8"))
client_socket.send(response_body)
elif path == "/xuanzhuan1":
# 调用 xuanzhuan1 方法
xuanzhuan1()
response = "HTTP/1.1 200 OK\r\n\r\n"
client_socket.send(response.encode("utf-8"))
elif path == "/xuanzhuan2":
# 调用 xuanzhuan2 方法
xuanzhuan2()
response = "HTTP/1.1 200 OK\r\n\r\n"
client_socket.send(response.encode("utf-8"))
client_socket.close()
def start_server():
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('0.0.0.0', 80))
server_socket.listen(5)
print("Server is listening on port 80...")
while True:
client_socket, addr = server_socket.accept()
print("Connection from:", addr)
_thread.start_new_thread(handle_request, (client_socket,))
def xuanzhuan1():
global door
# 实现 xuanzhuan1 方法的代码
print("内网点击开门 esp32启动舵机")
my_servo.write_angle(0);
door=1
def xuanzhuan2():
global door
print("内网点击关门 esp32回正舵机")
my_servo.write_angle(180);
door=0
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
global enter
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
my_servo.write_angle(0);
door=1
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
my_servo.write_angle(180);
door=0
elif msg_str == "enterd":
print("接收进入网页 快速传递状态")
enter=10
else:
print("接受的指令", msg_str)
client.publish('sta',door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
async def main(client):
await client.connect()
global enter
_thread.start_new_thread(start_server, ())
n = 0
while True:
if enter>1:
print('进入网页 快速传递状态')
await client.publish('staesp',"fast")
# await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
enter -= 1
print(enter)
n += 2
if n>10:
n=0
else:
print('网页离线 esp32进入慢休眠')
await client.publish('staesp',"sleep")
await asyncio.sleep(5)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
# 程序入口
if __name__ == '__main__':
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
microdot 异步运行搭建的web框架
from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
return send_file('public/index.html')
# 设置一个get请求 如果
@app.get('/on')
def index(request):
global door
# 实现 xuanzhuan1 方法的代码
print("内网点击开门 esp32启动舵机")
my_servo.write_angle(0);
door=1
# 如果收到get请求on就开灯
print("Open")
return "开灯了"
@app.get('/off')
def index(request):
global door
print("内网点击关门 esp32回正舵机")
my_servo.write_angle(180);
door=0
# 如果收到get请求off就关灯
print("Close")
return "关灯了"
def xuanzhuan1():
global door
# 实现 xuanzhuan1 方法的代码
print("内网点击开门 esp32启动舵机")
my_servo.write_angle(0);
door=1
def xuanzhuan2():
global door
print("内网点击关门 esp32回正舵机")
my_servo.write_angle(180);
door=0
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
global enter
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
my_servo.write_angle(0);
door=1
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
my_servo.write_angle(180);
door=0
elif msg_str == "enterd":
print("接收进入网页 快速传递状态")
enter=10
else:
print("接受的指令", msg_str)
client.publish('sta',door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def run_microdot_app():
app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
await client.connect()
global enter
_thread.start_new_thread(run_microdot_app, ())
n = 0
while True:
if enter>1:
# print('进入网页 快速传递状态')
await client.publish('staesp',"fast")
# await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
enter -= 1
print(enter)
n += 2
if n>10:
n=0
else:
print('网页离线 esp32进入慢休眠')
await client.publish('staesp',"sleep")
await asyncio.sleep(5)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 程序入口
if __name__ == '__main__':
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
MG996R micropython 控制 360舵机 thonmy
控制sg90 正常但是控制mg996r 需求注意停断
转另一个方向时,需要先停后进
0正传 180逆 90停
import time
from machine import Pin
from servo import Servo
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
# 程序入口
if __name__ == '__main__':
while True:
# 正
my_servo.write_angle(0)
time.sleep(1.5)
# //停 回周期
my_servo.write_angle(90)
time.sleep(0.1)
# 逆
my_servo.write_angle(180)
time.sleep(1.5)
# //停 回回周期
my_servo.write_angle(90)
time.sleep(0.1)
MG996R micropython 控制 360舵机 thonmy
from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
return send_file('public/index.html')
# 设置一个get请求 如果
@app.get('/on')
def index(request):
global door
# 实现 xuanzhuan1 方法的代码
print("内网点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(1.2)
my_servo.write_angle(90)
time.sleep(5)
my_servo.write_angle(180);
time.sleep(1.2)
my_servo.write_angle(90)
time.sleep(0.1)
door=1
# 如果收到get请求on就开灯
print("Open")
return "开灯了"
@app.get('/off')
def index(request):
global door
print("内网点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(180);
time.sleep(1.2)
my_servo.write_angle(90)
time.sleep(0.1)
door=0
# 如果收到get请求off就关灯
print("Close")
return "关灯了"
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
global enter
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(1.2)
my_servo.write_angle(90)
time.sleep(0.1)
door=1
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(180);
time.sleep(1.2)
my_servo.write_angle(90)
time.sleep(0.1)
door=0
elif msg_str == "enterd":
print("接收进入网页 快速传递状态")
enter=50
else:
print("接受的指令", msg_str)
client.publish('sta',door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def run_microdot_app():
app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
await client.connect()
global enter
_thread.start_new_thread(run_microdot_app, ())
n = 0
while True:
if enter>1:
# print('进入网页 快速传递状态')
await client.publish('staesp',"fast")
# await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
enter -= 1
print(enter)
n += 2
if n>10:
n=0
else:
print('网页离线 esp32进入慢休眠')
await client.publish('staesp',"sleep")
await asyncio.sleep(5)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 程序入口
if __name__ == '__main__':
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
检测内存+wifi版本1
from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
import gc
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
return send_file('public/index.html')
# 设置一个get请求 如果
@app.get('/on')
def index(request):
global door
# 实现 xuanzhuan1 方法的代码
print("内网点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=1
# 如果收到get请求on就开灯
print("Open")
return "开灯了"
@app.get('/off')
def index(request):
global door
print("内网点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(180);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=0
# 如果收到get请求off就关灯
print("Close")
return "关灯了"
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
global enter
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=1
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(180);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=0
elif msg_str == "enterd":
print("接收进入网页 快速传递状态")
enter=50
else:
print("接受的指令", msg_str)
client.publish('sta',door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def run_microdot_app():
app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
await client.connect()
global enter
_thread.start_new_thread(run_microdot_app, ())
n = 0
while True:
if client._sta_if.isconnected():
print("WiFi 已连接")
else:
print("WiFi 未连接")
machine.reset()
# 获取当前使用的内存
used_memory = gc.mem_alloc()
print("当前使用的内存:", used_memory, "bytes")
if enter>1:
# print('进入网页 快速传递状态')
await client.publish('staesp', '{}'.format(used_memory), qos = 1)
# await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
enter -= 1
print(enter)
n += 2
if n>10:
n=0
else:
print('网页离线 esp32进入慢休眠')
await client.publish('staesp', '{}'.format(used_memory), qos = 1)
await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 程序入口
if __name__ == '__main__':
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
不要内网 控制360
import time
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
enter = 0
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
global enter
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(1.2)
my_servo.write_angle(90)
time.sleep(0.1)
door=1
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(180);
time.sleep(1.2)
my_servo.write_angle(90)
time.sleep(0.1)
door=0
elif msg_str == "enterd":
print("接收进入网页 快速传递状态")
enter=99
else:
print("接受的指令", msg_str)
client.publish('sta',door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
async def main(client):
await client.connect()
global enter
n = 0
while True:
if enter>1:
# print('进入网页 快速传递状态')
await client.publish('staesp',"fast")
await asyncio.sleep(0.1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
enter -= 1
print(enter)
n += 2
if n>10:
n=0
else:
print('网页离线 esp32进入慢休眠')
await client.publish('staesp',"sleep")
await asyncio.sleep(5)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 程序入口
if __name__ == '__main__':
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
服务器断链 本地网络搭建mqtt
from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
import webrepl
import machine
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
import gc
# config['ssid'] = 'BlackSheep'
config['ssid'] = 'honor'
config['wifi_pw'] = '87654321'
# config['server'] = 'mqt.kepao.icu'
config['server'] = '192.168.118.202'
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
return send_file('public/index.html')
# 设置一个get请求 如果
@app.get('/on')
def index(request):
global door
# 实现 xuanzhuan1 方法的代码
print("内网点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=1
# 如果收到get请求on就开灯
print("Open")
return "开灯了"
@app.get('/off')
def index(request):
global door
print("内网点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(180);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=0
# 如果收到get请求off就关灯
print("Close")
return "关灯了"
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
global enter
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=1
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(180);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=0
elif msg_str == "enterd":
print("接收进入网页 快速传递状态")
enter=50
else:
print("接受的指令", msg_str)
client.publish('sta',door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def check_running(timer):
print("程序正在运行")
# 创建一个定时器,每秒触发一次回调函数
tim = machine.Timer(-1)
tim.init(period=5000, mode=tim.PERIODIC, callback=check_running)
def run_microdot_app():
app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
print("connect config")
await client.connect()
if client._sta_if.isconnected():
print("WiFi 已连接")
else:
print("WiFi 未连接")
global enter
_thread.start_new_thread(run_microdot_app, ())
n = 0
webrepl.start()
while True:
# 获取当前使用的内存
used_memory = gc.mem_alloc()
print("当前使用的内存:", used_memory, "bytes")
if enter>1:
# print('进入网页 快速传递状态')
await client.publish('staesp', '{}'.format(used_memory), qos = 1)
print('sta', n,door)
await client.publish('sta', '{}'.format(door), qos = 1)
enter -= 1
print(enter)
n += 2
if n>10:
n=0
else:
print('网页离线 esp32进入慢休眠')
await client.publish('staesp', '{}'.format(used_memory), qos = 1)
await asyncio.sleep(1)
print('sta', n,door)
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 程序入口
if __name__ == '__main__':
# 定义舵机控制对象
my_servo = Servo(machine.Pin(13), max_us=2500)
MQTTClient.DEBUG = True
print("running config")
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
wifi固件可用版本1.21+
开关门 2024 0610
from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
import webrepl
import machine
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from lib.servo import Servo
import gc
config['ssid'] = 'BlackSheep'
# config['ssid'] = 'honor'
config['wifi_pw'] = '87654321'
config['server'] = 'broker.emqx.io'
# config['server'] = '192.168.1.150'
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
return send_file('public/index.html')
# 设置一个get请求 如果
@app.get('/on')
def index(request):
global door
# 实现 xuanzhuan1 方法的代码
print("内网点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=1
# 如果收到get请求on就开灯
print("Open")
return "开灯了"
@app.get('/off')
def index(request):
global door
print("内网点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(180);
time.sleep(2)
my_servo.write_angle(90)
time.sleep(0.1)
door=0
# 如果收到get请求off就关灯
print("Close")
return "关灯了"
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
global enter
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
door=1
print("网页端点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(1.3)
my_servo.write_angle(90)
time.sleep(0.9)
my_servo.write_angle(180);
time.sleep(1.1)
my_servo.write_angle(90)
time.sleep(0.1)
elif msg_str == "close":
door=0
print("网页端点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(1.3)
my_servo.write_angle(90)
time.sleep(0.9)
my_servo.write_angle(180);
time.sleep(1.1)
my_servo.write_angle(90)
time.sleep(0.1)
elif msg_str == "enterd":
print("接收进入网页 快速传递状态")
enter=50
else:
print("接受的指令", msg_str)
client.publish('sta',door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('631mydoor', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def check_running(timer):
print("程序正在运行")
# 创建一个定时器,每秒触发一次回调函数
tim = machine.Timer(-1)
tim.init(period=5000, mode=tim.PERIODIC, callback=check_running)
def run_microdot_app():
app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
print("connect config")
await client.connect()
if client._sta_if.isconnected():
print("WiFi 已连接")
else:
print("WiFi 未连接")
global enter
_thread.start_new_thread(run_microdot_app, ())
n = 0
webrepl.start()
while True:
# 获取当前使用的内存
used_memory = gc.mem_alloc()
print("当前使用的内存:", used_memory, "bytes")
if enter>1:
# print('进入网页 快速传递状态')
await client.publish('631mystaesp', '{}'.format(used_memory), qos = 1)
print('sta', n,door)
await client.publish('631mysta', '{}'.format(door), qos = 1)
enter -= 1
print(enter)
n += 2
if n>10:
n=0
else:
print('网页离线 esp32进入慢休眠')
await client.publish('631mystaesp', '{}'.format(used_memory), qos = 1)
await asyncio.sleep(1)
print('sta', n,door)
await client.publish('631mysta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 程序入口
if __name__ == '__main__':
# 定义舵机控制对象
my_servo = Servo(machine.Pin(13), max_us=2500)
MQTTClient.DEBUG = True
print("running config")
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
蓝牙通信 固件可用版本1.18 1.19
from machine import Pin
from machine import Timer
from neopixel import NeoPixel
import ubluetooth
#创建像素序列对象
#服务于板载WS2812B
np = NeoPixel(Pin(15), 1)
np[0]=(128,0,0)
np.write()
#接收消息设置颜色的方法
def processMsg(colorStr):
cl=colorStr.split(",")
np[0]=(int(cl[0]),int(cl[1]),int(cl[2]))
np.write()
class ESP32_BLE():
def __init__(self, name):
#用于显示蓝牙连接状态的指示灯
self.led = Pin(13, Pin.OUT)
#完成未连接状态指示灯闪烁任务的定时器
self.timer1 = Timer(0)
#蓝牙设备名称
self.name = name
#创建BLE类对象
self.ble = ubluetooth.BLE()
#激活,使用任何其他相关操作前必须先激活
self.ble.active(True)
#设置未连接状态定时器闪烁任务
self.disconnected()
#设置蓝牙状态发生变化时的中断回调方法
self.ble.irq(self.ble_irq)
#注册蓝牙
self.register()
#广播蓝牙
self.advertiser()
#接收的消息字符串
self.bleMsg = ""
#状态切换为连接的方法
def connected(self):
#指示灯常量
self.led.value(1)
#执行指示灯闪烁的定时器关闭
self.timer1.deinit()
#在未连接状态设置定时器 #以完成指示灯闪烁的任务
def disconnected(self):
self.timer1.init(period=100, mode=Timer.PERIODIC, callback=lambda t: self.led.value(not self.led.value()))
#中断回调方法
def ble_irq(self, event, data):
#连接成功事件
if event == 1:
#关闭闪烁定时器
self.connected()
#断开事件
elif event == 2: #_IRQ_CENTRAL_DISCONNECT:
#广播蓝牙
self.advertiser()
#指示灯闪烁
self.disconnected()
#收到数据
elif event == 3:
#读取数据字节序列
buffer = self.ble.gatts_read(self.rx)
#转换为字符串
self.bleMsg = buffer.decode('UTF-8').strip()
#打印
print(self.bleMsg)
#注册服务
def register(self):
# Nordic UART Service (NUS)
NUS_UUID = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
RX_UUID = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
TX_UUID = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
BLE_NUS = ubluetooth.UUID(NUS_UUID)
BLE_RX = (ubluetooth.UUID(RX_UUID), ubluetooth.FLAG_WRITE)
BLE_TX = (ubluetooth.UUID(TX_UUID), ubluetooth.FLAG_NOTIFY)
BLE_UART = (BLE_NUS, (BLE_TX, BLE_RX,))
SERVICES = (BLE_UART, )
((self.tx, self.rx,), ) = self.ble.gatts_register_services(SERVICES)
#发送数据
def send(self, data):
self.ble.gatts_notify(0, self.tx, data + '\n')
#广播蓝牙
def advertiser(self):
name = bytes(self.name, 'UTF-8')
adv_data = bytearray('\x02\x01\x02') + bytearray((len(name) + 1, 0x09)) + name
self.ble.gap_advertise(100, adv_data)
print(adv_data)
print("\r\n")
ble = ESP32_BLE("ESP32BLE")
print("connected ....")
while True:
if(len(ble.bleMsg)>0):
processMsg(ble.bleMsg)
#返回消息
ble.send("from esp32\r\n")
print("send ....")
#清空消息
ble.bleMsg=""
蓝牙舵机 2024 0607
from machine import Pin
from machine import Timer
from neopixel import NeoPixel
import ubluetooth
from servo import Servo
import time
import _thread
import gc
global door
enter = 0
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
#接收消息设置颜色的方法
def processMsg(colorStr):
print(colorStr)
cl=colorStr.split(",")
print(cl)
class ESP32_BLE():
def __init__(self, name):
#用于显示蓝牙连接状态的指示灯
# self.led = Pin(14, Pin.OUT)
#完成未连接状态指示灯闪烁任务的定时器
self.timer1 = Timer(0)
#蓝牙设备名称
self.name = name
#创建BLE类对象
self.ble = ubluetooth.BLE()
#激活,使用任何其他相关操作前必须先激活
self.ble.active(True)
#设置未连接状态定时器闪烁任务
self.disconnected()
#设置蓝牙状态发生变化时的中断回调方法
self.ble.irq(self.ble_irq)
#注册蓝牙
self.register()
#广播蓝牙
self.advertiser()
#接收的消息字符串
self.bleMsg = ""
#状态切换为连接的方法
def connected(self):
#指示灯常量
# self.led.value(1)
print("状态连接")
#执行指示灯闪烁的定时器关闭
self.timer1.deinit()
#在未连接状态设置定时器
#以完成指示灯闪烁的任务
def disconnected(self):
# 使用 lambda 形式定义回调函数,使代码更简洁
self.timer1.init(period=5000, mode=Timer.PERIODIC, callback=lambda t: (print("没有连接"), print("程序正在运行") ))
# self.timer1.init(period=100, mode=Timer.PERIODIC, callback=lambda t: self.led.value(not self.led.value()))
#中断回调方法
def ble_irq(self, event, data):
#连接成功事件
if event == 1:
#关闭闪烁定时器
self.connected()
#断开事件
elif event == 2: #_IRQ_CENTRAL_DISCONNECT:
#广播蓝牙
self.advertiser()
#指示灯闪烁
self.disconnected()
#收到数据
elif event == 3:
#读取数据字节序列
buffer = self.ble.gatts_read(self.rx)
#转换为字符串
self.bleMsg = buffer.decode('UTF-8').strip()
#打印
print(self.bleMsg)
if self.bleMsg == "k":
print("点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(0);
time.sleep(1.5)
my_servo.write_angle(90)
time.sleep(1)
my_servo.write_angle(180);
time.sleep(1.2)
my_servo.write_angle(90)
time.sleep(0.1)
door=1
elif self.bleMsg == "kg":
print("点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(90)
time.sleep(0.1)
my_servo.write_angle(180);
time.sleep(1.2)
my_servo.write_angle(90)
time.sleep(0.1)
door=0
else:
print("接受的指令", self.bleMsg)
print("esp32无操作")
#注册服务
def register(self):
# Nordic UART Service (NUS)
NUS_UUID = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
RX_UUID = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
TX_UUID = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
BLE_NUS = ubluetooth.UUID(NUS_UUID)
BLE_RX = (ubluetooth.UUID(RX_UUID), ubluetooth.FLAG_WRITE)
BLE_TX = (ubluetooth.UUID(TX_UUID), ubluetooth.FLAG_NOTIFY)
BLE_UART = (BLE_NUS, (BLE_TX, BLE_RX,))
SERVICES = (BLE_UART, )
((self.tx, self.rx,), ) = self.ble.gatts_register_services(SERVICES)
#发送数据
def send(self, data):
self.ble.gatts_notify(0, self.tx, data + '\n')
#广播蓝牙
def advertiser(self):
name = bytes(self.name, 'UTF-8')
adv_data = bytearray('\x02\x01\x02') + bytearray((len(name) + 1, 0x09)) + name
self.ble.gap_advertise(100, adv_data)
print(adv_data)
print("\r\n")
ble = ESP32_BLE("ESP32BLE")
print("connected ....")
while True:
if(len(ble.bleMsg)>0):
processMsg(ble.bleMsg)
#返回消息
ble.send("answer from esp32")
print("send ....")
#清空消息
ble.bleMsg=""
used_memory = gc.mem_alloc()
print("当前使用的内存:", used_memory, "bytes")
ble.send("当前使用的内存"+str(used_memory))
MG996R micropython 控制 180舵机 thonmy
from lib.microdot import Microdot,send_file,Request
from lib.microdot_websocket import with_websocket
import time
import _thread
import network
from machine import Pin
from servo import Servo
from lib.mqqt_as import MQTTClient, config
import uasyncio as asyncio
from servo import Servo
config['ssid'] = 'BlackSheep'
config['wifi_pw'] = '87654321'
config['server'] = 'mqt.kepao.icu'
door = 0
enter = 0
app = Microdot()
# 返回一个网页
@app.route('/')
def index(request):
return send_file('public/index.html')
# 设置一个get请求 如果
@app.get('/on')
def index(request):
global door
# 实现 xuanzhuan1 方法的代码
print("内网点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(180)
door=1
# 如果收到get请求on就开灯
print("Open")
return "开灯了"
@app.get('/off')
def index(request):
global door
print("内网点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(0)
time.sleep(0.1)
door=0
# 如果收到get请求off就关灯
print("Close")
return "关灯了"
# 监听消息接收
def msg_call(topic, msg, retained):
global door # 声明door是全局变量
global enter
print("接收到消息")
topic_str=topic.decode()#订阅主题的名称
msg_str=msg.decode()#订阅 的消息
print("主题的名称",topic_str)
print("消息",msg_str)
if msg_str == "open":
print("网页端点击开门 esp32启动舵机")
time.sleep(0.1)
my_servo.write_angle(180)
time.sleep(0.1)
door=1
elif msg_str == "close":
print("网页端点击关门 esp32回正舵机")
time.sleep(0.1)
my_servo.write_angle(0)
time.sleep(0.1)
door=0
elif msg_str == "enterd":
print("接收进入网页 快速传递状态")
enter=50
else:
print("接受的指令", msg_str)
client.publish('sta',door)
print("esp32无操作")
# 设置订阅主题
async def conn_han(client):
await client.subscribe('web', 1)
await client.subscribe('door', 1)
config['subs_cb'] = msg_call
config['connect_coro'] = conn_han
def run_microdot_app():
app.run(host='0.0.0.0', port=80, debug=True, ssl=None)
async def main(client):
await client.connect()
global enter
_thread.start_new_thread(run_microdot_app, ())
n = 0
while True:
if enter>1:
# print('进入网页 快速传递状态')
await client.publish('staesp',"fast")
# await asyncio.sleep(1)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
enter -= 1
print(enter)
n += 2
if n>10:
n=0
else:
print('网页离线 esp32进入慢休眠')
await client.publish('staesp',"sleep")
await asyncio.sleep(5)
print('sta', n,door)
# If WiFi is down the following will pause for the duration.
await client.publish('sta', '{}'.format(door), qos = 1)
n += 1
if n>10:
n=0
# 程序入口
if __name__ == '__main__':
# 定义舵机控制对象
my_servo = Servo(Pin(13), max_us=2500)
MQTTClient.DEBUG = True
client = MQTTClient(config)
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors