"""Bridge a serial port to a websocket."""
import argparse
|
|
import serial
|
|
import threading
|
|
import time
|
|
import websocket
|
|
|
|
# Default for the --port option; None makes getserial() probe /dev/ttyACM0..9.
kDefaultSerialPort = None
# Default for the --baudrate option.
kDefaultBaudrate = 115200
|
|
|
|
|
|
def getserial(port=None, baudrate=None, **kwargs):
    """Open and return a serial connection.

    Args:
        port: Path of the serial device to open. When ``None``, the devices
            ``/dev/ttyACM0`` through ``/dev/ttyACM9`` are tried in order and
            the first one that opens is returned.
        baudrate: Baud rate for the connection. When ``None``,
            ``kDefaultBaudrate`` is used.
        **kwargs: Ignored. Accepted so that a whole argument namespace can be
            passed, e.g. ``getserial(**vars(args))``.

    Returns:
        The opened ``serial.Serial`` instance.

    Raises:
        RuntimeError: If no port was given and none of the probed
            ``/dev/ttyACM*`` devices could be opened.

    Any exception raised by ``serial.Serial`` for an explicitly given port is
    propagated unchanged.
    """
    if baudrate is None:
        # serial.Serial needs an integer baud rate; passing None through would
        # make every call that relies on the default fail.
        baudrate = kDefaultBaudrate

    if port is not None:
        return serial.Serial(port, baudrate)

    # No explicit port: return the first /dev/ttyACMx that can be opened.
    for i in range(10):
        try:
            return serial.Serial(f"/dev/ttyACM{i}", baudrate)
        except serial.SerialException:
            # This candidate is missing or busy; try the next one.
            continue
    raise RuntimeError("No serial device available.")
|
|
|
|
|
|
def parse_args():
    """Parse the command-line arguments of the bridge.

    Returns:
        An ``argparse.Namespace`` with the attributes ``port`` (serial port
        path, defaults to ``kDefaultSerialPort``), ``baudrate`` (int, defaults
        to ``kDefaultBaudrate``) and ``ws`` (websocket URL, required).
    """
    parser = argparse.ArgumentParser(description="Bridge a serial port to a websocket")
    parser.add_argument(
        "--port", help="path to the serial port", default=kDefaultSerialPort
    )
    # type=int keeps the value an int whether it comes from the default or the
    # command line, and lets argparse reject non-numeric input with a usage
    # error instead of failing later when the port is opened.
    parser.add_argument(
        "--baudrate",
        type=int,
        default=kDefaultBaudrate,
        help="baud rate of the serial port (default: %(default)s)",
    )
    parser.add_argument("ws", help="URL of the websocket")
    return parser.parse_args()
|
|
|
|
|
|
def device_thread(device, ws):
    """Relay lines from the serial device to the websocket, forever.

    Args:
        device: Object whose ``readline()`` returns the next line to forward.
        ws: Object whose ``send()`` transmits one line to the websocket peer.

    The function never returns on its own; it ends only when ``readline`` or
    ``send`` raises.
    """
    while True:
        # Read one line and hand it to the websocket unchanged.
        ws.send(device.readline())
|
|
|
|
|
|
def slowwrite(device, data, delay=0.005):
    """Write ``data`` to ``device`` one byte at a time, pausing after each byte.

    Args:
        device: Object with a ``write`` method; it is called once per byte
            with a ``bytes`` object of length one.
        data: Bytes-like value (``bytes``/``bytearray``) whose iteration
            yields integers in ``range(256)``.
        delay: Seconds to sleep after each byte. The default of 0.005 keeps
            the original pacing; a value of 0 (or less) disables the pause.

    NOTE(review): the pacing presumably protects a receiver that cannot keep
    up with back-to-back bytes -- confirm against the target device.
    """
    for byte in data:
        # Send a real one-byte bytes object rather than a list of ints so the
        # call works with any binary writer, not only pyserial.
        device.write(bytes((byte,)))
        if delay > 0:
            time.sleep(delay)
|
|
|
|
|
|
def ws_thread(device, ws):
    """Forward every message received from the websocket to the serial device.

    Args:
        device: Serial device handed to ``slowwrite``.
        ws: Websocket connection whose ``recv()`` returns the next message.

    Each message has every LF byte replaced by CR LF and is then written with
    ``slowwrite``. The function never returns on its own; it ends only when
    ``recv`` or the write raises.
    """
    while True:
        data = ws.recv()
        if isinstance(data, str):
            # Text frames are delivered as str; the bytes-based replace below
            # would raise TypeError on them, so encode them first.
            data = data.encode("utf-8")
        slowwrite(device, data.replace(b"\n", b"\r\n"))
|
|
|
|
|
|
def main():
    """Run the bridge: open both endpoints and relay traffic in both directions.

    The serial device is opened from the parsed command-line options and the
    websocket is connected to the URL given as ``ws``. Websocket-to-serial
    traffic is handled by ``ws_thread`` on a background thread, while
    serial-to-websocket traffic is handled by ``device_thread`` on the calling
    thread.
    """
    options = parse_args()
    serial_port = getserial(**vars(options))
    connection = websocket.create_connection(options.ws)

    # daemon=True so the background reader does not keep the process alive
    # once the main thread stops.
    threading.Thread(
        target=ws_thread, args=(serial_port, connection), daemon=True
    ).start()

    # Serial -> websocket runs in the foreground.
    device_thread(serial_port, connection)
|
|
|
|
|
|
# Script entry point: run the bridge only when executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Exit quietly on Ctrl-C instead of printing a traceback.
        pass
|