From 9cdb9877bc1180d2f82fb957261ebd62588809b2 Mon Sep 17 00:00:00 2001 From: Jan Losinski Date: Mon, 21 Jan 2019 18:35:11 +0100 Subject: [PATCH] Integrate mailer into booking service --- booking_service.py | 19 ++++++++++++++++--- punkow/service/mailer.py | 4 ++-- punkow/service/worker.py | 36 ++++++++++++++++++++---------------- 3 files changed, 38 insertions(+), 21 deletions(-) diff --git a/booking_service.py b/booking_service.py index f3c76b1..6cc504e 100755 --- a/booking_service.py +++ b/booking_service.py @@ -6,7 +6,7 @@ import logging import uvloop import click -from punkow.service import interface, model, worker, timer +from punkow.service import interface, model, worker, timer, mailer @click.command() @@ -15,9 +15,17 @@ from punkow.service import interface, model, worker, timer @click.option("--db", default="sqlite:////tmp/punkow.db", help="The database uri") @click.option("--interval", default=50 * 5, type=int, help="The interval in which the worker should operate") @click.option("--debug", is_flag=True, help="Run in debug mode") +@click.option("--mail-from", required=True, help="The Email Address to send the mails from") +@click.option("--mail-host", required=True, help="The Email SMTP Host") +@click.option("--mail-port", default=587, type=int, help="The Email SMTP Port") +@click.option("--mail-tls", is_flag=True, help="Use TLS for SMTP") +@click.option("--mail-user", default=None, help="The Email SMTP password") +@click.option("--mail-passwd", default=None, help="The Email SMTP username") +@click.option("--domain", required=True, help="The domain this service is running on") @click.option("--tz", default="CET", help="Timezone to use for special times") @click.option("--special", help="special time where the interval should be increased", multiple=True) -def main(host, port, db, interval, debug, tz, special): +def main(host, port, db, interval, debug, tz, special, + mail_from, mail_host, mail_port, mail_tls, mail_user, mail_passwd, domain): asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) loop = asyncio.get_event_loop() @@ -35,9 +43,14 @@ def main(host, port, db, interval, debug, tz, special): db_mngr = model.DatabaseManager(db) db_mngr.create_schema() + url = f"https://{domain}" + mail_cfg = mailer.MailConfig(from_addr=mail_from, host=mail_host, port=mail_port, tls=mail_tls, + user=mail_user, passwd=mail_passwd) + mail = mailer.Mailer(loop=loop, config=mail_cfg, base_url=url) + tm = timer.Timer(interval=interval, special_times=special, time_zone=tz) - wrk = worker.Worker(loop, db_mngr, tm=tm, debug=debug) + wrk = worker.Worker(loop, db_mngr, tm=tm, mail=mail, debug=debug) loop.create_task(wrk.run()) app = interface.App(db_mngr) diff --git a/punkow/service/mailer.py b/punkow/service/mailer.py index 2c32f52..5d08740 100644 --- a/punkow/service/mailer.py +++ b/punkow/service/mailer.py @@ -102,7 +102,7 @@ class AsyncMailQueue(object): self._mailer = mailer self._queue = [] - async def __aenter__(self): + async def __aenter__(self) -> AsyncMailQueue: return self async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -110,7 +110,7 @@ class AsyncMailQueue(object): await asyncio.gather(*self._queue) def _append_task(self, coro): - self._queue.append(self._loop.create_task(coro)) + self._queue.append(asyncio.run_coroutine_threadsafe(coro, self._loop)) def send_success_email(self, email, booking: scraper.BookingResult): self._append_task(self._mailer.send_success_email(email, booking)) diff --git a/punkow/service/worker.py b/punkow/service/worker.py index c02ff34..3f9ac86 100644 --- a/punkow/service/worker.py +++ b/punkow/service/worker.py @@ -8,7 +8,7 @@ import typing from sqlalchemy import func from sqlalchemy.orm import joinedload -from . import model, timer +from . import model, timer, mailer from .. import scraper logger = logging.getLogger(__name__) @@ -49,7 +49,7 @@ class RequestQueue(object): return len(self._requests) -def _book(req: _WorkerRequest, debug=False) -> typing.Dict[int, scraper.BookingResult]: +def _book(req: _WorkerRequest, debug=False) -> typing.Optional[scraper.BookingResult]: data = scraper.BookingData(name=req.name, email=req.email) target = req.target if req.target.startswith(scraper.BASE_URL): @@ -57,26 +57,26 @@ def _book(req: _WorkerRequest, debug=False) -> typing.Dict[int, scraper.BookingR logger.debug("Try to book one appointment for %s", target) - bookings = {} # type: typing.Dict[int, scraper.BookingResult] try: svc = scraper.BookingService(target, debug=debug, hide_sensitive_data=True) booked = svc.book(data) if booked is not None: - bookings[req.id] = booked + logger.info("Booked an appointments for %s", target) + return booked except: logger.exception("Exception while booking") - logger.info("Booked an appointments for %s", target) - - return bookings + return None class Worker(object): - def __init__(self, loop: asyncio.AbstractEventLoop, db: model.DatabaseManager, tm: timer.Timer, debug=True): + def __init__(self, loop: asyncio.AbstractEventLoop, db: model.DatabaseManager, + tm: timer.Timer, mail: mailer.Mailer, debug=True): self._loop = loop self._db = db self._timer = tm + self._mailer = mail self._debug = debug self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=1) @@ -118,7 +118,7 @@ class Worker(object): session.commit() - def cleanup_old(self): + def cleanup_old(self, mail_queue: mailer.AsyncMailQueue): cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=20) with self._db.make_session_context() as session: qry = session.query(model.Request) \ @@ -130,6 +130,7 @@ class Worker(object): item.resolved = datetime.datetime.utcnow() item.state = "timeout" if item.data is not None: + mail_queue.send_cancel_email(item.data.email, item.key) session.delete(item.data) session.commit() @@ -140,14 +141,17 @@ class Worker(object): if requests.is_empty(): return - booked = [] - for req in requests.iterate(): - booking = await self._loop.run_in_executor(self._executor, - functools.partial(_book, req, debug=self._debug)) - booked.extend(booking) + booked_ids = [] + async with self._mailer.start_queue() as mail: + for req in requests.iterate(): + booking = await self._loop.run_in_executor(self._executor, + functools.partial(_book, req, debug=self._debug)) + if booking is not None: + booked_ids.append(req.id) + mail.send_success_email(req.email, booking) - await self._loop.run_in_executor(None, functools.partial(self.cleanup_booked, booked)) - await self._loop.run_in_executor(None, self.cleanup_old) + await self._loop.run_in_executor(None, functools.partial(self.cleanup_booked, booked_ids)) + await self._loop.run_in_executor(None, self.cleanup_old) async def run(self): while True: