commit 27d5763065b2c9519950a1d995cbcb44eda70434 Author: Paul Mathieu Date: Sat Sep 10 08:40:26 2022 +0200 Initial commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..59dd31e --- /dev/null +++ b/README.md @@ -0,0 +1,31 @@ +catprint +======== + +Setup +----- + +``` +python3 -m venv venv/ +. venv/bin/activate +python -m pip install -r requirements.txt +``` + +Every time +---------- + +``` +. venv/bin/activate +``` + +Then one of the 2 ways to print: + +The self-contained, one-time print utility: +``` +python catprint.py --help +``` + +The daemon/client pair when several things need to be printed: +``` +python cheshire.py --help +python catclient.py --help +``` diff --git a/blelist.py b/blelist.py new file mode 100644 index 0000000..f7feef8 --- /dev/null +++ b/blelist.py @@ -0,0 +1,9 @@ +import asyncio +from bleak import BleakScanner + +async def main(): + devices = await BleakScanner.discover() + for d in devices: + print(d) + +asyncio.run(main()) diff --git a/catclient.py b/catclient.py new file mode 100644 index 0000000..cf50346 --- /dev/null +++ b/catclient.py @@ -0,0 +1,148 @@ +import argparse +import asyncio +import os +import socket +import sys +import tempfile + +import PIL.Image +import PIL.ImageDraw +import PIL.ImageFont +import qrcode + +HERE = os.path.dirname(__file__) +font = PIL.ImageFont.truetype(os.path.join(HERE, 'inconsolata.ttf'), 20) + +DEFAULT_SOCKET = '/tmp/catprint.s' + + +def text2img(text): + maxwidth=384 + height=21 + img = PIL.Image.new('1', (maxwidth, height)) + d = PIL.ImageDraw.Draw(img) + tw, th = d.textsize(text, font=font) + img = PIL.Image.new('1', (tw, height)) + d = PIL.ImageDraw.Draw(img) + print(f'({tw}, {th}) {text}') + d.text((0, 0), text, font=font, fill=(0xff,)) + dat = list(img.getdata()) + if tw == 0: + return height * [[]] + return [dat[i:i+tw] for i in range(0, len(dat), tw)] + + +def qr2img(payload): + img = qrcode.make(payload, box_size=3).get_image() + width, _ = img.size + dat = [0 if x else 1 for x in img.getdata()] + return [dat[i:i+width] for i in range(0, len(dat), width)] + + +def get_template0(qr_payload, line0, line1, line2=''): + qr = qrcode.make(qr_payload, box_size=3).get_image() + qr.putdata([0 if x else 1 for x in qr.getdata()]) + qrw, qrh = qr.size + width = 384 + height = qrh + textheight = 21 + texttop = (height - 3 * textheight) / 3 + textleft = qrw + 20 + img = PIL.Image.new('1', (width, height)) + img.paste(qr) + d = PIL.ImageDraw.Draw(img) + d.text((textleft, texttop + 0 * textheight), line0, font=font, fill=(0xff,)) + d.text((textleft, texttop + 1 * textheight), line1, font=font, fill=(0xff,)) + d.text((textleft, texttop + 2 * textheight), line2, font=font, fill=(0xff,)) + dat = list(img.getdata()) + return [dat[i:i+width] for i in range(0, len(dat), width)] + + +def img2printable(img): + for line in img: + out = [] + for i in range(0, len(line), 8): + val = 0 + for j in range(8): + if i+j >= len(line): + break + if line[i+j]: + val |= (1 << j) + out.append(val) + yield out + + + +def parse_args(): + parser = argparse.ArgumentParser(description='Print stuff on a cat.') + parser.add_argument('--socket', dest='socket_path', + help='socket of the daemon', + default=DEFAULT_SOCKET) + parser.add_argument('--text', action='store_true', help='print text from stdin') + parser.add_argument('--feed', action='store_true', help='feed paper') + parser.add_argument('--debug', action='store_true', help='only for debug') + parser.add_argument('--template0', help='print template0') + + return parser.parse_args() + + +# XXX: not async +class CatClient: + def __init__(self, socket_path): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self.clsock = tempfile.mktemp() + self.sock.bind(self.clsock) + self.socket_path = socket_path + + def flush(self): + try: + self.sock.setblocking(False) + while self.sock.recvfrom(1024): + pass + except BlockingIOError: + pass + finally: + self.sock.setblocking(True) + + def feedpaper(self): + self.flush() + self.sock.sendto(b'f', self.socket_path) + dat, addr = self.sock.recvfrom(1024) + + def scanline(self, data): + self.flush() + self.sock.sendto(b's' + data, self.socket_path) + dat, addr = self.sock.recvfrom(1024) + + +async def main(socket_path=None, text=None, feed=False, debug=False, template0=None): + if debug: + for line in sys.stdin: + img = text2img(line) + print('\n'.join(''.join('x' if x else ' ' for x in y) for y in img)) + return + + client = CatClient(socket_path) + + if text: + for line in sys.stdin: + if line.startswith('qr:'): + img = qr2img(line[3:]) + else: + img = text2img(line) + for data in img2printable(img): + if data: + client.scanline(bytes(data)) + else: + client.feedpaper(lines=1) + elif template0 is not None: + qr, line0, line1, line2 = template0.split(';') + img = get_template0(qr, line0, line1, line2) + for data in img2printable(img): + client.scanline(bytes(data)) + if feed: + client.feedpaper() + + +if __name__ == "__main__": + asyncio.run(main(**vars(parse_args()))) diff --git a/catprint.py b/catprint.py new file mode 100644 index 0000000..fda9c46 --- /dev/null +++ b/catprint.py @@ -0,0 +1,218 @@ +import argparse +import asyncio +import os +import sys + +import bleak +import PIL.Image +import PIL.ImageDraw +import PIL.ImageFont +import qrcode + +CAT_NAME = 'GB03' +CAT_SERVICE = '0000ae30-0000-1000-8000-00805f9b34fb' +CAT_CHARAC = '0000ae01-0000-1000-8000-00805f9b34fb' + + +async def getcat(): + devices = await bleak.BleakScanner.discover() + for d in devices: + print(f'- {d.name}') + if d.name == CAT_NAME: + return d + return None + + +async def getcharac(client): + await client.get_services() + for service in client.services: + if service.uuid != CAT_SERVICE: + continue + for charac in service.characteristics: + if charac.uuid != CAT_CHARAC: + continue + return charac + return None + + +async def endprint(client): + pass + + +HERE = os.path.dirname(__file__) +font = PIL.ImageFont.truetype(os.path.join(HERE, 'inconsolata.ttf'), 20) + + +def text2img(text): + maxwidth=384 + height=21 + img = PIL.Image.new('1', (maxwidth, height)) + d = PIL.ImageDraw.Draw(img) + tw, th = d.textsize(text, font=font) + img = PIL.Image.new('1', (tw, height)) + d = PIL.ImageDraw.Draw(img) + print(f'({tw}, {th}) {text}') + d.text((0, 0), text, font=font, fill=(0xff,)) + dat = list(img.getdata()) + if tw == 0: + return height * [[]] + return [dat[i:i+tw] for i in range(0, len(dat), tw)] + + +def qr2img(payload): + img = qrcode.make(payload, box_size=3).get_image() + width, _ = img.size + dat = [0 if x else 1 for x in img.getdata()] + return [dat[i:i+width] for i in range(0, len(dat), width)] + + +def get_template0(qr_payload, line0, line1, line2=''): + qr = qrcode.make(qr_payload, box_size=3).get_image() + qr.putdata([0 if x else 1 for x in qr.getdata()]) + qrw, qrh = qr.size + width = 384 + height = qrh + textheight = 21 + texttop = (height - 3 * textheight) / 3 + textleft = qrw + 20 + img = PIL.Image.new('1', (width, height)) + img.paste(qr) + d = PIL.ImageDraw.Draw(img) + d.text((textleft, texttop + 0 * textheight), line0, font=font, fill=(0xff,)) + d.text((textleft, texttop + 1 * textheight), line1, font=font, fill=(0xff,)) + d.text((textleft, texttop + 2 * textheight), line2, font=font, fill=(0xff,)) + dat = list(img.getdata()) + return [dat[i:i+width] for i in range(0, len(dat), width)] + + +def img2printable(img): + for line in img: + out = [] + for i in range(0, len(line), 8): + val = 0 + for j in range(8): + if i+j >= len(line): + break + if line[i+j]: + val |= (1 << j) + out.append(val) + yield out + + +CHECKSUM_TABLE = [0, 7, 14, 9, 28, 27, 18, 21, 56, 63, 54, 49, 36, 35, 42, 45, + 112, 119, 126, 121, 108, 107, 98, 101, 72, 79, 70, 65, 84, 83, 90, 93, + 224, 231, 238, 233, 252, 251, 242, 245, 216, 223, 214, 209, 196, 195, + 202, 205, 144, 151, 158, 153, 140, 139, 130, 133, 168, 175, 166, 161, + 180, 179, 186, 189, 199, 192, 201, 206, 219, 220, 213, 210, 255, 248, + 241, 246, 227, 228, 237, 234, 183, 176, 185, 190, 171, 172, 165, 162, + 143, 136, 129, 134, 147, 148, 157, 154, 39, 32, 41, 46, 59, 60, 53, 50, + 31, 24, 17, 22, 3, 4, 13, 10, 87, 80, 89, 94, 75, 76, 69, 66, 111, 104, + 97, 102, 115, 116, 125, 122, 137, 142, 135, 128, 149, 146, 155, 156, + 177, 182, 191, 184, 173, 170, 163, 164, 249, 254, 247, 240, 229, 226, + 235, 236, 193, 198, 207, 200, 221, 218, 211, 212, 105, 110, 103, 96, + 117, 114, 123, 124, 81, 86, 95, 88, 77, 74, 67, 68, 25, 30, 23, 16, 5, + 2, 11, 12, 33, 38, 47, 40, 61, 58, 51, 52, 78, 73, 64, 71, 82, 85, 92, + 91, 118, 113, 120, 127, 106, 109, 100, 99, 62, 57, 48, 55, 34, 37, 44, + 43, 6, 1, 8, 15, 26, 29, 20, 19, 174, 169, 160, 167, 178, 181, 188, + 187, 150, 145, 152, 159, 138, 141, 132, 131, 222, 217, 208, 215, 194, + 197, 204, 203, 230, 225, 232, 239, 250, 253, 244, 243] + +def checksum(data): + cs = 0 + for d in data: + cs = CHECKSUM_TABLE[cs ^ d] + return cs + + +def packet(command, payload): + return bytes([ + 0x51, 0x78, # magic + command, + 0, # always 0 + len(payload), + 0, # always 0 + ]) + bytes(payload) + bytes([checksum(payload), 0xff]) + + +async def startprint(client): + await client.write_gatt_char(CAT_CHARAC, packet(0xbe, [0])) + await asyncio.sleep(0.05) + await client.write_gatt_char(CAT_CHARAC, packet(0xaf, [0xff, 0xff])) + await asyncio.sleep(0.05) + +sent = [0] + + +async def scanline(client, data): + if sent[0] == 15: + sent[0] = 0 + response = True + else: + sent[0] += 1 + response = False + response = False + await client.write_gatt_char(CAT_CHARAC, packet(0xa2, data), response) + #await client.write_gatt_char(CAT_CHARAC, packet(0xa1, [1, 0])) + await asyncio.sleep(0.05) + + +async def feedpaper(client, lines=120): + await client.write_gatt_char(CAT_CHARAC, packet(0xa1, [lines, 0])) + await asyncio.sleep(0.02 * lines) + + +def parse_args(): + parser = argparse.ArgumentParser(description='Print stuff on a cat.') + parser.add_argument('--address', help='BlE address of the printer') + parser.add_argument('--text', action='store_true', help='print text from stdin') + parser.add_argument('--feed', action='store_true', help='feed paper') + parser.add_argument('--debug', action='store_true', help='only for debug') + parser.add_argument('--template0', help='print template0') + + return parser.parse_args() + + +async def main(address=None, text=None, feed=False, debug=False, template0=None): + if debug: + for line in sys.stdin: + img = text2img(line) + print('\n'.join(''.join('x' if x else ' ' for x in y) for y in img)) + return + + if address is None: + device = await getcat() + if device is None: + print('No cat printer found :(') + sys.exit(1) + print('Cat printer found!') + print(f'{device.address}') + address = device.address + + client = bleak.BleakClient(address) + await client.connect() + if client is not None: + print('connected!') + await startprint(client) + + if text: + for line in sys.stdin: + if line.startswith('qr:'): + img = qr2img(line[3:]) + else: + img = text2img(line) + for data in img2printable(img): + if data: + await scanline(client, bytes(data)) + else: + await feedpaper(client, lines=1) + elif template0 is not None: + qr, line0, line1, line2 = template0.split(';') + img = get_template0(qr, line0, line1, line2) + for data in img2printable(img): + await scanline(client, bytes(data)) + if feed: + await feedpaper(client) + + +if __name__ == "__main__": + asyncio.run(main(**vars(parse_args()))) diff --git a/cheshire.py b/cheshire.py new file mode 100644 index 0000000..0e3f60a --- /dev/null +++ b/cheshire.py @@ -0,0 +1,164 @@ +import argparse +import asyncio +import os +import socket +import sys +import time + + +import bleak + +CAT_NAME = 'GB03' +CAT_SERVICE = '0000ae30-0000-1000-8000-00805f9b34fb' +CAT_CHARAC = '0000ae01-0000-1000-8000-00805f9b34fb' + +DEFAULT_SOCKET = '/tmp/catprint.s' + + +async def getcat(): + devices = await bleak.BleakScanner.discover() + for d in devices: + print(f'- {d.name}') + if d.name == CAT_NAME: + return d + return None + + +async def getcharac(client): + await client.get_services() + for service in client.services: + if service.uuid != CAT_SERVICE: + continue + for charac in service.characteristics: + if charac.uuid != CAT_CHARAC: + continue + return charac + return None + + +async def endprint(client): + pass + + +CHECKSUM_TABLE = [0, 7, 14, 9, 28, 27, 18, 21, 56, 63, 54, 49, 36, 35, 42, 45, + 112, 119, 126, 121, 108, 107, 98, 101, 72, 79, 70, 65, 84, 83, 90, 93, + 224, 231, 238, 233, 252, 251, 242, 245, 216, 223, 214, 209, 196, 195, + 202, 205, 144, 151, 158, 153, 140, 139, 130, 133, 168, 175, 166, 161, + 180, 179, 186, 189, 199, 192, 201, 206, 219, 220, 213, 210, 255, 248, + 241, 246, 227, 228, 237, 234, 183, 176, 185, 190, 171, 172, 165, 162, + 143, 136, 129, 134, 147, 148, 157, 154, 39, 32, 41, 46, 59, 60, 53, 50, + 31, 24, 17, 22, 3, 4, 13, 10, 87, 80, 89, 94, 75, 76, 69, 66, 111, 104, + 97, 102, 115, 116, 125, 122, 137, 142, 135, 128, 149, 146, 155, 156, + 177, 182, 191, 184, 173, 170, 163, 164, 249, 254, 247, 240, 229, 226, + 235, 236, 193, 198, 207, 200, 221, 218, 211, 212, 105, 110, 103, 96, + 117, 114, 123, 124, 81, 86, 95, 88, 77, 74, 67, 68, 25, 30, 23, 16, 5, + 2, 11, 12, 33, 38, 47, 40, 61, 58, 51, 52, 78, 73, 64, 71, 82, 85, 92, + 91, 118, 113, 120, 127, 106, 109, 100, 99, 62, 57, 48, 55, 34, 37, 44, + 43, 6, 1, 8, 15, 26, 29, 20, 19, 174, 169, 160, 167, 178, 181, 188, + 187, 150, 145, 152, 159, 138, 141, 132, 131, 222, 217, 208, 215, 194, + 197, 204, 203, 230, 225, 232, 239, 250, 253, 244, 243] + + +def checksum(data): + cs = 0 + for d in data: + cs = CHECKSUM_TABLE[cs ^ d] + return cs + + +def packet(command, payload): + return bytes([ + 0x51, 0x78, # magic + command, + 0, # always 0 + len(payload), + 0, # always 0 + ]) + bytes(payload) + bytes([checksum(payload), 0xff]) + + +async def startprint(client): + await client.write_gatt_char(CAT_CHARAC, packet(0xbe, [0])) + await asyncio.sleep(0.05) + await client.write_gatt_char(CAT_CHARAC, packet(0xaf, [0xff, 0xff])) + await asyncio.sleep(0.05) + + +sent = [0] + + +async def scanline(client, data): + if sent[0] == 15: + sent[0] = 0 + response = True + else: + sent[0] += 1 + response = False + response = False + await client.write_gatt_char(CAT_CHARAC, packet(0xa2, data), response) + #await client.write_gatt_char(CAT_CHARAC, packet(0xa1, [1, 0])) + await asyncio.sleep(0.05) + + +async def feedpaper(client, lines=120): + await client.write_gatt_char(CAT_CHARAC, packet(0xa1, [lines, 0])) + await asyncio.sleep(0.02 * lines) + + +def parse_args(): + parser = argparse.ArgumentParser(description='Print stuff on a cat.') + parser.add_argument('--address', help='address of the printer') + parser.add_argument('--socket', dest='socket_path', + help='socket of the daemon', + default=DEFAULT_SOCKET) + + return parser.parse_args() + + +async def main(address=None, socket_path=None): + if address is None: + while True: + device = await getcat() + if device is None: + print('No cat printer found :(') + time.sleep(1) + continue + print('Cat printer found!') + print(f'{device.address}') + address = device.address + break + + while True: + client = bleak.BleakClient(address) + await client.connect() + if client is not None: + print('connected!') + break + else: + print('could not connect') + time.sleep(1) + + await startprint(client) + + if os.path.exists(socket_path): + os.remove(socket_path) + + server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + server.bind(socket_path) + while True: + datagram, addr = server.recvfrom(1024) + cmd = datagram[0] + print(f'got stuff from {addr}: {cmd}') + start = time.time() + if cmd == ord('f'): # feed + await feedpaper(client) + elif cmd == ord('s'): # scan + data = datagram[1:] + await scanline(client, data) + else: + continue + print(f'done in {time.time() - start} s') + server.sendto(b'done', addr) + + +if __name__ == "__main__": + asyncio.run(main(**vars(parse_args()))) diff --git a/inconsolata.ttf b/inconsolata.ttf new file mode 100644 index 0000000..6532467 Binary files /dev/null and b/inconsolata.ttf differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..39d08d7 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +bleak +qrcode +Pillow