import argparse import serial import threading import time import websocket kDefaultSerialPort = None kDefaultBaudrate = 115200 def getserial(port=None, baudrate=None, **kwargs): if port is not None: return serial.Serial(port, baudrate) # return first /dev/ttyACMx found for i in range(10): try: return serial.Serial(f"/dev/ttyACM{i}", baudrate) except serial.SerialException: pass raise RuntimeError("No serial device available.") def parse_args(): parser = argparse.ArgumentParser(description="Bridge a serial port to a websocket") parser.add_argument( "--port", help="path to the serial port", default=kDefaultSerialPort ) parser.add_argument("--baudrate", default=kDefaultBaudrate) parser.add_argument("ws", help="URL of the websocket") return parser.parse_args() def device_thread(device, ws): while True: line = device.readline() ws.send(line) def slowwrite(device, data): for b in data: device.write([b]) time.sleep(0.005) def ws_thread(device, ws): while True: data = ws.recv() slowwrite(device, data.replace(b"\n", b"\r\n")) def main(): args = parse_args() device = getserial(**vars(args)) ws = websocket.create_connection(args.ws) wst = threading.Thread(target=ws_thread, args=(device, ws), daemon=True) wst.start() device_thread(device, ws) if __name__ == "__main__": try: main() except KeyboardInterrupt: pass