"""HTTP backend for Zetikettes.

Serves the list of known stickers ("tikettes"), registers new sticker
templates uploaded through a multipart form, and renders a sticker sheet to a
PDF that can then be downloaded.
"""
import argparse
from http.server import BaseHTTPRequestHandler, HTTPServer
import hmac
import io
import json
import logging
import os
import re
import string
import subprocess
import tempfile
import time
import traceback

from makesticker import makesticker
from makeplanche import makeplanche

# defaults
PORT = 8000
OUT_DIR = tempfile.gettempdir()
DATA_DIR = '/data'
TEMPLATE_DIR = DATA_DIR
DEFAULT_DPI = 300
LEMOTDEPASSE = 'je veux ajouter une étiquette'
TIKETTES = os.path.join(DATA_DIR, 'tikettes.json')

# Runtime overrides filled in from the command line in the __main__ section.
config = {}

# Substitution keys a new sticker may declare, with the placeholder value
# stored for each of them.
_KNOWN_SUBS = {
    "dluo": "germinal 9999",
    "fruit": "80",
    "teneur": "50",
    "lot": "0000-0",
    "qty": "370",
    "vol": "50",
}

# One "Name: value" header line inside a multipart part.
_HEADER_RE = re.compile(r'^([a-zA-Z-]+): (.*)$')

# Body of the catch-all GET response.
# NOTE(review): this is served as text/html but carries no markup; confirm
# whether HTML tags were intended around these fragments.
_FALLBACK_PAGE = b"Zetikettes" + b"" + "\n\nThis is not the way.\n\n".encode() + b""


def tikettes_path():
    """Return the path of the JSON file listing the known stickers."""
    return config.get('tikettes', TIKETTES)


def template_dir():
    """Return the directory holding the sticker SVG templates."""
    return config.get('template_dir', TEMPLATE_DIR)


def _resolve_inside(base_dir, name):
    """Join *name* onto *base_dir*, refusing names that escape *base_dir*.

    The check is purely lexical (``..`` components and absolute names), so
    symbolic links that live inside *base_dir* keep working.

    Raises:
        ValueError: if the joined path is not strictly inside *base_dir*.
    """
    joined = os.path.join(base_dir, name)
    base = os.path.abspath(base_dir)
    candidate = os.path.abspath(joined)
    if candidate == base or os.path.commonpath([base, candidate]) != base:
        raise ValueError(f'illegal file name: {name!r}')
    return joined


def inkscapize(svg_in, pdf_out):
    """Render the SVG file *svg_in* to the PDF file *pdf_out*.

    The SVG is first exported to a PNG at ``DEFAULT_DPI`` with ``inkscape``,
    then the PNG is turned into a PDF with the ``convert`` command.

    Raises:
        subprocess.CalledProcessError: if either command exits non-zero.
        OSError: if a command cannot be started.
    """
    # A private scratch directory avoids the name race of tempfile.mktemp and
    # is removed whether or not the commands succeed.
    with tempfile.TemporaryDirectory() as workdir:
        png_out = os.path.join(workdir, 'export.png')
        subprocess.check_call([
            'inkscape',
            '--export-type=png',
            f'--export-filename={png_out}',
            f'--export-dpi={DEFAULT_DPI}',
            svg_in,
        ])
        subprocess.check_call(['convert', png_out, pdf_out])


def handle_newtikette(formdata):
    """Register a new sticker described by the form fields in *formdata*.

    Writes ``formdata['sticker']`` to ``<title>.svg`` in the template
    directory and appends an entry to the tikettes JSON file. For every known
    substitution key present in the form, the entry stores that key's
    placeholder value from ``_KNOWN_SUBS`` (not the submitted value).

    Returns:
        A JSON string reporting success.

    Raises:
        KeyError: if ``title`` or ``sticker`` is missing from *formdata*.
        ValueError: if the title would place the file outside the template
            directory.
    """
    # Read and validate every input before touching the filesystem.
    title = formdata['title']
    svg_source = formdata['sticker']
    sticker = title + '.svg'
    sticker_path = _resolve_inside(template_dir(), sticker)

    with open(tikettes_path()) as f:
        tikettes = json.load(f)

    with open(sticker_path, 'w') as f:
        f.write(svg_source)

    newtikette = {
        'title': title,
        'sticker': sticker,
        'landscape': 'landscape' in formdata,
        'subs': {k: v for k, v in _KNOWN_SUBS.items() if k in formdata},
    }
    logging.info(f'adding newtikette: {newtikette}')
    tikettes.append(newtikette)

    with open(tikettes_path(), 'w') as f:
        json.dump(tikettes, f, indent=2)

    return json.dumps({'status': 'ok', 'message': 'newtikette added'})


