293 lines
8.0 KiB
Python
293 lines
8.0 KiB
Python
import time
|
|
import network
|
|
import socket
|
|
import struct
|
|
from machine import Pin, PWM
|
|
import asyncio
|
|
|
|
# --- UDP protocol -----------------------------------------------------------
HOST = "0.0.0.0"  # address handed to sock.bind() in run_server
PORT = 8888  # UDP port used for listening and as the advertisement target port
BUFFER_SIZE = 2048  # maximum byte count requested per recvfrom() call
INPUT_PACKET_TYPE = 0x00  # first byte identifying a joystick INPUT packet
INPUT_PACKET_SIZE = 17 # 1 byte type + 4 float32 values

# --- Motor PWM pins ---------------------------------------------------------
# Pin numbers passed to machine.Pin in setup_motors.  The forward/reverse
# roles below are the ones setup_motors assigns.
PIN_MOTOR_PWM_RIGHT1 = 7  # "fr" forward channel
PIN_MOTOR_PWM_RIGHT2 = 6  # "fr" reverse channel
PIN_MOTOR_PWM_RIGHT3 = 9  # "rr" forward channel
PIN_MOTOR_PWM_RIGHT4 = 8  # "rr" reverse channel
PIN_MOTOR_PWM_LEFT1 = 18  # "fl" forward channel
PIN_MOTOR_PWM_LEFT2 = 19  # "fl" reverse channel
PIN_MOTOR_PWM_LEFT3 = 21  # "rl" forward channel
PIN_MOTOR_PWM_LEFT4 = 20  # "rl" reverse channel

# --- Drive tuning -----------------------------------------------------------
MOTOR_SPEED_MIN = -100  # lower clamp bound applied in set_motor_speed
MOTOR_SPEED_MAX = 100  # upper clamp bound; also the scale for a full-stick command
PWM_FREQ_HZ = 1000  # freq= argument for every PWM created in setup_motors
JOYSTICK_DEADZONE = 0.08  # stick commands with a smaller magnitude become 0.0
INPUT_AXIS_ABS_MAX = 1.5  # axes with a larger magnitude make a packet be rejected

# --- Wi-Fi ------------------------------------------------------------------
# NOTE(review): credentials are hard-coded in source; consider moving them to
# an untracked configuration file.
SSID = "5"
PASSWORD = "nevem123"

wlan = None  # assigned the network.WLAN station object by init_wifi
# OSError codes that udp_server_loop treats as "no datagram available yet".
# Presumably 11 is EAGAIN and 110 is ETIMEDOUT - TODO confirm for the target port.
WOULD_BLOCK_ERROR_CODES = (11, 110)
|
|
|
|
def init_wifi(ssid, password, timeout_s=10):
    """Connect the station interface to a Wi-Fi network.

    The interface object is stored in the module-level ``wlan``.  The
    connection state is polled once per second until the interface reports
    that it is connected or ``timeout_s`` polls have been used up.

    Args:
        ssid: Name of the network to join.
        password: Passphrase for the network.
        timeout_s: Maximum number of one-second polls before giving up.

    Returns:
        True when the interface is connected, False otherwise.
    """
    global wlan
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(ssid, password)

    # isconnected() is used instead of comparing status() with the literal 3:
    # the numeric status codes are port-specific, so the literal only works on
    # ports where "got IP" happens to be 3.
    remaining = timeout_s
    while remaining > 0 and not wlan.isconnected():
        remaining -= 1
        print("Waiting for Wi-Fi connection...")
        time.sleep(1)

    if not wlan.isconnected():
        print("Failed to connect.")
        return False

    print("Connection successful!")
    print("IP address:", wlan.ifconfig()[0])
    return True
|
|
|
|
|
|
def clamp(value, min_value, max_value):
    """Limit ``value`` to the range ``[min_value, max_value]``.

    Non-finite input (NaN or infinity, as judged by ``is_finite``) yields
    ``0.0`` regardless of the bounds.
    """
    if is_finite(value):
        if value < min_value:
            return min_value
        return max_value if value > max_value else value
    return 0.0
|
|
|
|
|
|
def is_finite(value):
    """Return True when ``value`` is neither NaN nor +/- infinity."""
    if value != value:  # NaN is the only float that differs from itself
        return False
    infinity = float("inf")
    return not (value == infinity or value == -infinity)
