"""Bridge a serial port to a websocket.

Lines read from the serial device are sent to the websocket, and messages
received from the websocket are written to the serial device one byte at a
time.
"""
import argparse
import threading
import time

import serial
import websocket


def parse_args():
    """Parse the command line and return the resulting namespace.

    Returns:
        argparse.Namespace with ``port``, ``baudrate`` (int), ``escape``
        (bool) and ``ws`` attributes.
    """
    parser = argparse.ArgumentParser(
        description="Bridge a serial port to a websocket")
    parser.add_argument("port", help="path to the serial port")
    # type=int so a value given on the command line is validated by argparse
    # and reaches serial.Serial as a number, like the default does.
    parser.add_argument("--baudrate", type=int, default=115200,
                        help="serial baud rate (default: 115200)")
    parser.add_argument("--escape", action='store_true',
                        help="write byte 0x03 to the device on start "
                             "and byte 0x02 on exit")
    parser.add_argument("ws", help="URL of the websocket")
    return parser.parse_args()


def device_thread(device, ws):
    """Forward every line read from *device* to *ws*, forever.

    Args:
        device: object with a ``readline()`` method (the opened serial port).
        ws: object with a ``send()`` method (the websocket connection).

    The loop never returns on its own; it ends only when ``readline`` or
    ``send`` raises (or the caller is interrupted).
    """
    while True:
        line = device.readline()
        ws.send(line)


def slowwrite(device, data, delay=0.01):
    """Write *data* to *device* one byte at a time, pausing after each byte.

    Args:
        device: object with a ``write()`` method (the opened serial port).
        data: bytes-like payload to send.
        delay: seconds to sleep after each byte (default 0.01, the value
            that used to be hard-coded).
    """
    # NOTE(review): the per-byte pause presumably protects a receiver that
    # cannot keep up with bursts -- confirm against the target device.
    for byte in data:
        device.write(bytes((byte,)))
        time.sleep(delay)


def ws_thread(device, ws):
    """Forward every websocket message to *device*, forever.

    Each ``\\n`` in a message is replaced with ``\\r\\n`` before it is
    written with :func:`slowwrite`.

    Args:
        device: object with a ``write()`` method (the opened serial port).
        ws: object with a ``recv()`` method (the websocket connection).
    """
    while True:
        data = ws.recv()
        if isinstance(data, str):
            # Text frames are delivered as str; the byte-level replace below
            # would raise TypeError on them and kill this thread.
            data = data.encode("utf-8")
        slowwrite(device, data.replace(b'\n', b'\r\n'))


def main():
    """Open the serial port and websocket, then bridge them until interrupted.

    Websocket-to-serial traffic is handled on a daemon thread, while
    serial-to-websocket traffic runs on the calling thread. Ctrl-C
    (``KeyboardInterrupt``) ends the bridge quietly.
    """
    args = parse_args()

    device = serial.Serial(args.port, baudrate=args.baudrate)
    ws = websocket.create_connection(args.ws)

    # Daemon thread: it must not keep the process alive once the
    # serial-reading loop below has stopped.
    wst = threading.Thread(target=ws_thread, args=(device, ws), daemon=True)
    wst.start()

    if args.escape:
        # NOTE(review): 0x03 is ASCII ETX (Ctrl-C); presumably interrupts
        # whatever is running on the device -- confirm.
        device.write(b'\x03')

    try:
        device_thread(device, ws)
    except KeyboardInterrupt:
        pass
    finally:
        if args.escape:
            # NOTE(review): 0x02 is ASCII STX (Ctrl-B); presumably restores
            # the device's normal mode -- confirm.
            device.write(b'\x02')
    # The serial port and websocket are not closed explicitly: when run as a
    # script the process exits right after main() returns, and closing them
    # here could race with the daemon thread still blocked on them.


if __name__ == "__main__":
    main()