catprint/cheshire.py

165 lines
4.8 KiB
Python
Raw Permalink Normal View History

2022-09-10 06:40:26 +00:00
import argparse
import asyncio
import os
import socket
import sys
import time
import bleak
CAT_NAME = 'GB03'
CAT_SERVICE = '0000ae30-0000-1000-8000-00805f9b34fb'
CAT_CHARAC = '0000ae01-0000-1000-8000-00805f9b34fb'
DEFAULT_SOCKET = '/tmp/catprint.s'
async def getcat():
devices = await bleak.BleakScanner.discover()
for d in devices:
print(f'- {d.name}')
if d.name == CAT_NAME:
return d
return None
async def getcharac(client):
await client.get_services()
for service in client.services:
if service.uuid != CAT_SERVICE:
continue
for charac in service.characteristics:
if charac.uuid != CAT_CHARAC:
continue
return charac
return None
async def endprint(client):
pass
CHECKSUM_TABLE = [0, 7, 14, 9, 28, 27, 18, 21, 56, 63, 54, 49, 36, 35, 42, 45,
112, 119, 126, 121, 108, 107, 98, 101, 72, 79, 70, 65, 84, 83, 90, 93,
224, 231, 238, 233, 252, 251, 242, 245, 216, 223, 214, 209, 196, 195,
202, 205, 144, 151, 158, 153, 140, 139, 130, 133, 168, 175, 166, 161,
180, 179, 186, 189, 199, 192, 201, 206, 219, 220, 213, 210, 255, 248,
241, 246, 227, 228, 237, 234, 183, 176, 185, 190, 171, 172, 165, 162,
143, 136, 129, 134, 147, 148, 157, 154, 39, 32, 41, 46, 59, 60, 53, 50,
31, 24, 17, 22, 3, 4, 13, 10, 87, 80, 89, 94, 75, 76, 69, 66, 111, 104,
97, 102, 115, 116, 125, 122, 137, 142, 135, 128, 149, 146, 155, 156,
177, 182, 191, 184, 173, 170, 163, 164, 249, 254, 247, 240, 229, 226,
235, 236, 193, 198, 207, 200, 221, 218, 211, 212, 105, 110, 103, 96,
117, 114, 123, 124, 81, 86, 95, 88, 77, 74, 67, 68, 25, 30, 23, 16, 5,
2, 11, 12, 33, 38, 47, 40, 61, 58, 51, 52, 78, 73, 64, 71, 82, 85, 92,
91, 118, 113, 120, 127, 106, 109, 100, 99, 62, 57, 48, 55, 34, 37, 44,
43, 6, 1, 8, 15, 26, 29, 20, 19, 174, 169, 160, 167, 178, 181, 188,
187, 150, 145, 152, 159, 138, 141, 132, 131, 222, 217, 208, 215, 194,
197, 204, 203, 230, 225, 232, 239, 250, 253, 244, 243]
def checksum(data):
cs = 0
for d in data:
cs = CHECKSUM_TABLE[cs ^ d]
return cs
def packet(command, payload):
return bytes([
0x51, 0x78, # magic
command,
0, # always 0
len(payload),
0, # always 0
]) + bytes(payload) + bytes([checksum(payload), 0xff])
async def startprint(client):
await client.write_gatt_char(CAT_CHARAC, packet(0xbe, [0]))
await asyncio.sleep(0.05)
await client.write_gatt_char(CAT_CHARAC, packet(0xaf, [0xff, 0xff]))
await asyncio.sleep(0.05)
sent = [0]
async def scanline(client, data):
if sent[0] == 15:
sent[0] = 0
response = True
else:
sent[0] += 1
response = False
response = False
await client.write_gatt_char(CAT_CHARAC, packet(0xa2, data), response)
#await client.write_gatt_char(CAT_CHARAC, packet(0xa1, [1, 0]))
await asyncio.sleep(0.05)
async def feedpaper(client, lines=120):
await client.write_gatt_char(CAT_CHARAC, packet(0xa1, [lines, 0]))
await asyncio.sleep(0.02 * lines)
def parse_args():
parser = argparse.ArgumentParser(description='Print stuff on a cat.')
parser.add_argument('--address', help='address of the printer')
parser.add_argument('--socket', dest='socket_path',
help='socket of the daemon',
default=DEFAULT_SOCKET)
return parser.parse_args()
async def main(address=None, socket_path=None):
if address is None:
while True:
device = await getcat()
if device is None:
print('No cat printer found :(')
time.sleep(1)
continue
print('Cat printer found!')
print(f'{device.address}')
address = device.address
break
while True:
client = bleak.BleakClient(address)
await client.connect()
if client is not None:
print('connected!')
break
else:
print('could not connect')
time.sleep(1)
await startprint(client)
if os.path.exists(socket_path):
os.remove(socket_path)
server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
server.bind(socket_path)
while True:
datagram, addr = server.recvfrom(1024)
cmd = datagram[0]
print(f'got stuff from {addr}: {cmd}')
start = time.time()
if cmd == ord('f'): # feed
await feedpaper(client)
elif cmd == ord('s'): # scan
data = datagram[1:]
await scanline(client, data)
else:
continue
print(f'done in {time.time() - start} s')
server.sendto(b'done', addr)
if __name__ == "__main__":
asyncio.run(main(**vars(parse_args())))