|
|
|
|
|
|
def all_axes_reasonable(values):
    """Return True when every axis is finite and within the accepted magnitude.

    An axis is rejected when ``is_finite`` reports False for it or when its
    absolute value exceeds ``INPUT_AXIS_ABS_MAX``.  An empty sequence is
    accepted.
    """
    return all(
        is_finite(axis) and abs(axis) <= INPUT_AXIS_ABS_MAX for axis in values
    )
|
|
|
|
|
|
def unpack_input_axes(payload):
    """Decode four float32 axis values from ``payload``, guessing byte order.

    The payload is interpreted as little-endian first and as big-endian
    second; the first interpretation whose values pass
    ``all_axes_reasonable`` is returned, so little-endian wins when both
    look plausible.

    Returns:
        A 4-tuple of floats, or None when neither byte order gives
        reasonable values.
    """
    for byte_order_format in ("<ffff", ">ffff"):
        axes = struct.unpack(byte_order_format, payload)
        if all_axes_reasonable(axes):
            return axes
    return None
|
|
|
|
|
|
def apply_deadzone(value, deadzone):
    """Return ``0.0`` when ``|value|`` is below ``deadzone``, else ``value`` unchanged."""
    return 0.0 if abs(value) < deadzone else value
|
|
|
|
|
|
def set_pwm_duty(pwm, normalized_value):
    """Write a duty cycle given as a fraction in ``[0.0, 1.0]`` to ``pwm``.

    The fraction is clamped first.  ``duty_u16`` is tried with a 0-65535
    scale; when that raises AttributeError the value is written through
    ``duty`` with a 0-1023 scale instead.
    """
    fraction = clamp(normalized_value, 0.0, 1.0)
    full_scale_16_bit = 65535
    full_scale_10_bit = 1023
    try:
        pwm.duty_u16(int(fraction * full_scale_16_bit))
    except AttributeError:
        # PWM object without duty_u16: fall back to the 10-bit setter.
        pwm.duty(int(fraction * full_scale_10_bit))
|
|
|
|
|
|
def set_motor_speed(forward_pwm, reverse_pwm, speed):
    """Drive one motor from a signed speed.

    ``speed`` is clamped to ``[MOTOR_SPEED_MIN, MOTOR_SPEED_MAX]``.  Its
    magnitude, as a fraction of ``MOTOR_SPEED_MAX``, goes to the forward
    channel for non-negative speeds and to the reverse channel for negative
    ones; the other channel is set to zero.
    """
    limited = clamp(speed, MOTOR_SPEED_MIN, MOTOR_SPEED_MAX)
    magnitude = abs(limited) / float(MOTOR_SPEED_MAX)

    if limited >= 0:
        forward_duty, reverse_duty = magnitude, 0.0
    else:
        forward_duty, reverse_duty = 0.0, magnitude

    # The forward channel is always written before the reverse channel.
    set_pwm_duty(forward_pwm, forward_duty)
    set_pwm_duty(reverse_pwm, reverse_duty)
|
|
|
|
|
|
def setup_motors():
    """Create the PWM channels for all four motors.

    Returns:
        A dict keyed ``"fl"``, ``"rl"``, ``"fr"``, ``"rr"`` whose values are
        ``(forward_pwm, reverse_pwm)`` tuples, each PWM running at
        ``PWM_FREQ_HZ``.
    """

    def make_channel(pin_number):
        return PWM(Pin(pin_number), freq=PWM_FREQ_HZ)

    wiring = (
        ("fl", PIN_MOTOR_PWM_LEFT1, PIN_MOTOR_PWM_LEFT2),
        ("rl", PIN_MOTOR_PWM_LEFT3, PIN_MOTOR_PWM_LEFT4),
        ("fr", PIN_MOTOR_PWM_RIGHT1, PIN_MOTOR_PWM_RIGHT2),
        ("rr", PIN_MOTOR_PWM_RIGHT3, PIN_MOTOR_PWM_RIGHT4),
    )

    motors = {}
    for name, forward_pin, reverse_pin in wiring:
        forward_channel = make_channel(forward_pin)
        reverse_channel = make_channel(reverse_pin)
        motors[name] = (forward_channel, reverse_channel)
    return motors
