mirror of https://github.com/janLo/punkow
Integrate mailer into booking service
This commit is contained in:
parent
ba5037d512
commit
9cdb9877bc
|
|
@ -6,7 +6,7 @@ import logging
|
|||
import uvloop
|
||||
import click
|
||||
|
||||
from punkow.service import interface, model, worker, timer
|
||||
from punkow.service import interface, model, worker, timer, mailer
|
||||
|
||||
|
||||
@click.command()
|
||||
|
|
@ -15,9 +15,17 @@ from punkow.service import interface, model, worker, timer
|
|||
@click.option("--db", default="sqlite:////tmp/punkow.db", help="The database uri")
|
||||
@click.option("--interval", default=50 * 5, type=int, help="The interval in which the worker should operate")
|
||||
@click.option("--debug", is_flag=True, help="Run in debug mode")
|
||||
@click.option("--mail-from", required=True, help="The Email Address to send the mails from")
|
||||
@click.option("--mail-host", required=True, help="The Email SMTP Host")
|
||||
@click.option("--mail-port", default=587, type=int, help="The Email SMTP Port")
|
||||
@click.option("--mail-tls", is_flag=True, help="Use TLS for SMTP")
|
||||
@click.option("--mail-user", default=None, help="The Email SMTP password")
|
||||
@click.option("--mail-passwd", default=None, help="The Email SMTP username")
|
||||
@click.option("--domain", required=True, help="The domain this service is running on")
|
||||
@click.option("--tz", default="CET", help="Timezone to use for special times")
|
||||
@click.option("--special", help="special time where the interval should be increased", multiple=True)
|
||||
def main(host, port, db, interval, debug, tz, special):
|
||||
def main(host, port, db, interval, debug, tz, special,
|
||||
mail_from, mail_host, mail_port, mail_tls, mail_user, mail_passwd, domain):
|
||||
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
|
|
@ -35,9 +43,14 @@ def main(host, port, db, interval, debug, tz, special):
|
|||
db_mngr = model.DatabaseManager(db)
|
||||
db_mngr.create_schema()
|
||||
|
||||
url = f"https://{domain}"
|
||||
mail_cfg = mailer.MailConfig(from_addr=mail_from, host=mail_host, port=mail_port, tls=mail_tls,
|
||||
user=mail_user, passwd=mail_passwd)
|
||||
mail = mailer.Mailer(loop=loop, config=mail_cfg, base_url=url)
|
||||
|
||||
tm = timer.Timer(interval=interval, special_times=special, time_zone=tz)
|
||||
|
||||
wrk = worker.Worker(loop, db_mngr, tm=tm, debug=debug)
|
||||
wrk = worker.Worker(loop, db_mngr, tm=tm, mail=mail, debug=debug)
|
||||
loop.create_task(wrk.run())
|
||||
|
||||
app = interface.App(db_mngr)
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ class AsyncMailQueue(object):
|
|||
self._mailer = mailer
|
||||
self._queue = []
|
||||
|
||||
async def __aenter__(self):
|
||||
async def __aenter__(self) -> AsyncMailQueue:
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||||
|
|
@ -110,7 +110,7 @@ class AsyncMailQueue(object):
|
|||
await asyncio.gather(*self._queue)
|
||||
|
||||
def _append_task(self, coro):
|
||||
self._queue.append(self._loop.create_task(coro))
|
||||
self._queue.append(asyncio.run_coroutine_threadsafe(coro, self._loop))
|
||||
|
||||
def send_success_email(self, email, booking: scraper.BookingResult):
|
||||
self._append_task(self._mailer.send_success_email(email, booking))
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import typing
|
|||
from sqlalchemy import func
|
||||
from sqlalchemy.orm import joinedload
|
||||
|
||||
from . import model, timer
|
||||
from . import model, timer, mailer
|
||||
from .. import scraper
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -49,7 +49,7 @@ class RequestQueue(object):
|
|||
return len(self._requests)
|
||||
|
||||
|
||||
def _book(req: _WorkerRequest, debug=False) -> typing.Dict[int, scraper.BookingResult]:
|
||||
def _book(req: _WorkerRequest, debug=False) -> typing.Optional[scraper.BookingResult]:
|
||||
data = scraper.BookingData(name=req.name, email=req.email)
|
||||
target = req.target
|
||||
if req.target.startswith(scraper.BASE_URL):
|
||||
|
|
@ -57,26 +57,26 @@ def _book(req: _WorkerRequest, debug=False) -> typing.Dict[int, scraper.BookingR
|
|||
|
||||
logger.debug("Try to book one appointment for %s", target)
|
||||
|
||||
bookings = {} # type: typing.Dict[int, scraper.BookingResult]
|
||||
try:
|
||||
svc = scraper.BookingService(target, debug=debug, hide_sensitive_data=True)
|
||||
booked = svc.book(data)
|
||||
if booked is not None:
|
||||
bookings[req.id] = booked
|
||||
logger.info("Booked an appointments for %s", target)
|
||||
return booked
|
||||
except:
|
||||
logger.exception("Exception while booking")
|
||||
|
||||
logger.info("Booked an appointments for %s", target)
|
||||
|
||||
return bookings
|
||||
return None
|
||||
|
||||
|
||||
class Worker(object):
|
||||
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, db: model.DatabaseManager, tm: timer.Timer, debug=True):
|
||||
def __init__(self, loop: asyncio.AbstractEventLoop, db: model.DatabaseManager,
|
||||
tm: timer.Timer, mail: mailer.Mailer, debug=True):
|
||||
self._loop = loop
|
||||
self._db = db
|
||||
self._timer = tm
|
||||
self._mailer = mail
|
||||
self._debug = debug
|
||||
self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
|
||||
|
||||
|
|
@ -118,7 +118,7 @@ class Worker(object):
|
|||
|
||||
session.commit()
|
||||
|
||||
def cleanup_old(self):
|
||||
def cleanup_old(self, mail_queue: mailer.AsyncMailQueue):
|
||||
cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=20)
|
||||
with self._db.make_session_context() as session:
|
||||
qry = session.query(model.Request) \
|
||||
|
|
@ -130,6 +130,7 @@ class Worker(object):
|
|||
item.resolved = datetime.datetime.utcnow()
|
||||
item.state = "timeout"
|
||||
if item.data is not None:
|
||||
mail_queue.send_cancel_email(item.data.email, item.key)
|
||||
session.delete(item.data)
|
||||
|
||||
session.commit()
|
||||
|
|
@ -140,14 +141,17 @@ class Worker(object):
|
|||
if requests.is_empty():
|
||||
return
|
||||
|
||||
booked = []
|
||||
for req in requests.iterate():
|
||||
booking = await self._loop.run_in_executor(self._executor,
|
||||
functools.partial(_book, req, debug=self._debug))
|
||||
booked.extend(booking)
|
||||
booked_ids = []
|
||||
async with self._mailer.start_queue() as mail:
|
||||
for req in requests.iterate():
|
||||
booking = await self._loop.run_in_executor(self._executor,
|
||||
functools.partial(_book, req, debug=self._debug))
|
||||
if booking is not None:
|
||||
booked_ids.append(req.id)
|
||||
mail.send_success_email(req.email, booking)
|
||||
|
||||
await self._loop.run_in_executor(None, functools.partial(self.cleanup_booked, booked))
|
||||
await self._loop.run_in_executor(None, self.cleanup_old)
|
||||
await self._loop.run_in_executor(None, functools.partial(self.cleanup_booked, booked_ids))
|
||||
await self._loop.run_in_executor(None, self.cleanup_old)
|
||||
|
||||
async def run(self):
|
||||
while True:
|
||||
|
|
|
|||
Loading…
Reference in New Issue