def handle_generate(request):
    """Render the sticker sheet described by the JSON string *request*.

    The request must provide ``sticker`` (template file name) and ``subs``
    (passed through to ``makesticker``); ``landscape`` is optional and
    defaults to False. The resulting PDF is written into ``OUT_DIR``.

    Returns:
        A JSON string whose ``file`` entry is the base name of the PDF.

    Raises:
        ValueError: if the JSON is invalid or ``sticker`` points outside the
            template directory.
        KeyError: if ``sticker`` or ``subs`` is missing.
    """
    request = json.loads(request)
    landscape = request.get('landscape', False)
    logging.info(f'landscape: {landscape}')
    template = _resolve_inside(template_dir(), request['sticker'])

    # Intermediate SVGs live in a scratch directory that is always removed.
    with tempfile.TemporaryDirectory() as workdir:
        sticker_out = os.path.join(workdir, 'sticker.svg')
        planche_out = os.path.join(workdir, 'planche.svg')

        # fill in sticker details
        with open(sticker_out, 'w') as stickout:
            makesticker(template, stickout, request['subs'],
                        landscape=landscape)

        # make sticker sheet
        with open(sticker_out, 'r') as stickin, \
                open(planche_out, 'w') as planchout:
            makeplanche(stickin, planchout)

        # process to printable pdf; mkstemp reserves the name atomically
        fd, pdf_out = tempfile.mkstemp(dir=OUT_DIR, suffix='.pdf')
        os.close(fd)
        try:
            inkscapize(planche_out, pdf_out)
        except BaseException:
            # Do not leave an empty or partial PDF behind on failure.
            if os.path.exists(pdf_out):
                os.unlink(pdf_out)
            raise

    response = {
        'status': 'ok',
        'file': os.path.basename(pdf_out),
        'message': 'this is the way',
    }
    return json.dumps(response)


def parse_multipart(data, boundary):
    """Split the multipart/form-data text *data* into its parts.

    Returns:
        A list of ``(metadata, value, otherheaders)`` tuples, where
        *metadata* is the dict of ``key="value"`` fields of the part's
        ``Content-Disposition: form-data`` header, *value* is the part body
        with its last two characters (the trailing line break) removed, and
        *otherheaders* is the list of remaining ``(name, value)`` headers.
        The list is empty when the first line does not contain *boundary*.

    Raises:
        IndexError: if a Content-Disposition field is not of the form
            ``key="value"``.
    """
    logging.debug(f'parsing data with boundary: {boundary}')
    stream = io.StringIO(data)
    parts = []
    if boundary not in stream.readline():
        return parts

    while True:
        # Header lines run until the first line that is not "Name: value".
        headers = []
        while True:
            found = _HEADER_RE.match(stream.readline().strip())
            if found is None:
                break
            headers.append(found.groups())

        metadata = None
        otherheaders = []
        logging.debug(headers)
        for k, v in headers:
            if k == 'Content-Disposition':
                fields = v.split('; ')
                if fields[0] != 'form-data':
                    continue
                metadata = dict(
                    re.findall(r'([^=]+)="(.*)"$', p)[0] for p in fields[1:])
            else:
                otherheaders.append((k, v))

        # The body runs until the next boundary line or the end of the data.
        value = io.StringIO()
        while True:
            line = stream.readline()
            if not line or boundary in line:
                break
            value.write(line)

        # A part without form-data metadata marks the end of usable input.
        if metadata is None:
            break

        body = value.getvalue()[:-2]
        logging.debug(f'({metadata}, {body}, {otherheaders})')
        parts.append((metadata, body, otherheaders))
    return parts