|
|
|
|
|
|
def apply_tank_drive(motors, left_x, left_y, right_x, right_y):
    """Drive the left and right motor pairs from the two stick Y axes.

    Each Y axis is negated, clamped to ``[-1.0, 1.0]``, passed through the
    joystick deadzone and scaled by ``MOTOR_SPEED_MAX``.  The left result
    drives ``"fl"`` and ``"rl"``, the right result drives ``"fr"`` and
    ``"rr"``.  The X axes are only included in the log line.
    """

    def stick_to_speed(axis_value):
        command = clamp(-axis_value, -1.0, 1.0)
        return apply_deadzone(command, JOYSTICK_DEADZONE) * MOTOR_SPEED_MAX

    left_speed = stick_to_speed(left_y)
    right_speed = stick_to_speed(right_y)

    assignments = (
        ("fl", left_speed),
        ("rl", left_speed),
        ("fr", right_speed),
        ("rr", right_speed),
    )
    for wheel, wheel_speed in assignments:
        set_motor_speed(motors[wheel][0], motors[wheel][1], wheel_speed)

    status_line = (
        "Tank L:{:.1f} R:{:.1f} | LX:{:.3f} LY:{:.3f} RX:{:.3f} RY:{:.3f}".format(
            left_speed, right_speed, left_x, left_y, right_x, right_y
        )
    )
    print(status_line)
|
|
|
|
def ip_string_to_int(ip_str):
    """Convert a dotted-quad IPv4 string to its 32-bit integer value.

    Args:
        ip_str: Address such as ``"192.168.1.10"``.

    Returns:
        The address as an int, first octet in the most significant byte.

    Raises:
        ValueError: If there are not exactly four dot-separated parts, a
            part is not an integer, or an octet is outside 0-255.
    """
    parts = ip_str.split(".")
    if len(parts) != 4:
        raise ValueError("Invalid IP address format")

    result = 0
    for part in parts:
        octet = int(part)
        # Without this check an out-of-range octet would spill into the
        # neighbouring bytes and silently yield a wrong address.
        if octet < 0 or octet > 255:
            raise ValueError("Invalid IP address format")
        result = (result << 8) | octet
    return result
|
|
|
|
def send_ip_advertisement(sock):
    """Broadcast this device's IPv4 address on the local subnet.

    The datagram is ``[0x01][IPv4 address as 4 big-endian bytes]`` and is
    sent to the subnet broadcast address on ``PORT``.

    Args:
        sock: UDP socket used for ``sendto``.

    Raises:
        ValueError: If the interface reports an address or netmask that
            ``ip_string_to_int`` cannot parse.
    """
    # Take address and netmask from one ifconfig() call so both values come
    # from the same snapshot of the interface configuration.
    ip, mask = network.WLAN(network.STA_IF).ifconfig()[:2]
    ip_int = ip_string_to_int(ip)
    mask_int = ip_string_to_int(mask)

    # Setting every host bit (the bits cleared in the mask) gives the
    # broadcast address of the subnet.
    broadcast_int = ip_int | (~mask_int & 0xFFFFFFFF)
    broadcast_ip_str = "{}.{}.{}.{}".format(
        (broadcast_int >> 24) & 0xFF,
        (broadcast_int >> 16) & 0xFF,
        (broadcast_int >> 8) & 0xFF,
        broadcast_int & 0xFF,
    )
    print("Broadcasting IP on {}:{}".format(broadcast_ip_str, PORT))

    # Construct packet [0x01][IP as 4 bytes, big endian]
    message = bytes([0x01]) + ip_int.to_bytes(4, "big")
    sock.sendto(message, (broadcast_ip_str, PORT))
|
|
|
|
async def periodic_ip_advertisement(sock):
    """Send an IP advertisement through ``sock`` once per second, forever.

    A failed advertisement is reported with ``print`` and does not stop the
    loop; the next attempt happens after the usual one-second pause.
    """
    pause_seconds = 1
    while True:
        try:
            send_ip_advertisement(sock)
        except Exception as failure:
            # Best effort: keep advertising even when one attempt fails.
            print("IP advertisement failed:", failure)
        await asyncio.sleep(pause_seconds)
|
|
|
|
|
|
def _stop_all_motors(motors):
    """Set every (forward, reverse) PWM pair in ``motors`` to zero speed."""
    for motor in motors.values():
        set_motor_speed(motor[0], motor[1], 0)


