mbv: debugging the async app
This commit is contained in:
62
mbv/testecho.py
Normal file
62
mbv/testecho.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import argparse
|
||||
import random
|
||||
import serial
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
tty = "/dev/ttyUSB1"
|
||||
baud = 115200
|
||||
|
||||
|
||||
def sendrecv(s, size, timeout=5):
|
||||
"""Send some data and recv until the same data was received back."""
|
||||
data = random.randbytes(size)
|
||||
return sendcheck(s, data, timeout)
|
||||
|
||||
|
||||
def sendcheck(s, data, timeout=5):
|
||||
s.write(data)
|
||||
received = 0
|
||||
start = time.time()
|
||||
got = bytearray()
|
||||
while received < len(data):
|
||||
r = s.read()
|
||||
got += r
|
||||
try:
|
||||
assert(r == data[received: received + len(r)])
|
||||
assert(time.time() < start + timeout)
|
||||
except:
|
||||
fr = received - 10
|
||||
to = received + len(r) + 1
|
||||
sdat = data[fr: to]
|
||||
rdat = bytes(got[fr: to])
|
||||
print(f"failed after receiving {received} correct bytes")
|
||||
print(f"expected {sdat}")
|
||||
print(f" got {rdat}")
|
||||
sys.exit(-1)
|
||||
|
||||
received += len(r)
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
|
||||
s = serial.Serial(tty, baud, timeout=1)
|
||||
for i in range(12):
|
||||
size = 2**i
|
||||
print(f"Trying with {size} bytes")
|
||||
sendrecv(s, size)
|
||||
data = b'ze%s\x96:M#\xd8\x98\x9d\x96\xf5?\x80c\xc6\xa7\x03\xe0i\x04V\xcb\xa3\x95#GC\xabf\x98'
|
||||
#sendcheck(s, data)
|
||||
|
||||
print("All sizes passed.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user