# zetikettes/backend/old/web.py
import argparse
import hmac
from http.server import BaseHTTPRequestHandler, HTTPServer
import io
import json
import logging
import os
import re
import string
import subprocess
import tempfile
import time
import traceback

from makesticker import makesticker
from makeplanche import makeplanche
# Built-in defaults. The data-directory ones can be overridden at startup
# through the ``config`` dict below.
PORT = 8000  # default TCP port for the HTTP server
OUT_DIR = tempfile.gettempdir()  # generated PDFs are written to and served from here
DATA_DIR = '/data'
TEMPLATE_DIR = DATA_DIR  # sticker templates share the data directory by default
DEFAULT_DPI = 300  # resolution handed to inkscape's PNG export
# Passphrase that a POST to /newtikette must supply in its 'lemotdepasse' field.
LEMOTDEPASSE = 'je veux ajouter une étiquette'
TIKETTES = os.path.join(DATA_DIR, 'tikettes.json')
# Runtime overrides; the script entry point stores 'tikettes' and
# 'template_dir' here after parsing the command line.
config = {}
def tikettes_path():
    """Return the path of the tikettes JSON file.

    The ``'tikettes'`` entry of ``config`` wins when present; otherwise the
    module default ``TIKETTES`` is returned.
    """
    try:
        return config['tikettes']
    except KeyError:
        return TIKETTES
def template_dir():
    """Return the directory holding the sticker templates.

    The ``'template_dir'`` entry of ``config`` wins when present; otherwise
    the module default ``TEMPLATE_DIR`` is returned.
    """
    if 'template_dir' in config:
        return config['template_dir']
    return TEMPLATE_DIR
def inkscapize(svg_in, pdf_out):
    """Render an SVG file to a PDF by way of an intermediate PNG.

    Runs ``inkscape`` to export ``svg_in`` as a PNG at ``DEFAULT_DPI``, then
    runs ``convert`` (presumably ImageMagick -- confirm on the deployment
    host) to turn that PNG into ``pdf_out``. The intermediate PNG is removed
    whether or not the commands succeed.

    Args:
        svg_in: Path of the SVG file to render.
        pdf_out: Path the resulting PDF is written to.

    Raises:
        subprocess.CalledProcessError: If either command exits non-zero.
        OSError: If a command cannot be started.
    """
    # mkstemp creates the file itself, so nobody can claim the name between
    # choosing it and using it (tempfile.mktemp is deprecated for that race).
    fd, png_out = tempfile.mkstemp(suffix='.png')
    os.close(fd)
    try:
        subprocess.check_call([
            'inkscape',
            '--export-type=png',
            f'--export-filename={png_out}',
            f'--export-dpi={DEFAULT_DPI}',
            svg_in,
        ])
        subprocess.check_call(['convert', png_out, pdf_out])
    finally:
        try:
            os.unlink(png_out)
        except FileNotFoundError:
            # already gone; nothing left to clean up
            pass
def handle_newtikette(formdata):
    """Register a new tikette from submitted form data.

    Writes the uploaded SVG to ``<template_dir>/<title>.svg`` and appends a
    matching entry to the tikettes JSON file.

    Args:
        formdata: Mapping of form field names to string values. ``'title'``
            and ``'sticker'`` (the SVG text) are required. The presence of
            ``'landscape'`` marks the tikette as landscape, and the presence
            of any known substitution name (``dluo``, ``fruit``, ``teneur``,
            ``lot``, ``qty``, ``vol``) enables that substitution with its
            built-in placeholder value.

    Returns:
        A JSON string ``{"status": "ok", "message": "newtikette added"}``.

    Raises:
        KeyError: If ``'title'`` or ``'sticker'`` is missing.
        ValueError: If the title contains a path component.
    """
    title = formdata['title']
    # The title comes from the client and ends up in a filesystem path:
    # refuse anything that would escape the template directory.
    if os.path.basename(title) != title:
        raise ValueError(f'invalid tikette title: {title!r}')

    with open(tikettes_path()) as f:
        tikettes = json.load(f)

    sticker = title + '.svg'
    with open(os.path.join(template_dir(), sticker), 'w') as f:
        f.write(formdata['sticker'])

    # Placeholder value for every substitution a template may declare.
    known_subs = {
        "dluo": "germinal 9999",
        "fruit": "80",
        "teneur": "50",
        "lot": "0000-0",
        "qty": "370",
        "vol": "50",
    }
    newtikette = {
        'title': title,
        'sticker': sticker,
        'landscape': 'landscape' in formdata,
        # only the substitutions whose field was submitted are kept
        'subs': {k: v for k, v in known_subs.items() if k in formdata},
    }
    logging.info(f'adding newtikette: {newtikette}')
    tikettes.append(newtikette)

    with open(tikettes_path(), 'w') as f:
        json.dump(tikettes, f, indent=2)
    return json.dumps({'status': 'ok', 'message': 'newtikette added'})
