"""Serial loopback test.

Writes blocks of random bytes to a serial port and checks that exactly the
same bytes are read back, for block sizes 1, 2, 4, ... 2048.
"""

import argparse
import random
import sys
import threading
import time

try:
    import serial
except ImportError:
    # Only main() opens a real port; keeping the import optional lets the
    # checking helpers be imported (and exercised with a stand-in port
    # object) on machines without pyserial.
    serial = None

tty = "/dev/ttyUSB1"
baud = 115200

# Fixed, non-random payload.  It is not sent by default; pass
# --fixed-pattern to run it through sendcheck() after the random blocks.
FIXED_PATTERN = (
    b'ze%s\x96:M#\xd8\x98\x9d\x96\xf5?\x80c\xc6\xa7\x03\xe0i\x04V\xcb\xa3'
    b'\x95#GC\xabf\x98'
)


def sendrecv(s, size, timeout=5):
    """Send `size` random bytes and wait until the same bytes come back.

    Args:
        s: Port-like object, see sendcheck().
        size: Number of random bytes to generate and send.
        timeout: Seconds allowed for the whole echo to arrive.

    Returns:
        None on success; the process exits on failure (see sendcheck()).
    """
    data = random.randbytes(size)
    return sendcheck(s, data, timeout)


def _report_failure(data, got, received, chunk_len, timed_out, timeout):
    """Print the expected and received bytes around the failing offset."""
    # Clamp at 0: a negative start index would count from the END of the
    # buffers and print an unrelated (usually empty) window.
    fr = max(0, received - 10)
    to = received + chunk_len + 1
    if timed_out:
        print(f"timed out after {timeout} seconds")
    print(f"failed after receiving {received} correct bytes")
    print(f"expected {data[fr: to]}")
    print(f" got {bytes(got[fr: to])}")


def sendcheck(s, data, timeout=5):
    """Write `data` to `s` and verify that it is read back unchanged.

    Args:
        s: Port-like object providing ``write(bytes)`` and ``read()``.
            ``read()`` may return an empty bytes object when nothing has
            arrived yet (main() opens the port with a 1 second read
            timeout), which lets the loop re-check the deadline.
        data: Bytes to send and expect back.
        timeout: Seconds, measured from just after the write, within which
            every byte must have been read back.

    Returns:
        None once all of `data` has been received correctly.

    Raises:
        SystemExit: with status -1, after printing a diagnostic, when a
            received chunk differs from what was sent or the deadline
            passes.
    """
    s.write(data)
    # monotonic() is immune to wall-clock adjustments during the test.
    deadline = time.monotonic() + timeout
    got = bytearray()
    received = 0
    while received < len(data):
        r = s.read()
        got += r
        # Explicit checks instead of assert: asserts vanish under
        # `python -O`, which would turn every failure into a silent pass.
        mismatch = r != data[received: received + len(r)]
        timed_out = time.monotonic() >= deadline
        if mismatch or timed_out:
            _report_failure(data, got, received, len(r),
                            timed_out and not mismatch, timeout)
            sys.exit(-1)
        received += len(r)


def parse_args(argv=None):
    """Parse command-line options.

    Args:
        argv: Argument list to parse; None means ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with ``tty``, ``baud``, ``timeout`` and
        ``fixed_pattern``.  Defaults come from the module-level `tty` and
        `baud` values and a 5 second timeout.
    """
    parser = argparse.ArgumentParser(
        description="Serial loopback test: send data and check the echo.")
    parser.add_argument("--tty", default=tty,
                        help="serial device to open (default: %(default)s)")
    parser.add_argument("--baud", type=int, default=baud,
                        help="baud rate (default: %(default)s)")
    parser.add_argument("--timeout", type=float, default=5,
                        help="seconds allowed per block (default: %(default)s)")
    parser.add_argument("--fixed-pattern", action="store_true",
                        help="also send the built-in fixed byte pattern")
    return parser.parse_args(argv)


def main(argv=None):
    """Run the loopback test for block sizes 2**0 .. 2**11 bytes."""
    args = parse_args(argv)
    if serial is None:
        sys.exit("pyserial is required to open the serial port")

    # The context manager closes the port on success, on failure exit and
    # on Ctrl-C alike.
    with serial.Serial(args.tty, args.baud, timeout=1) as s:
        for i in range(12):
            size = 2**i
            print(f"Trying with {size} bytes")
            sendrecv(s, size, args.timeout)
        if args.fixed_pattern:
            print(f"Trying fixed pattern ({len(FIXED_PATTERN)} bytes)")
            sendcheck(s, FIXED_PATTERN, args.timeout)
    print("All sizes passed.")


if __name__ == "__main__":
    main()