"""UDP-controlled tank-drive firmware (MicroPython).

Connects to Wi-Fi, listens for joystick INPUT packets on a UDP port, and
drives four PWM motor channels (front/rear x left/right) tank-style.
While running, it also broadcasts its own IP address once per second so
that a controller on the same subnet can discover it.

Packet formats (as implemented below):
    INPUT          [0x00][left_x][left_y][right_x][right_y]   4 x float32
    ADVERTISEMENT  [0x01][IPv4 address, 4 bytes, big-endian]
"""

import asyncio
import socket
import struct
import time

import network
from machine import Pin, PWM

HOST = "0.0.0.0"
PORT = 8888
BUFFER_SIZE = 2048

INPUT_PACKET_TYPE = 0x00
INPUT_PACKET_SIZE = 17  # 1 byte type + 4 float32 values

PIN_MOTOR_PWM_RIGHT1 = 7
PIN_MOTOR_PWM_RIGHT2 = 6
PIN_MOTOR_PWM_RIGHT3 = 9
PIN_MOTOR_PWM_RIGHT4 = 8
PIN_MOTOR_PWM_LEFT1 = 18
PIN_MOTOR_PWM_LEFT2 = 19
PIN_MOTOR_PWM_LEFT3 = 21
PIN_MOTOR_PWM_LEFT4 = 20

MOTOR_SPEED_MIN = -100
MOTOR_SPEED_MAX = 100
PWM_FREQ_HZ = 1000

JOYSTICK_DEADZONE = 0.08
# Axis values whose magnitude exceeds this are treated as a decoding error.
INPUT_AXIS_ABS_MAX = 1.5

# NOTE(review): credentials are hard-coded; consider loading them from a
# separate, untracked config file.
SSID = "5"
PASSWORD = "nevem123"

wlan = None

# errno values treated as "no datagram available yet" on a non-blocking
# socket (11 is EAGAIN and 110 is ETIMEDOUT on common ports -- TODO confirm
# for the target board).
WOULD_BLOCK_ERROR_CODES = (11, 110)


def init_wifi(ssid, password):
    """Connect to the given Wi-Fi network in station mode.

    Polls the link status once per second for up to 10 seconds and stores
    the interface in the module-level ``wlan``.

    Returns:
        True when the interface reports status 3, False otherwise.
    """
    global wlan
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(ssid, password)

    # NOTE(review): status 3 is presumably "got IP" on this port; the
    # numeric value is port-specific -- confirm for the target board.
    connection_timeout = 10
    while connection_timeout > 0:
        if wlan.status() >= 3:
            break
        connection_timeout -= 1
        print("Waiting for Wi-Fi connection...")
        time.sleep(1)

    if wlan.status() != 3:
        print("Failed to connect.")
        return False

    print("Connection successful!")
    print("IP address:", wlan.ifconfig()[0])
    return True


def clamp(value, min_value, max_value):
    """Limit ``value`` to ``[min_value, max_value]``.

    NaN and +/-infinity are mapped to 0.0 so that a corrupt input can
    never turn into a motor command.
    """
    if not is_finite(value):
        return 0.0
    if value < min_value:
        return min_value
    if value > max_value:
        return max_value
    return value


def is_finite(value):
    """Return True unless ``value`` is NaN or +/-infinity."""
    # ``value == value`` is False only for NaN.
    return value == value and value != float("inf") and value != -float("inf")


def all_axes_reasonable(values):
    """Return True when every value is finite and within INPUT_AXIS_ABS_MAX."""
    for value in values:
        if not is_finite(value):
            return False
        if abs(value) > INPUT_AXIS_ABS_MAX:
            return False
    return True


def unpack_input_axes(payload):
    """Decode four float32 axis values from a 16-byte payload.

    The payload is decoded both as little-endian and as big-endian, and
    the interpretation whose values all pass ``all_axes_reasonable`` is
    used. Little-endian wins when both are plausible.

    Returns:
        A 4-tuple ``(left_x, left_y, right_x, right_y)``, or None when
        neither byte order yields plausible values.
    """
    little = struct.unpack("<ffff", payload)
    # The big-endian decode was missing, so ``big`` raised NameError on
    # every INPUT packet.
    big = struct.unpack(">ffff", payload)

    if all_axes_reasonable(little):
        return little
    if all_axes_reasonable(big):
        return big
    return None


def apply_deadzone(value, deadzone):
    """Return 0.0 when ``abs(value)`` is below ``deadzone``, else ``value``."""
    if abs(value) < deadzone:
        return 0.0
    return value


def set_pwm_duty(pwm, normalized_value):
    """Set the duty cycle of ``pwm`` from a value in ``[0.0, 1.0]``.

    Out-of-range values are clamped. Uses the 16-bit ``duty_u16`` API when
    the PWM object has it, otherwise falls back to the 10-bit ``duty`` API.
    """
    normalized = clamp(normalized_value, 0.0, 1.0)
    try:
        pwm.duty_u16(int(normalized * 65535))
    except AttributeError:
        pwm.duty(int(normalized * 1023))


