Files
avto-kriselj/main.py
2026-05-15 08:12:54 +02:00

293 lines
8.0 KiB
Python

import time
import network
import socket
import struct
from machine import Pin, PWM
import asyncio
HOST = "0.0.0.0"
PORT = 8888
BUFFER_SIZE = 2048
INPUT_PACKET_TYPE = 0x00
INPUT_PACKET_SIZE = 17 # 1 byte type + 4 float32 values
PIN_MOTOR_PWM_RIGHT1 = 7
PIN_MOTOR_PWM_RIGHT2 = 6
PIN_MOTOR_PWM_RIGHT3 = 9
PIN_MOTOR_PWM_RIGHT4 = 8
PIN_MOTOR_PWM_LEFT1 = 18
PIN_MOTOR_PWM_LEFT2 = 19
PIN_MOTOR_PWM_LEFT3 = 21
PIN_MOTOR_PWM_LEFT4 = 20
MOTOR_SPEED_MIN = -100
MOTOR_SPEED_MAX = 100
PWM_FREQ_HZ = 1000
JOYSTICK_DEADZONE = 0.08
INPUT_AXIS_ABS_MAX = 1.5
SSID = "5"
PASSWORD = "nevem123"
wlan = None
WOULD_BLOCK_ERROR_CODES = (11, 110)
def init_wifi(ssid, password):
global wlan
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
connection_timeout = 10
while connection_timeout > 0:
if wlan.status() >= 3:
break
connection_timeout -= 1
print("Waiting for Wi-Fi connection...")
time.sleep(1)
if wlan.status() != 3:
print("Failed to connect.")
return False
print("Connection successful!")
print("IP address:", wlan.ifconfig()[0])
return True
def clamp(value, min_value, max_value):
if not is_finite(value):
return 0.0
if value < min_value:
return min_value
if value > max_value:
return max_value
return value
def is_finite(value):
return value == value and value != float("inf") and value != -float("inf")
def all_axes_reasonable(values):
for value in values:
if not is_finite(value):
return False
if abs(value) > INPUT_AXIS_ABS_MAX:
return False
return True
def unpack_input_axes(payload):
little = struct.unpack("<ffff", payload)
big = struct.unpack(">ffff", payload)
little_ok = all_axes_reasonable(little)
big_ok = all_axes_reasonable(big)
if little_ok and not big_ok:
return little
if big_ok and not little_ok:
return big
if little_ok and big_ok:
return little
return None
def apply_deadzone(value, deadzone):
if abs(value) < deadzone:
return 0.0
return value
def set_pwm_duty(pwm, normalized_value):
normalized = clamp(normalized_value, 0.0, 1.0)
try:
pwm.duty_u16(int(normalized * 65535))
except AttributeError:
pwm.duty(int(normalized * 1023))
def set_motor_speed(forward_pwm, reverse_pwm, speed):
speed = clamp(speed, MOTOR_SPEED_MIN, MOTOR_SPEED_MAX)
magnitude = abs(speed) / float(MOTOR_SPEED_MAX)
if speed >= 0:
set_pwm_duty(forward_pwm, magnitude)
set_pwm_duty(reverse_pwm, 0.0)
else:
set_pwm_duty(forward_pwm, 0.0)
set_pwm_duty(reverse_pwm, magnitude)
def setup_motors():
fl_forward = PWM(Pin(PIN_MOTOR_PWM_LEFT1), freq=PWM_FREQ_HZ)
fl_reverse = PWM(Pin(PIN_MOTOR_PWM_LEFT2), freq=PWM_FREQ_HZ)
rl_forward = PWM(Pin(PIN_MOTOR_PWM_LEFT3), freq=PWM_FREQ_HZ)
rl_reverse = PWM(Pin(PIN_MOTOR_PWM_LEFT4), freq=PWM_FREQ_HZ)
fr_forward = PWM(Pin(PIN_MOTOR_PWM_RIGHT1), freq=PWM_FREQ_HZ)
fr_reverse = PWM(Pin(PIN_MOTOR_PWM_RIGHT2), freq=PWM_FREQ_HZ)
rr_forward = PWM(Pin(PIN_MOTOR_PWM_RIGHT3), freq=PWM_FREQ_HZ)
rr_reverse = PWM(Pin(PIN_MOTOR_PWM_RIGHT4), freq=PWM_FREQ_HZ)
return {
"fl": (fl_forward, fl_reverse),
"rl": (rl_forward, rl_reverse),
"fr": (fr_forward, fr_reverse),
"rr": (rr_forward, rr_reverse),
}
def apply_tank_drive(motors, left_x, left_y, right_x, right_y):
left_cmd = apply_deadzone(clamp(-left_y, -1.0, 1.0), JOYSTICK_DEADZONE)
right_cmd = apply_deadzone(clamp(-right_y, -1.0, 1.0), JOYSTICK_DEADZONE)
left_speed = left_cmd * MOTOR_SPEED_MAX
right_speed = right_cmd * MOTOR_SPEED_MAX
set_motor_speed(motors["fl"][0], motors["fl"][1], left_speed)
set_motor_speed(motors["rl"][0], motors["rl"][1], left_speed)
set_motor_speed(motors["fr"][0], motors["fr"][1], right_speed)
set_motor_speed(motors["rr"][0], motors["rr"][1], right_speed)
print(
"Tank L:{:.1f} R:{:.1f} | LX:{:.3f} LY:{:.3f} RX:{:.3f} RY:{:.3f}".format(
left_speed, right_speed, left_x, left_y, right_x, right_y
)
)
def ip_string_to_int(ip_str):
parts = ip_str.split(".")
if len(parts) != 4:
raise ValueError("Invalid IP address format")
return (
(int(parts[0]) << 24)
| (int(parts[1]) << 16)
| (int(parts[2]) << 8)
| int(parts[3])
)
def send_ip_advertisement(sock):
ip = network.WLAN(network.STA_IF).ifconfig()[0]
# Create IP based of existing config
mask = network.WLAN(network.STA_IF).ifconfig()[1]
maskInt = ip_string_to_int(mask)
broadcastInt = ip_string_to_int(ip) | (~maskInt & 0xFFFFFFFF)
broadcast_ip_str = "{}.{}.{}.{}".format(
(broadcastInt >> 24) & 0xFF,
(broadcastInt >> 16) & 0xFF,
(broadcastInt >> 8) & 0xFF,
broadcastInt & 0xFF,
)
print("Broadcasting IP on {}:{}".format(broadcast_ip_str, PORT))
# Construct packet [0x01][IP as 4 bytes]
message = bytes([0x01]) + ip_string_to_int(ip).to_bytes(4, "big") # big endian
sock.sendto(message, (broadcast_ip_str, PORT))
async def periodic_ip_advertisement(sock):
while True:
try:
send_ip_advertisement(sock)
except Exception as err:
print("IP advertisement failed:", err)
await asyncio.sleep(1)
async def udp_server_loop(sock, motors):
while True:
try:
packet, sender = sock.recvfrom(BUFFER_SIZE)
except OSError as err:
if err.args and err.args[0] in WOULD_BLOCK_ERROR_CODES:
await asyncio.sleep_ms(10)
continue
raise
if not packet:
await asyncio.sleep_ms(1)
continue
packet_type = packet[0]
if packet_type == INPUT_PACKET_TYPE:
if len(packet) != INPUT_PACKET_SIZE:
print(
"Invalid INPUT packet size: {} bytes from {}:{} (expected {})".format(
len(packet), sender[0], sender[1], INPUT_PACKET_SIZE
)
)
for motor in motors.values():
set_motor_speed(motor[0], motor[1], 0)
continue
decoded = unpack_input_axes(packet[1:17])
if decoded is None:
print(
"Dropped INPUT packet with invalid axis values from {}:{}".format(
sender[0], sender[1]
)
)
for motor in motors.values():
set_motor_speed(motor[0], motor[1], 0)
continue
left_x, left_y, right_x, right_y = decoded
print(
"INPUT from {}:{} | L({:.3f}, {:.3f}) R({:.3f}, {:.3f})".format(
sender[0], sender[1], left_x, left_y, right_x, right_y
)
)
apply_tank_drive(motors, left_x, left_y, right_x, right_y)
else:
print(
"Unknown packet type 0x{:02X}: {} bytes from {}:{}".format(
packet_type, len(packet), sender[0], sender[1]
)
)
async def run_server():
if not init_wifi(SSID, PASSWORD):
return
motors = setup_motors()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind((HOST, PORT))
sock.setblocking(False)
print("UDP listener started on {}:{}".format(HOST, PORT))
advertise_task = asyncio.create_task(periodic_ip_advertisement(sock))
try:
await udp_server_loop(sock, motors)
except KeyboardInterrupt:
print("Program interrupted")
finally:
advertise_task.cancel()
try:
await advertise_task
except Exception:
pass
for motor in motors.values():
set_motor_speed(motor[0], motor[1], 0)
motor[0].deinit()
motor[1].deinit()
sock.close()
def main():
asyncio.run(run_server())
if __name__ == "__main__":
main()