def _make_temp_file(suffix, dir=None):
    """Create an empty temporary file and return its path."""
    # mkstemp creates the file itself, avoiding the name race of the
    # deprecated tempfile.mktemp.
    fd, path = tempfile.mkstemp(suffix=suffix, dir=dir)
    os.close(fd)
    return path


def _remove_if_present(path):
    """Delete ``path`` if it is set and still exists."""
    if path is None:
        return
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass


def handle_generate(request):
    """Generate a printable PDF sheet of stickers.

    Fills in the requested template with ``makesticker``, lays the result out
    on a sheet with ``makeplanche``, then converts the sheet to a PDF in
    ``OUT_DIR`` with ``inkscapize``. The intermediate SVG files are always
    removed; the PDF is removed too if generation fails.

    Args:
        request: JSON text of an object with ``'sticker'`` (template file
            name, relative to the template directory), ``'subs'`` (passed
            through to ``makesticker``) and optionally ``'landscape'``
            (defaults to ``False``).

    Returns:
        A JSON string with ``status``, ``file`` (base name of the generated
        PDF) and ``message``.

    Raises:
        ValueError: If ``request`` is not valid JSON.
        KeyError: If ``'sticker'`` or ``'subs'`` is missing.
    """
    request = json.loads(request)
    landscape = request.get('landscape', False)
    logging.info(f'landscape: {landscape}')

    sticker_out = planche_out = pdf_out = None
    succeeded = False
    try:
        # fill in sticker details
        # NOTE(review): request['sticker'] is client-supplied and joined into
        # a path unchecked; consider restricting it to known template names.
        sticker_out = _make_temp_file('.svg')
        with open(sticker_out, 'w') as stickout:
            makesticker(os.path.join(template_dir(), request['sticker']),
                        stickout, request['subs'], landscape=landscape)

        # make sticker sheet
        planche_out = _make_temp_file('.svg')
        with open(sticker_out, 'r') as stickin, \
                open(planche_out, 'w') as planchout:
            makeplanche(stickin, planchout)

        # process to printable pdf
        pdf_out = _make_temp_file('.pdf', dir=OUT_DIR)
        inkscapize(planche_out, pdf_out)

        response = {'status': 'ok', 'file': os.path.basename(pdf_out),
                    'message': 'this is the way'}
        succeeded = True
        return json.dumps(response)
    finally:
        _remove_if_present(sticker_out)
        _remove_if_present(planche_out)
        if not succeeded:
            # do not leave an empty or partial PDF behind in OUT_DIR
            _remove_if_present(pdf_out)
def parse_multipart(data, boundary):
    """Parse a decoded ``multipart/form-data`` body into its parts.

    Args:
        data: The request body as text.
        boundary: The boundary token taken from the request's Content-Type
            header.

    Returns:
        A list of ``(metadata, value, otherheaders)`` tuples, one per part,
        in order of appearance. ``metadata`` is a dict of the quoted
        Content-Disposition parameters (such as ``name`` or ``filename``),
        ``value`` is the part body without its final line break, and
        ``otherheaders`` lists the part's remaining headers as
        ``(name, value)`` pairs. The list is empty when the first line of
        ``data`` does not contain ``boundary``. Parsing stops at the first
        part that has no ``form-data`` Content-Disposition header.
    """
    logging.debug(f'parsing data with boundary: {boundary}')
    stream = io.StringIO(data)
    out = []
    if boundary not in stream.readline():
        return out

    while True:
        # Header lines run until the first line that is not "Name: value"
        # (normally the blank separator line, which is consumed here).
        headers = []
        while True:
            header = re.match(r'^([a-zA-Z-]+): (.*)$', stream.readline().strip())
            if header is None:
                break
            headers.append(header.groups())
        logging.debug(headers)

        metadata = None
        otherheaders = []
        for k, v in headers:
            # header names are case-insensitive
            if k.lower() != 'content-disposition':
                otherheaders.append((k, v))
                continue
            parts = v.split('; ')
            if parts[0] != 'form-data':
                continue
            metadata = {}
            for p in parts[1:]:
                param = re.search(r'([^=]+)="(.*)"$', p)
                # ignore parameters that are not of the form key="value"
                if param is not None:
                    metadata[param.group(1)] = param.group(2)

        # The body is everything up to the next line mentioning the boundary.
        value = io.StringIO()
        while True:
            line = stream.readline()
            if not line or boundary in line:
                break
            value.write(line)

        if metadata is None:
            break

        # Drop only the line break that precedes the boundary, whether the
        # sender used CRLF or a bare LF.
        body = value.getvalue()
        if body.endswith('\r\n'):
            body = body[:-2]
        elif body.endswith('\n'):
            body = body[:-1]

        logging.debug(f'({metadata}, {body}, {otherheaders})')
        out.append((metadata, body, otherheaders))
    return out