def set_motor_speed(forward_pwm, reverse_pwm, speed):
    """Drive one motor through its forward/reverse PWM pair.

    Args:
        forward_pwm: PWM channel energised for non-negative speeds.
        reverse_pwm: PWM channel energised for negative speeds.
        speed: Signed speed, clamped to
            ``[MOTOR_SPEED_MIN, MOTOR_SPEED_MAX]``.
    """
    speed = clamp(speed, MOTOR_SPEED_MIN, MOTOR_SPEED_MAX)
    magnitude = abs(speed) / float(MOTOR_SPEED_MAX)

    # Only one of the two channels is ever non-zero.
    if speed >= 0:
        set_pwm_duty(forward_pwm, magnitude)
        set_pwm_duty(reverse_pwm, 0.0)
    else:
        set_pwm_duty(forward_pwm, 0.0)
        set_pwm_duty(reverse_pwm, magnitude)


def setup_motors():
    """Create the eight PWM channels and group them per motor.

    Returns:
        A dict mapping ``"fl"``, ``"rl"``, ``"fr"``, ``"rr"`` to a
        ``(forward_pwm, reverse_pwm)`` tuple.
    """
    fl_forward = PWM(Pin(PIN_MOTOR_PWM_LEFT1), freq=PWM_FREQ_HZ)
    fl_reverse = PWM(Pin(PIN_MOTOR_PWM_LEFT2), freq=PWM_FREQ_HZ)
    rl_forward = PWM(Pin(PIN_MOTOR_PWM_LEFT3), freq=PWM_FREQ_HZ)
    rl_reverse = PWM(Pin(PIN_MOTOR_PWM_LEFT4), freq=PWM_FREQ_HZ)

    fr_forward = PWM(Pin(PIN_MOTOR_PWM_RIGHT1), freq=PWM_FREQ_HZ)
    fr_reverse = PWM(Pin(PIN_MOTOR_PWM_RIGHT2), freq=PWM_FREQ_HZ)
    rr_forward = PWM(Pin(PIN_MOTOR_PWM_RIGHT3), freq=PWM_FREQ_HZ)
    rr_reverse = PWM(Pin(PIN_MOTOR_PWM_RIGHT4), freq=PWM_FREQ_HZ)

    return {
        "fl": (fl_forward, fl_reverse),
        "rl": (rl_forward, rl_reverse),
        "fr": (fr_forward, fr_reverse),
        "rr": (rr_forward, rr_reverse),
    }


def _stop_all_motors(motors):
    """Set every motor in ``motors`` to zero speed."""
    for forward_pwm, reverse_pwm in motors.values():
        set_motor_speed(forward_pwm, reverse_pwm, 0)


def apply_tank_drive(motors, left_x, left_y, right_x, right_y):
    """Map two joystick Y axes onto the left and right motor pairs.

    The Y axes are negated, clamped to ``[-1, 1]``, passed through the
    deadzone and scaled to MOTOR_SPEED_MAX. ``left_x`` and ``right_x`` are
    only logged; they do not influence the motors.
    """
    left_cmd = apply_deadzone(clamp(-left_y, -1.0, 1.0), JOYSTICK_DEADZONE)
    right_cmd = apply_deadzone(clamp(-right_y, -1.0, 1.0), JOYSTICK_DEADZONE)

    left_speed = left_cmd * MOTOR_SPEED_MAX
    right_speed = right_cmd * MOTOR_SPEED_MAX

    set_motor_speed(motors["fl"][0], motors["fl"][1], left_speed)
    set_motor_speed(motors["rl"][0], motors["rl"][1], left_speed)
    set_motor_speed(motors["fr"][0], motors["fr"][1], right_speed)
    set_motor_speed(motors["rr"][0], motors["rr"][1], right_speed)

    print(
        "Tank L:{:.1f} R:{:.1f} | LX:{:.3f} LY:{:.3f} RX:{:.3f} RY:{:.3f}".format(
            left_speed, right_speed, left_x, left_y, right_x, right_y
        )
    )


def ip_string_to_int(ip_str):
    """Convert a dotted-quad IPv4 string to a 32-bit integer.

    Raises:
        ValueError: If ``ip_str`` does not consist of four dot-separated
            parts, or a part is not an integer.
    """
    parts = ip_str.split(".")
    if len(parts) != 4:
        raise ValueError("Invalid IP address format")
    return (
        (int(parts[0]) << 24)
        | (int(parts[1]) << 16)
        | (int(parts[2]) << 8)
        | int(parts[3])
    )


def _int_to_ip_string(value):
    """Convert a 32-bit integer to a dotted-quad IPv4 string."""
    return "{}.{}.{}.{}".format(
        (value >> 24) & 0xFF,
        (value >> 16) & 0xFF,
        (value >> 8) & 0xFF,
        value & 0xFF,
    )


