init
This commit is contained in:
3
.micropico
Normal file
3
.micropico
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"info": "This file is just used to identify a project folder."
|
||||
}
|
||||
292
main.py
Normal file
292
main.py
Normal file
@@ -0,0 +1,292 @@
|
||||
import time
|
||||
import network
|
||||
import socket
|
||||
import struct
|
||||
from machine import Pin, PWM
|
||||
import asyncio
|
||||
|
||||
HOST = "0.0.0.0"
|
||||
PORT = 8888
|
||||
BUFFER_SIZE = 2048
|
||||
INPUT_PACKET_TYPE = 0x00
|
||||
INPUT_PACKET_SIZE = 17 # 1 byte type + 4 float32 values
|
||||
|
||||
PIN_MOTOR_PWM_RIGHT1 = 7
|
||||
PIN_MOTOR_PWM_RIGHT2 = 6
|
||||
PIN_MOTOR_PWM_RIGHT3 = 9
|
||||
PIN_MOTOR_PWM_RIGHT4 = 8
|
||||
PIN_MOTOR_PWM_LEFT1 = 18
|
||||
PIN_MOTOR_PWM_LEFT2 = 19
|
||||
PIN_MOTOR_PWM_LEFT3 = 21
|
||||
PIN_MOTOR_PWM_LEFT4 = 20
|
||||
|
||||
MOTOR_SPEED_MIN = -100
|
||||
MOTOR_SPEED_MAX = 100
|
||||
PWM_FREQ_HZ = 1000
|
||||
JOYSTICK_DEADZONE = 0.08
|
||||
INPUT_AXIS_ABS_MAX = 1.5
|
||||
|
||||
SSID = "5"
|
||||
PASSWORD = "nevem123"
|
||||
|
||||
wlan = None
|
||||
WOULD_BLOCK_ERROR_CODES = (11, 110)
|
||||
|
||||
def init_wifi(ssid, password):
|
||||
global wlan
|
||||
wlan = network.WLAN(network.STA_IF)
|
||||
wlan.active(True)
|
||||
wlan.connect(ssid, password)
|
||||
|
||||
connection_timeout = 10
|
||||
while connection_timeout > 0:
|
||||
if wlan.status() >= 3:
|
||||
break
|
||||
connection_timeout -= 1
|
||||
print("Waiting for Wi-Fi connection...")
|
||||
time.sleep(1)
|
||||
|
||||
if wlan.status() != 3:
|
||||
print("Failed to connect.")
|
||||
return False
|
||||
|
||||
print("Connection successful!")
|
||||
print("IP address:", wlan.ifconfig()[0])
|
||||
return True
|
||||
|
||||
|
||||
def clamp(value, min_value, max_value):
|
||||
if not is_finite(value):
|
||||
return 0.0
|
||||
if value < min_value:
|
||||
return min_value
|
||||
if value > max_value:
|
||||
return max_value
|
||||
return value
|
||||
|
||||
|
||||
def is_finite(value):
|
||||
return value == value and value != float("inf") and value != -float("inf")
|
||||
|
||||
|
||||
def all_axes_reasonable(values):
|
||||
for value in values:
|
||||
if not is_finite(value):
|
||||
return False
|
||||
if abs(value) > INPUT_AXIS_ABS_MAX:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def unpack_input_axes(payload):
|
||||
little = struct.unpack("<ffff", payload)
|
||||
big = struct.unpack(">ffff", payload)
|
||||
|
||||
little_ok = all_axes_reasonable(little)
|
||||
big_ok = all_axes_reasonable(big)
|
||||
|
||||
if little_ok and not big_ok:
|
||||
return little
|
||||
if big_ok and not little_ok:
|
||||
return big
|
||||
if little_ok and big_ok:
|
||||
return little
|
||||
return None
|
||||
|
||||
|
||||
def apply_deadzone(value, deadzone):
|
||||
if abs(value) < deadzone:
|
||||
return 0.0
|
||||
return value
|
||||
|
||||
|
||||
def set_pwm_duty(pwm, normalized_value):
|
||||
normalized = clamp(normalized_value, 0.0, 1.0)
|
||||
try:
|
||||
pwm.duty_u16(int(normalized * 65535))
|
||||
except AttributeError:
|
||||
pwm.duty(int(normalized * 1023))
|
||||
|
||||
|
||||
def set_motor_speed(forward_pwm, reverse_pwm, speed):
|
||||
speed = clamp(speed, MOTOR_SPEED_MIN, MOTOR_SPEED_MAX)
|
||||
magnitude = abs(speed) / float(MOTOR_SPEED_MAX)
|
||||
|
||||
if speed >= 0:
|
||||
set_pwm_duty(forward_pwm, magnitude)
|
||||
set_pwm_duty(reverse_pwm, 0.0)
|
||||
else:
|
||||
set_pwm_duty(forward_pwm, 0.0)
|
||||
set_pwm_duty(reverse_pwm, magnitude)
|
||||
|
||||
|
||||
def setup_motors():
|
||||
fl_forward = PWM(Pin(PIN_MOTOR_PWM_LEFT1), freq=PWM_FREQ_HZ)
|
||||
fl_reverse = PWM(Pin(PIN_MOTOR_PWM_LEFT2), freq=PWM_FREQ_HZ)
|
||||
|
||||
rl_forward = PWM(Pin(PIN_MOTOR_PWM_LEFT3), freq=PWM_FREQ_HZ)
|
||||
rl_reverse = PWM(Pin(PIN_MOTOR_PWM_LEFT4), freq=PWM_FREQ_HZ)
|
||||
|
||||
fr_forward = PWM(Pin(PIN_MOTOR_PWM_RIGHT1), freq=PWM_FREQ_HZ)
|
||||
fr_reverse = PWM(Pin(PIN_MOTOR_PWM_RIGHT2), freq=PWM_FREQ_HZ)
|
||||
|
||||
rr_forward = PWM(Pin(PIN_MOTOR_PWM_RIGHT3), freq=PWM_FREQ_HZ)
|
||||
rr_reverse = PWM(Pin(PIN_MOTOR_PWM_RIGHT4), freq=PWM_FREQ_HZ)
|
||||
|
||||
return {
|
||||
"fl": (fl_forward, fl_reverse),
|
||||
"rl": (rl_forward, rl_reverse),
|
||||
"fr": (fr_forward, fr_reverse),
|
||||
"rr": (rr_forward, rr_reverse),
|
||||
}
|
||||
|
||||
|
||||
def apply_tank_drive(motors, left_x, left_y, right_x, right_y):
|
||||
left_cmd = apply_deadzone(clamp(-left_y, -1.0, 1.0), JOYSTICK_DEADZONE)
|
||||
right_cmd = apply_deadzone(clamp(-right_y, -1.0, 1.0), JOYSTICK_DEADZONE)
|
||||
|
||||
left_speed = left_cmd * MOTOR_SPEED_MAX
|
||||
right_speed = right_cmd * MOTOR_SPEED_MAX
|
||||
|
||||
set_motor_speed(motors["fl"][0], motors["fl"][1], left_speed)
|
||||
set_motor_speed(motors["rl"][0], motors["rl"][1], left_speed)
|
||||
set_motor_speed(motors["fr"][0], motors["fr"][1], right_speed)
|
||||
set_motor_speed(motors["rr"][0], motors["rr"][1], right_speed)
|
||||
|
||||
print(
|
||||
"Tank L:{:.1f} R:{:.1f} | LX:{:.3f} LY:{:.3f} RX:{:.3f} RY:{:.3f}".format(
|
||||
left_speed, right_speed, left_x, left_y, right_x, right_y
|
||||
)
|
||||
)
|
||||
|
||||
def ip_string_to_int(ip_str):
|
||||
parts = ip_str.split(".")
|
||||
if len(parts) != 4:
|
||||
raise ValueError("Invalid IP address format")
|
||||
return (
|
||||
(int(parts[0]) << 24)
|
||||
| (int(parts[1]) << 16)
|
||||
| (int(parts[2]) << 8)
|
||||
| int(parts[3])
|
||||
)
|
||||
|
||||
def send_ip_advertisement(sock):
|
||||
ip = network.WLAN(network.STA_IF).ifconfig()[0]
|
||||
|
||||
# Create IP based of existing config
|
||||
mask = network.WLAN(network.STA_IF).ifconfig()[1]
|
||||
maskInt = ip_string_to_int(mask)
|
||||
broadcastInt = ip_string_to_int(ip) | (~maskInt & 0xFFFFFFFF)
|
||||
broadcast_ip_str = "{}.{}.{}.{}".format(
|
||||
(broadcastInt >> 24) & 0xFF,
|
||||
(broadcastInt >> 16) & 0xFF,
|
||||
(broadcastInt >> 8) & 0xFF,
|
||||
broadcastInt & 0xFF,
|
||||
)
|
||||
print("Broadcasting IP on {}:{}".format(broadcast_ip_str, PORT))
|
||||
|
||||
# Construct packet [0x01][IP as 4 bytes]
|
||||
message = bytes([0x01]) + ip_string_to_int(ip).to_bytes(4, "big") # big endian
|
||||
sock.sendto(message, (broadcast_ip_str, PORT))
|
||||
|
||||
async def periodic_ip_advertisement(sock):
|
||||
while True:
|
||||
try:
|
||||
send_ip_advertisement(sock)
|
||||
except Exception as err:
|
||||
print("IP advertisement failed:", err)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
async def udp_server_loop(sock, motors):
|
||||
while True:
|
||||
try:
|
||||
packet, sender = sock.recvfrom(BUFFER_SIZE)
|
||||
except OSError as err:
|
||||
if err.args and err.args[0] in WOULD_BLOCK_ERROR_CODES:
|
||||
await asyncio.sleep_ms(10)
|
||||
continue
|
||||
raise
|
||||
|
||||
if not packet:
|
||||
await asyncio.sleep_ms(1)
|
||||
continue
|
||||
|
||||
packet_type = packet[0]
|
||||
|
||||
if packet_type == INPUT_PACKET_TYPE:
|
||||
if len(packet) != INPUT_PACKET_SIZE:
|
||||
print(
|
||||
"Invalid INPUT packet size: {} bytes from {}:{} (expected {})".format(
|
||||
len(packet), sender[0], sender[1], INPUT_PACKET_SIZE
|
||||
)
|
||||
)
|
||||
for motor in motors.values():
|
||||
set_motor_speed(motor[0], motor[1], 0)
|
||||
continue
|
||||
|
||||
decoded = unpack_input_axes(packet[1:17])
|
||||
if decoded is None:
|
||||
print(
|
||||
"Dropped INPUT packet with invalid axis values from {}:{}".format(
|
||||
sender[0], sender[1]
|
||||
)
|
||||
)
|
||||
for motor in motors.values():
|
||||
set_motor_speed(motor[0], motor[1], 0)
|
||||
continue
|
||||
|
||||
left_x, left_y, right_x, right_y = decoded
|
||||
print(
|
||||
"INPUT from {}:{} | L({:.3f}, {:.3f}) R({:.3f}, {:.3f})".format(
|
||||
sender[0], sender[1], left_x, left_y, right_x, right_y
|
||||
)
|
||||
)
|
||||
apply_tank_drive(motors, left_x, left_y, right_x, right_y)
|
||||
else:
|
||||
print(
|
||||
"Unknown packet type 0x{:02X}: {} bytes from {}:{}".format(
|
||||
packet_type, len(packet), sender[0], sender[1]
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
async def run_server():
|
||||
if not init_wifi(SSID, PASSWORD):
|
||||
return
|
||||
|
||||
motors = setup_motors()
|
||||
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
sock.bind((HOST, PORT))
|
||||
sock.setblocking(False)
|
||||
|
||||
print("UDP listener started on {}:{}".format(HOST, PORT))
|
||||
advertise_task = asyncio.create_task(periodic_ip_advertisement(sock))
|
||||
|
||||
try:
|
||||
await udp_server_loop(sock, motors)
|
||||
except KeyboardInterrupt:
|
||||
print("Program interrupted")
|
||||
finally:
|
||||
advertise_task.cancel()
|
||||
try:
|
||||
await advertise_task
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for motor in motors.values():
|
||||
set_motor_speed(motor[0], motor[1], 0)
|
||||
motor[0].deinit()
|
||||
motor[1].deinit()
|
||||
sock.close()
|
||||
|
||||
|
||||
def main():
|
||||
asyncio.run(run_server())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
100
webpage/index.html
Normal file
100
webpage/index.html
Normal file
@@ -0,0 +1,100 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>brum brum</title>
|
||||
<link type="text/css" rel="stylesheet" href="./style.css">
|
||||
</head>
|
||||
<style>
|
||||
body {
|
||||
background-color: #3a3a3a;
|
||||
}
|
||||
.button {
|
||||
position: absolute;
|
||||
width: 200px;
|
||||
height: 200px;
|
||||
border-radius: 25px;
|
||||
font-size: 100px;
|
||||
background-color: #8a0261;
|
||||
border: none;
|
||||
color: rgba(255, 255, 255, 0.92);
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
cursor: pointer;
|
||||
}
|
||||
button.button:hover{
|
||||
background-color: #bb0284;
|
||||
}
|
||||
button.button:active{
|
||||
background-color: #d10594;
|
||||
}
|
||||
#dalinec {
|
||||
display: block;
|
||||
margin: auto;
|
||||
margin-top: 100px;
|
||||
background-color: #333;
|
||||
border-radius: 25px;
|
||||
width: 800px;
|
||||
height: 800px;
|
||||
position: relative;
|
||||
}
|
||||
#levo {
|
||||
left: 25%;
|
||||
top: 50%;
|
||||
transform: translate(-50%, -50%);
|
||||
}
|
||||
#desno {
|
||||
left: 75%;
|
||||
top: 50%;
|
||||
transform: translate(-50%, -50%);
|
||||
}
|
||||
#gor {
|
||||
left: 50%;
|
||||
top: 25%;
|
||||
transform: translate(-50%, -50%);
|
||||
}
|
||||
#dol {
|
||||
left: 50%;
|
||||
top: 75%;
|
||||
transform: translate(-50%, -50%);
|
||||
}
|
||||
@media (max-width: 900px) {
|
||||
#dalinec {
|
||||
width: 90vw;
|
||||
height: 90vw;
|
||||
}
|
||||
.button {
|
||||
width: 20vw;
|
||||
height: 20vw;
|
||||
font-size: 8vw;
|
||||
}
|
||||
}
|
||||
</style>
|
||||
<body>
|
||||
<div id="dalinec">
|
||||
<form method="get" action="left">
|
||||
<button class="button" id="levo" type="submit" value="left">
|
||||
←
|
||||
</button>
|
||||
</form>
|
||||
<form method="get" action="up">
|
||||
<button class="button" id="gor" type="submit" value="up">
|
||||
↑
|
||||
</button>
|
||||
</form>
|
||||
<form method="get" action="down">
|
||||
<button class="button" id="dol" type="submit" value="down">
|
||||
↓
|
||||
</button>
|
||||
</form>
|
||||
<form method="get" action="right">
|
||||
<button class="button" id="desno" type="submit" value="right">
|
||||
→
|
||||
</button>
|
||||
</form>
|
||||
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
Reference in New Issue
Block a user