class MyServer(BaseHTTPRequestHandler):
    """HTTP request handler for the Zetikettes backend.

    Routes:
        GET /list: JSON object carrying the list of known tikettes.
        GET /data/<name>.pdf: a previously generated PDF from ``OUT_DIR``.
        GET anything else: a placeholder HTML page.
        OPTIONS: empty 200 response carrying the CORS header.
        POST /newtikette: multipart form handled by ``handle_newtikette``,
            guarded by the ``lemotdepasse`` passphrase field.
        POST anything else: JSON body handled by ``handle_generate``.
    """

    def _send_payload(self, status, content_type, body, cors=True):
        """Send a complete response: status line, headers, then ``body``."""
        self.send_response(status)
        self.send_header("Content-type", content_type)
        if cors:
            self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Serve the tikettes list, a generated PDF, or the fallback page."""
        logging.info(f'GET {self.path}')

        if self.path == '/list':
            # Read the file before committing to a status code, so a missing
            # or corrupt file does not produce a 200 with no body.
            try:
                with open(tikettes_path()) as f:
                    tikettes = json.load(f)
            except (OSError, ValueError):
                logging.exception('cannot read tikettes')
                resp = {'status': 'notok', 'message': 'cannot read tikettes'}
                self._send_payload(500, "application/json",
                                   json.dumps(resp).encode())
                return
            resp = {'status': 'ok', 'message': 'this is the way',
                    'tikettes': tikettes}
            self._send_payload(200, "application/json",
                               json.dumps(resp).encode())
            return

        # \w+ cannot match '/' or '..', so the name stays inside OUT_DIR
        match = re.match(r'/data/(\w+\.pdf)', self.path)
        if match is not None:
            pdf_path = os.path.join(OUT_DIR, match.group(1))
            # Open first and answer afterwards: no window between an
            # existence check and the read.
            try:
                with open(pdf_path, 'rb') as f:
                    pdf = f.read()
            except (FileNotFoundError, IsADirectoryError):
                self.send_response(404)
                self.end_headers()
                return
            self._send_payload(200, "application/pdf", pdf, cors=False)
            return

        page = (b"<html><head><title>Zetikettes</title></head>"
                b"<body>"
                b"<p>This is not the way.</p>"
                b"</body></html>")
        self._send_payload(200, "text/html; charset=UTF-8", page, cors=False)

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(200)
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()

    def do_POST(self):
        """Handle tikette creation (/newtikette) or sheet generation.

        Any failure is answered with a 500 whose body is the traceback, and
        is then re-raised so the server also reports it.
        """
        logging.info(f'POST {self.path}')
        try:
            length = int(self.headers['content-length'])
            req = self.rfile.read(length).decode()
            if self.path == '/newtikette':
                boundary = re.findall(r'boundary=(.*)$',
                                      self.headers['content-type'])[0]
                formdata = {p['name']: v
                            for p, v, _ in parse_multipart(req, boundary)}
                # never write the passphrase to the log
                logging.info({
                    k: '<redacted>' if k == 'lemotdepasse' else v[:100]
                    for k, v in formdata.items()
                })
                supplied = formdata.get('lemotdepasse')
                # Compare as bytes: compare_digest rejects non-ASCII str and
                # the passphrase contains an accented character.
                if supplied is None or not hmac.compare_digest(
                        supplied.encode(), LEMOTDEPASSE.encode()):
                    logging.warning('wrong lemotdepasse')
                    denied = {'status': 'notok', 'message': 'bad lemotdepasse'}
                    self._send_payload(403, "application/json",
                                       json.dumps(denied).encode())
                    return
                resp = handle_newtikette(formdata).encode()
            else:
                resp = handle_generate(req).encode()
        except Exception:
            # NOTE(review): the traceback is sent to the client, which helps
            # debugging but exposes internals -- confirm this is intended.
            self._send_payload(500, "text/plain",
                               traceback.format_exc().encode())
            raise
        self._send_payload(200, "application/json", resp)
def parse_args():
    """Parse the command line of the backend.

    Recognised options are ``--port`` (integer, defaulting to ``PORT``) and
    ``--data-dir`` (defaulting to ``DATA_DIR``).

    Returns:
        The ``argparse.Namespace`` with ``port`` and ``data_dir`` attributes.
    """
    cli = argparse.ArgumentParser(description='Zetikette backend')
    options = (
        ('--port', {'type': int, 'default': PORT, 'help': f'default: {PORT}'}),
        ('--data-dir', {'default': DATA_DIR, 'help': f'default: {DATA_DIR}'}),
    )
    for flag, settings in options:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    args = parse_args()

    # Point the path helpers at the data directory chosen on the command line.
    config['tikettes'] = os.path.join(args.data_dir, 'tikettes.json')
    config['template_dir'] = args.data_dir

    # An empty host binds on all interfaces.
    httpd = HTTPServer(('', args.port), MyServer)
    print(f"Server started on port {args.port}")

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server
        pass

    httpd.server_close()
    print("Server stopped.")