commit 3443c5cc9e89a10e9fbe5d821fd72915f9f3ec08 Author: DcruBro Date: Fri May 15 08:12:54 2026 +0200 init diff --git a/.micropico b/.micropico new file mode 100644 index 0000000..3de3977 --- /dev/null +++ b/.micropico @@ -0,0 +1,3 @@ +{ + "info": "This file is just used to identify a project folder." +} \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..7046daf --- /dev/null +++ b/main.py @@ -0,0 +1,292 @@ +import time +import network +import socket +import struct +from machine import Pin, PWM +import asyncio + +HOST = "0.0.0.0" +PORT = 8888 +BUFFER_SIZE = 2048 +INPUT_PACKET_TYPE = 0x00 +INPUT_PACKET_SIZE = 17 # 1 byte type + 4 float32 values + +PIN_MOTOR_PWM_RIGHT1 = 7 +PIN_MOTOR_PWM_RIGHT2 = 6 +PIN_MOTOR_PWM_RIGHT3 = 9 +PIN_MOTOR_PWM_RIGHT4 = 8 +PIN_MOTOR_PWM_LEFT1 = 18 +PIN_MOTOR_PWM_LEFT2 = 19 +PIN_MOTOR_PWM_LEFT3 = 21 +PIN_MOTOR_PWM_LEFT4 = 20 + +MOTOR_SPEED_MIN = -100 +MOTOR_SPEED_MAX = 100 +PWM_FREQ_HZ = 1000 +JOYSTICK_DEADZONE = 0.08 +INPUT_AXIS_ABS_MAX = 1.5 + +SSID = "5" +PASSWORD = "nevem123" + +wlan = None +WOULD_BLOCK_ERROR_CODES = (11, 110) + +def init_wifi(ssid, password): + global wlan + wlan = network.WLAN(network.STA_IF) + wlan.active(True) + wlan.connect(ssid, password) + + connection_timeout = 10 + while connection_timeout > 0: + if wlan.status() >= 3: + break + connection_timeout -= 1 + print("Waiting for Wi-Fi connection...") + time.sleep(1) + + if wlan.status() != 3: + print("Failed to connect.") + return False + + print("Connection successful!") + print("IP address:", wlan.ifconfig()[0]) + return True + + +def clamp(value, min_value, max_value): + if not is_finite(value): + return 0.0 + if value < min_value: + return min_value + if value > max_value: + return max_value + return value + + +def is_finite(value): + return value == value and value != float("inf") and value != -float("inf") + + +def all_axes_reasonable(values): + for value in values: + if not is_finite(value): + return False + if abs(value) > INPUT_AXIS_ABS_MAX: + return False + return True + + +def unpack_input_axes(payload): + little = struct.unpack("ffff", payload) + + little_ok = all_axes_reasonable(little) + big_ok = all_axes_reasonable(big) + + if little_ok and not big_ok: + return little + if big_ok and not little_ok: + return big + if little_ok and big_ok: + return little + return None + + +def apply_deadzone(value, deadzone): + if abs(value) < deadzone: + return 0.0 + return value + + +def set_pwm_duty(pwm, normalized_value): + normalized = clamp(normalized_value, 0.0, 1.0) + try: + pwm.duty_u16(int(normalized * 65535)) + except AttributeError: + pwm.duty(int(normalized * 1023)) + + +def set_motor_speed(forward_pwm, reverse_pwm, speed): + speed = clamp(speed, MOTOR_SPEED_MIN, MOTOR_SPEED_MAX) + magnitude = abs(speed) / float(MOTOR_SPEED_MAX) + + if speed >= 0: + set_pwm_duty(forward_pwm, magnitude) + set_pwm_duty(reverse_pwm, 0.0) + else: + set_pwm_duty(forward_pwm, 0.0) + set_pwm_duty(reverse_pwm, magnitude) + + +def setup_motors(): + fl_forward = PWM(Pin(PIN_MOTOR_PWM_LEFT1), freq=PWM_FREQ_HZ) + fl_reverse = PWM(Pin(PIN_MOTOR_PWM_LEFT2), freq=PWM_FREQ_HZ) + + rl_forward = PWM(Pin(PIN_MOTOR_PWM_LEFT3), freq=PWM_FREQ_HZ) + rl_reverse = PWM(Pin(PIN_MOTOR_PWM_LEFT4), freq=PWM_FREQ_HZ) + + fr_forward = PWM(Pin(PIN_MOTOR_PWM_RIGHT1), freq=PWM_FREQ_HZ) + fr_reverse = PWM(Pin(PIN_MOTOR_PWM_RIGHT2), freq=PWM_FREQ_HZ) + + rr_forward = PWM(Pin(PIN_MOTOR_PWM_RIGHT3), freq=PWM_FREQ_HZ) + rr_reverse = PWM(Pin(PIN_MOTOR_PWM_RIGHT4), freq=PWM_FREQ_HZ) + + return { + "fl": (fl_forward, fl_reverse), + "rl": (rl_forward, rl_reverse), + "fr": (fr_forward, fr_reverse), + "rr": (rr_forward, rr_reverse), + } + + +def apply_tank_drive(motors, left_x, left_y, right_x, right_y): + left_cmd = apply_deadzone(clamp(-left_y, -1.0, 1.0), JOYSTICK_DEADZONE) + right_cmd = apply_deadzone(clamp(-right_y, -1.0, 1.0), JOYSTICK_DEADZONE) + + left_speed = left_cmd * MOTOR_SPEED_MAX + right_speed = right_cmd * MOTOR_SPEED_MAX + + set_motor_speed(motors["fl"][0], motors["fl"][1], left_speed) + set_motor_speed(motors["rl"][0], motors["rl"][1], left_speed) + set_motor_speed(motors["fr"][0], motors["fr"][1], right_speed) + set_motor_speed(motors["rr"][0], motors["rr"][1], right_speed) + + print( + "Tank L:{:.1f} R:{:.1f} | LX:{:.3f} LY:{:.3f} RX:{:.3f} RY:{:.3f}".format( + left_speed, right_speed, left_x, left_y, right_x, right_y + ) + ) + +def ip_string_to_int(ip_str): + parts = ip_str.split(".") + if len(parts) != 4: + raise ValueError("Invalid IP address format") + return ( + (int(parts[0]) << 24) + | (int(parts[1]) << 16) + | (int(parts[2]) << 8) + | int(parts[3]) + ) + +def send_ip_advertisement(sock): + ip = network.WLAN(network.STA_IF).ifconfig()[0] + + # Create IP based of existing config + mask = network.WLAN(network.STA_IF).ifconfig()[1] + maskInt = ip_string_to_int(mask) + broadcastInt = ip_string_to_int(ip) | (~maskInt & 0xFFFFFFFF) + broadcast_ip_str = "{}.{}.{}.{}".format( + (broadcastInt >> 24) & 0xFF, + (broadcastInt >> 16) & 0xFF, + (broadcastInt >> 8) & 0xFF, + broadcastInt & 0xFF, + ) + print("Broadcasting IP on {}:{}".format(broadcast_ip_str, PORT)) + + # Construct packet [0x01][IP as 4 bytes] + message = bytes([0x01]) + ip_string_to_int(ip).to_bytes(4, "big") # big endian + sock.sendto(message, (broadcast_ip_str, PORT)) + +async def periodic_ip_advertisement(sock): + while True: + try: + send_ip_advertisement(sock) + except Exception as err: + print("IP advertisement failed:", err) + await asyncio.sleep(1) + + +async def udp_server_loop(sock, motors): + while True: + try: + packet, sender = sock.recvfrom(BUFFER_SIZE) + except OSError as err: + if err.args and err.args[0] in WOULD_BLOCK_ERROR_CODES: + await asyncio.sleep_ms(10) + continue + raise + + if not packet: + await asyncio.sleep_ms(1) + continue + + packet_type = packet[0] + + if packet_type == INPUT_PACKET_TYPE: + if len(packet) != INPUT_PACKET_SIZE: + print( + "Invalid INPUT packet size: {} bytes from {}:{} (expected {})".format( + len(packet), sender[0], sender[1], INPUT_PACKET_SIZE + ) + ) + for motor in motors.values(): + set_motor_speed(motor[0], motor[1], 0) + continue + + decoded = unpack_input_axes(packet[1:17]) + if decoded is None: + print( + "Dropped INPUT packet with invalid axis values from {}:{}".format( + sender[0], sender[1] + ) + ) + for motor in motors.values(): + set_motor_speed(motor[0], motor[1], 0) + continue + + left_x, left_y, right_x, right_y = decoded + print( + "INPUT from {}:{} | L({:.3f}, {:.3f}) R({:.3f}, {:.3f})".format( + sender[0], sender[1], left_x, left_y, right_x, right_y + ) + ) + apply_tank_drive(motors, left_x, left_y, right_x, right_y) + else: + print( + "Unknown packet type 0x{:02X}: {} bytes from {}:{}".format( + packet_type, len(packet), sender[0], sender[1] + ) + ) + + +async def run_server(): + if not init_wifi(SSID, PASSWORD): + return + + motors = setup_motors() + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.bind((HOST, PORT)) + sock.setblocking(False) + + print("UDP listener started on {}:{}".format(HOST, PORT)) + advertise_task = asyncio.create_task(periodic_ip_advertisement(sock)) + + try: + await udp_server_loop(sock, motors) + except KeyboardInterrupt: + print("Program interrupted") + finally: + advertise_task.cancel() + try: + await advertise_task + except Exception: + pass + + for motor in motors.values(): + set_motor_speed(motor[0], motor[1], 0) + motor[0].deinit() + motor[1].deinit() + sock.close() + + +def main(): + asyncio.run(run_server()) + + +if __name__ == "__main__": + main() + diff --git a/webpage/index.html b/webpage/index.html new file mode 100644 index 0000000..2eaef9e --- /dev/null +++ b/webpage/index.html @@ -0,0 +1,100 @@ + + + + + + brum brum + + + + +
+
+ +
+
+ +
+
+ +
+
+ +
+ +
+ +