async def udp_server_loop(sock, motors):
    """Receive UDP datagrams forever and turn INPUT packets into motor commands.

    Packet handling:
      * INPUT packet of the wrong length, or with axis values that
        ``unpack_input_axes`` rejects: logged, and all motors are stopped.
      * Valid INPUT packet: the four axes are logged and passed to
        ``apply_tank_drive``.
      * Any other packet type: logged and otherwise ignored.

    Args:
        sock: UDP socket to read from.  ``recvfrom`` is expected to raise
            OSError with a code from ``WOULD_BLOCK_ERROR_CODES`` when no
            datagram is waiting.
        motors: Dict of ``(forward_pwm, reverse_pwm)`` tuples.

    Raises:
        OSError: Any socket error whose code is not in
            ``WOULD_BLOCK_ERROR_CODES``.
    """
    while True:
        try:
            packet, sender = sock.recvfrom(BUFFER_SIZE)
        except OSError as err:
            if err.args and err.args[0] in WOULD_BLOCK_ERROR_CODES:
                # Nothing to read yet; give other tasks time to run.
                await asyncio.sleep_ms(10)
                continue
            raise

        if not packet:
            await asyncio.sleep_ms(1)
            continue

        packet_type = packet[0]

        if packet_type != INPUT_PACKET_TYPE:
            print(
                "Unknown packet type 0x{:02X}: {} bytes from {}:{}".format(
                    packet_type, len(packet), sender[0], sender[1]
                )
            )
        elif len(packet) != INPUT_PACKET_SIZE:
            print(
                "Invalid INPUT packet size: {} bytes from {}:{} (expected {})".format(
                    len(packet), sender[0], sender[1], INPUT_PACKET_SIZE
                )
            )
            _stop_all_motors(motors)
        else:
            # Everything after the type byte is the axis payload.
            decoded = unpack_input_axes(packet[1:INPUT_PACKET_SIZE])
            if decoded is None:
                print(
                    "Dropped INPUT packet with invalid axis values from {}:{}".format(
                        sender[0], sender[1]
                    )
                )
                _stop_all_motors(motors)
            else:
                left_x, left_y, right_x, right_y = decoded
                print(
                    "INPUT from {}:{} | L({:.3f}, {:.3f}) R({:.3f}, {:.3f})".format(
                        sender[0], sender[1], left_x, left_y, right_x, right_y
                    )
                )
                apply_tank_drive(motors, left_x, left_y, right_x, right_y)

        # Yield once per handled datagram.  Without this the loop only ever
        # awaited when the socket was empty, so a steady stream of packets
        # could keep other tasks from running.
        await asyncio.sleep_ms(0)
|
|
|
|
|
|
async def run_server():
    """Bring up Wi-Fi, motors and the UDP socket, then serve until stopped.

    Returns immediately when the Wi-Fi connection cannot be established.
    On the way out (normal exit, KeyboardInterrupt or any other error) the
    motors are stopped and released, the advertisement task is cancelled,
    and the socket is closed.
    """
    if not init_wifi(SSID, PASSWORD):
        return

    motors = setup_motors()
    sock = None
    advertise_task = None

    try:
        # Socket setup happens inside the try so that a failure here still
        # reaches the cleanup below and releases the PWM channels.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.bind((HOST, PORT))
        sock.setblocking(False)

        print("UDP listener started on {}:{}".format(HOST, PORT))
        advertise_task = asyncio.create_task(periodic_ip_advertisement(sock))

        await udp_server_loop(sock, motors)
    except KeyboardInterrupt:
        print("Program interrupted")
    finally:
        # Stop the motors before anything else that could raise or block.
        for motor in motors.values():
            set_motor_speed(motor[0], motor[1], 0)
            motor[0].deinit()
            motor[1].deinit()

        if advertise_task is not None:
            advertise_task.cancel()
            try:
                await advertise_task
            except asyncio.CancelledError:
                # Expected result of cancel().  CancelledError derives from
                # BaseException, so "except Exception" would let it escape
                # and skip the rest of this cleanup.
                pass
            except Exception as err:
                print("Advertisement task ended with error:", err)

        if sock is not None:
            sock.close()
|
|
|
|
|
|
def main():
    """Program entry point: run the ``run_server`` coroutine to completion."""
    server_coroutine = run_server()
    asyncio.run(server_coroutine)
|
|
|
|
|
|
# Start the server only when this file is executed as the main script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|