class MyServer(BaseHTTPRequestHandler):
    """Request handler exposing the Zetikettes endpoints.

    GET ``/list`` returns the known stickers, GET ``/data/<name>.pdf`` returns
    a generated PDF, POST ``/newtikette`` registers a sticker, and any other
    POST renders a sticker sheet.
    """

    def _respond(self, status, content_type=None, body=b'', cors=False):
        """Send a complete response: status, optional headers, then *body*.

        The ``Content-type`` header is sent when *content_type* is given and
        ``Access-Control-Allow-Origin: *`` when *cors* is true, in that order.
        """
        self.send_response(status)
        if content_type is not None:
            self.send_header("Content-type", content_type)
        if cors:
            self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        if body:
            self.wfile.write(body)

    def do_GET(self):
        """Serve the sticker list, a generated PDF, or the fallback page."""
        logging.info(f'GET {self.path}')

        if self.path == '/list':
            # Load before sending anything so a failure can still be reported
            # with a proper status code.
            try:
                with open(tikettes_path()) as f:
                    tikettes = json.load(f)
            except (OSError, ValueError):
                logging.exception('cannot load tikettes')
                failure = {'status': 'notok',
                           'message': 'cannot load tikettes'}
                self._respond(500, "application/json",
                              json.dumps(failure).encode(), cors=True)
                return
            resp = {'status': 'ok',
                    'message': 'this is the way',
                    'tikettes': tikettes}
            self._respond(200, "application/json",
                          json.dumps(resp).encode(), cors=True)
            return

        match = re.match(r'/data/(\w+\.pdf)', self.path)
        if match is not None:
            pdf_path = os.path.join(OUT_DIR, match.groups()[0])
            try:
                with open(pdf_path, 'rb') as f:
                    pdf = f.read()
            except FileNotFoundError:
                self._respond(404)
                return
            self._respond(200, "application/pdf", pdf)
            return

        self._respond(200, "text/html; charset=UTF-8", _FALLBACK_PAGE)

    def do_OPTIONS(self):
        """Answer any OPTIONS request with a permissive CORS header."""
        self._respond(200, cors=True)

    def do_POST(self):
        """Register a new sticker (``/newtikette``) or render a sticker sheet.

        Any failure is reported to the client as a 500 carrying the traceback
        text, then re-raised so the server logs it as well.
        """
        logging.info(f'POST {self.path}')
        try:
            length = int(self.headers['content-length'])
            req = self.rfile.read(length).decode()
            if self.path == '/newtikette':
                boundary = re.findall(r'boundary=(.*)$',
                                      self.headers['content-type'])[0]
                formdata = {p['name']: v
                            for p, v, _ in parse_multipart(req, boundary)}
                # Never write the submitted password to the log.
                logging.info({
                    k: ('<redacted>' if k == 'lemotdepasse' else v[:100])
                    for k, v in formdata.items()
                })
                supplied = formdata.get('lemotdepasse', '')
                # Compare as bytes: compare_digest rejects non-ASCII str.
                if not hmac.compare_digest(supplied.encode(),
                                           LEMOTDEPASSE.encode()):
                    logging.warning('wrong lemotdepasse')
                    denied = {'status': 'notok',
                              'message': 'bad lemotdepasse'}
                    self._respond(403, "application/json",
                                  json.dumps(denied).encode(), cors=True)
                    return
                resp = handle_newtikette(formdata).encode()
            else:
                resp = handle_generate(req).encode()
        except Exception:
            self._respond(500, "text/plain",
                          traceback.format_exc().encode(), cors=True)
            raise
        self._respond(200, "application/json", resp, cors=True)


def parse_args():
    """Parse the command line (``--port`` and ``--data-dir``)."""
    parser = argparse.ArgumentParser(description='Zetikette backend')
    parser.add_argument('--port', type=int, default=PORT,
                        help=f'default: {PORT}')
    parser.add_argument('--data-dir', default=DATA_DIR,
                        help=f'default: {DATA_DIR}')
    return parser.parse_args()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    args = parse_args()
    config.update({
        'tikettes': os.path.join(args.data_dir, 'tikettes.json'),
        'template_dir': args.data_dir,
    })

    webServer = HTTPServer(('', args.port), MyServer)
    print(f"Server started on port {args.port}")

    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket even on an unexpected error.
        webServer.server_close()
    print("Server stopped.")