def send_ip_advertisement(sock):
    """Broadcast this device's IP address on the local subnet.

    The broadcast address is derived from the station interface's current
    IP address and netmask. The packet is ``[0x01][IP as 4 bytes]``.
    """
    # Derive the broadcast address based on the existing interface config.
    config = network.WLAN(network.STA_IF).ifconfig()
    ip = config[0]
    mask = config[1]

    ip_int = ip_string_to_int(ip)
    mask_int = ip_string_to_int(mask)
    # Set all host bits; the mask keeps the result within 32 bits.
    broadcast_int = ip_int | (~mask_int & 0xFFFFFFFF)
    broadcast_ip_str = _int_to_ip_string(broadcast_int)

    print("Broadcasting IP on {}:{}".format(broadcast_ip_str, PORT))

    # Construct packet [0x01][IP as 4 bytes, big-endian].
    message = bytes([0x01]) + ip_int.to_bytes(4, "big")
    sock.sendto(message, (broadcast_ip_str, PORT))


async def periodic_ip_advertisement(sock):
    """Send an IP advertisement once per second, forever.

    Send failures are logged and do not stop the loop.
    """
    while True:
        try:
            send_ip_advertisement(sock)
        except Exception as err:
            # Best effort: a failed broadcast must not end the task.
            print("IP advertisement failed:", err)
        await asyncio.sleep(1)


def _handle_packet(packet, sender, motors):
    """Process one non-empty datagram and update the motors accordingly.

    Malformed INPUT packets (wrong size or implausible axis values) stop
    all motors. Unknown packet types are only logged.
    """
    packet_type = packet[0]

    if packet_type != INPUT_PACKET_TYPE:
        print(
            "Unknown packet type 0x{:02X}: {} bytes from {}:{}".format(
                packet_type, len(packet), sender[0], sender[1]
            )
        )
        return

    if len(packet) != INPUT_PACKET_SIZE:
        print(
            "Invalid INPUT packet size: {} bytes from {}:{} (expected {})".format(
                len(packet), sender[0], sender[1], INPUT_PACKET_SIZE
            )
        )
        _stop_all_motors(motors)
        return

    decoded = unpack_input_axes(packet[1:INPUT_PACKET_SIZE])
    if decoded is None:
        print(
            "Dropped INPUT packet with invalid axis values from {}:{}".format(
                sender[0], sender[1]
            )
        )
        _stop_all_motors(motors)
        return

    left_x, left_y, right_x, right_y = decoded
    print(
        "INPUT from {}:{} | L({:.3f}, {:.3f}) R({:.3f}, {:.3f})".format(
            sender[0], sender[1], left_x, left_y, right_x, right_y
        )
    )
    apply_tank_drive(motors, left_x, left_y, right_x, right_y)


async def udp_server_loop(sock, motors):
    """Receive datagrams from the non-blocking ``sock`` and act on them.

    Runs until an unexpected OSError is raised or the task is cancelled.

    Raises:
        OSError: Any socket error whose code is not listed in
            WOULD_BLOCK_ERROR_CODES.
    """
    while True:
        try:
            packet, sender = sock.recvfrom(BUFFER_SIZE)
        except OSError as err:
            if err.args and err.args[0] in WOULD_BLOCK_ERROR_CODES:
                # Nothing to read yet; let other tasks run.
                await asyncio.sleep_ms(10)
                continue
            raise

        if not packet:
            await asyncio.sleep_ms(1)
            continue

        _handle_packet(packet, sender, motors)

        # Yield after every packet so a steady packet stream cannot starve
        # the advertisement task.
        await asyncio.sleep_ms(0)


async def run_server():
    """Bring up Wi-Fi, motors and the UDP socket, then serve until stopped.

    Returns early when the Wi-Fi connection fails. On exit the
    advertisement task is cancelled, the motors are stopped, the PWM
    channels are released and the socket is closed.
    """
    if not init_wifi(SSID, PASSWORD):
        return

    motors = setup_motors()
    sock = None
    advertise_task = None

    try:
        # Socket setup is inside the try block so that a failing bind()
        # still stops the motors and releases the PWM channels.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.bind((HOST, PORT))
        sock.setblocking(False)
        print("UDP listener started on {}:{}".format(HOST, PORT))

        advertise_task = asyncio.create_task(periodic_ip_advertisement(sock))
        await udp_server_loop(sock, motors)
    except KeyboardInterrupt:
        print("Program interrupted")
    finally:
        if advertise_task is not None:
            advertise_task.cancel()
            try:
                await advertise_task
            except asyncio.CancelledError:
                # Expected after cancel(). CancelledError derives from
                # BaseException, so ``except Exception`` would let it
                # escape and skip the motor shutdown below.
                pass
            except Exception as err:
                print("IP advertisement task ended with error:", err)

        _stop_all_motors(motors)
        for forward_pwm, reverse_pwm in motors.values():
            forward_pwm.deinit()
            reverse_pwm.deinit()

        if sock is not None:
            sock.close()


def main():
    """Entry point: run the server until it exits."""
    asyncio.run(run_server())


if __name__ == "__main__":
    main()