zetikettes/web.py

269 lines
8.3 KiB
Python

import argparse
from http.server import BaseHTTPRequestHandler, HTTPServer
import io
import json
import logging
import os
import re
import string
import subprocess
import tempfile
import time
import traceback
from makesticker import makesticker
from makeplanche import makeplanche
# defaults
PORT = 8000
OUT_DIR = tempfile.gettempdir()
DATA_DIR = '/data'
TEMPLATE_DIR = DATA_DIR
DEFAULT_DPI = 300
LEMOTDEPASSE = 'je veux ajouter une étiquette'
TIKETTES = os.path.join(DATA_DIR, 'tikettes.json')
config = {}
def tikettes_path():
return config.get('tikettes', TIKETTES)
def template_dir():
return config.get('template_dir', TEMPLATE_DIR)
def inkscapize(svg_in, pdf_out):
png_out = tempfile.mktemp(suffix='.png')
try:
cmd = ['inkscape', '--export-type=png', f'--export-filename={png_out}',
f'--export-dpi={DEFAULT_DPI}', svg_in]
subprocess.check_call(cmd)
cmd = ['convert', png_out, pdf_out]
subprocess.check_call(cmd)
finally:
if os.path.exists(png_out):
os.unlink(png_out)
def handle_newtikette(formdata):
with open(tikettes_path()) as f:
tikettes = json.load(f)
sticker = formdata['title'] + '.svg'
with open(os.path.join(template_dir(), sticker), 'w') as f:
f.write(formdata['sticker'])
known_subs = {
"dluo": "germinal 9999",
"fruit": "80",
"teneur": "50",
"lot": "0000-0",
"qty": "370",
"vol": "50",
}
newtikette = {
'title': formdata['title'],
'sticker': sticker,
'landscape': 'landscape' in formdata,
'subs': {k: v for k, v in known_subs.items() if k in formdata},
}
logging.info(f'adding newtikette: {newtikette}')
tikettes.append(newtikette)
with open(tikettes_path(), 'w') as f:
json.dump(tikettes, f, indent=2)
return json.dumps({'status': 'ok', 'message': 'newtikette added'})
def handle_generate(request):
request = json.loads(request)
# fill in sticker details
sticker_out = tempfile.mktemp(suffix='.svg')
try:
with open(sticker_out, 'w') as stickout:
landscape = request.get('landscape', False)
print(f'landscape: {landscape}')
makesticker(os.path.join(template_dir(), request['sticker']), stickout,
request['subs'], landscape=landscape)
# make sticker sheet
planche_out = tempfile.mktemp(suffix='.svg')
with open(sticker_out, 'r') as stickin:
with open(planche_out, 'w') as planchout:
makeplanche(stickin, planchout)
# process to printable pdf
pdf_out = tempfile.mktemp(dir=OUT_DIR, suffix='.pdf')
inkscapize(planche_out, pdf_out)
response = {'status': 'ok', 'file': os.path.basename(pdf_out),
'message': 'this is the way'}
return json.dumps(response)
finally:
if os.path.exists(sticker_out):
os.unlink(sticker_out)
if 'planche_out' in locals() and os.path.exists(planche_out):
os.unlink(planche_out)
def parse_multipart(data, boundary):
logging.debug(f'parsing data with boundary: {boundary}')
f = io.StringIO(data)
out = []
if boundary not in f.readline():
return out
while True:
headers = []
while True:
h = f.readline().strip()
try:
k, v = re.findall(r'^([a-zA-Z-]+): (.*)$', h)[0]
except IndexError:
break
headers.append((k, v))
metadata = None
otherheaders = []
logging.debug(headers)
for k, v in headers:
if k == 'Content-Disposition':
parts = v.split('; ')
if parts[0] != 'form-data':
continue
d = dict(re.findall(r'([^=]+)="(.*)"$', p)[0] for p in parts[1:])
metadata = d
else:
otherheaders.append((k, v))
value = io.StringIO()
while True:
l = f.readline()
if not l:
break
if boundary in l:
# value.write(l.split(boundary)[0])
break
value.write(l)
if metadata is None:
break
logging.debug(f'({metadata}, {value.getvalue()[:-2]}, {otherheaders})')
out.append((metadata, value.getvalue()[:-2], otherheaders))
return out
class MyServer(BaseHTTPRequestHandler):
def do_GET(self):
logging.info(f'GET {self.path}')
if self.path == '/list':
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
with open(tikettes_path()) as f:
tikettes = json.load(f)
resp = {'status': 'ok', 'message': 'this is the way',
'tikettes': tikettes}
self.wfile.write(json.dumps(resp).encode())
return
match = re.match(r'/data/(\w+\.pdf)', self.path)
if match is not None:
pdf_path = os.path.join(OUT_DIR, match.groups()[0])
if not os.path.exists(pdf_path):
self.send_response(404)
self.end_headers()
return
self.send_response(200)
self.send_header("Content-type", "application/pdf")
self.end_headers()
with open(pdf_path, 'rb') as f:
self.wfile.write(f.read())
return
self.send_response(200)
self.send_header("Content-type", "text/html; charset=UTF-8")
self.end_headers()
self.wfile.write(b"<html><head><title>Zetikettes</title></head>")
self.wfile.write(b"<body>")
self.wfile.write("<p>This is not the way.</p>".encode())
self.wfile.write(b"</body></html>")
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
def do_POST(self):
logging.info(f'POST {self.path}')
try:
length = int(self.headers['content-length'])
req = self.rfile.read(length).decode()
if self.path == '/newtikette':
boundary = re.findall(r'boundary=(.*)$', self.headers['content-type'])[0]
formdata = {p['name']: v for p, v, _ in parse_multipart(req, boundary)}
logging.info({k: v[:100] for k, v in formdata.items()})
if formdata.get('lemotdepasse', None) != LEMOTDEPASSE:
logging.warning(f'wrong lemotdepasse')
self.send_response(403)
self.send_header("Content-type", "application/json")
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps({'status': 'notok', 'message': 'bad lemotdepasse'}).encode())
return
resp = handle_newtikette(formdata).encode()
else:
resp = handle_generate(req).encode()
except:
self.send_response(500)
self.send_header("Content-type", "text/plain")
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(traceback.format_exc().encode())
raise
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(resp)
def parse_args():
parser = argparse.ArgumentParser(description='Zetikette backend')
parser.add_argument('--port', type=int, default=PORT, help=f'default: {PORT}')
parser.add_argument('--data-dir', default=DATA_DIR, help=f'default: {DATA_DIR}')
return parser.parse_args()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
args = parse_args()
config.update({
'tikettes': os.path.join(args.data_dir, 'tikettes.json'),
'template_dir': args.data_dir,
})
webServer = HTTPServer(('', args.port), MyServer)
print(f"Server started on port {args.port}")
try:
webServer.serve_forever()
except KeyboardInterrupt:
pass
webServer.server_close()
print("Server stopped.")