From 046bd770087f6ae16a323993e07c1c7fff7f8610 Mon Sep 17 00:00:00 2001 From: Jan Losinski Date: Tue, 22 Jan 2019 14:36:12 +0100 Subject: [PATCH] Allow graceful exit on SIGTERM and SIGINT --- booking_service.py | 13 ++++++++++++- punkow/service/interface.py | 9 +++++++-- punkow/service/timer.py | 14 +++++++++++++- punkow/service/worker.py | 22 ++++++++++++++++++++-- 4 files changed, 52 insertions(+), 6 deletions(-) diff --git a/booking_service.py b/booking_service.py index 0177211..65951de 100755 --- a/booking_service.py +++ b/booking_service.py @@ -2,6 +2,7 @@ import asyncio import logging +import signal import uvloop import click @@ -51,11 +52,21 @@ def main(host, port, db, interval, debug, tz, special, tm = timer.Timer(interval=interval, special_times=special, time_zone=tz) wrk = worker.Worker(loop, db_mngr, tm=tm, mail=mail, debug=debug) - loop.create_task(wrk.run()) + wrk.start() app = interface.App(db_mngr, mail, base_url=url) loop.create_task(app.register_server(host, port)) + def stop(): + async def _do_stop(): + await asyncio.gather(wrk.stop(), app.stop()) + loop.stop() + + loop.create_task(_do_stop()) + + loop.add_signal_handler(signal.SIGINT, stop) + loop.add_signal_handler(signal.SIGTERM, stop) + loop.run_forever() diff --git a/punkow/service/interface.py b/punkow/service/interface.py index ed9f777..bbcd066 100644 --- a/punkow/service/interface.py +++ b/punkow/service/interface.py @@ -242,6 +242,7 @@ class App(object): self.mail = mail self.base_url = base_url self.app = web.Application() + self.site = None # type: web.TCPSite self.setup_app() self.setup_routes() @@ -271,5 +272,9 @@ class App(object): async def register_server(self, host: str = None, port: int = None): app_runner = web.AppRunner(self.app, access_log=logger) await app_runner.setup() - site = web.TCPSite(app_runner, host, port) - await site.start() + self.site = web.TCPSite(app_runner, host, port) + await self.site.start() + + async def stop(self): + if self.site is not None: + await self.site.stop() diff --git a/punkow/service/timer.py b/punkow/service/timer.py index bf70718..301be42 100644 --- a/punkow/service/timer.py +++ b/punkow/service/timer.py @@ -48,6 +48,8 @@ class Timer(object): self._special_times = [] # type: typing.List[TimeStream] self._time_zone = pytz.timezone(time_zone) + self._sleep_coro = None # type: asyncio.Future + now = self._now() for line in special_times: @@ -76,7 +78,17 @@ class Timer(object): elapsed = (end - start).total_seconds() sleep = max(0.0, self._wait_time(end) - elapsed) logger.debug("Booking run completed in %0.2f seconds - now sleep for %0.2f seconds", elapsed, sleep) - await asyncio.sleep(sleep) + self._sleep_coro = asyncio.sleep(sleep) + try: + await self._sleep_coro + except asyncio.CancelledError: + pass + finally: + self._sleep_coro = None + + def cancel(self): + if self._sleep_coro is not None: + self._sleep_coro.cancel() if __name__ == '__main__': diff --git a/punkow/service/worker.py b/punkow/service/worker.py index 53fb56d..e9d8663 100644 --- a/punkow/service/worker.py +++ b/punkow/service/worker.py @@ -94,6 +94,10 @@ class Worker(object): self._timer = tm self._mailer = mail self._debug = debug + + self._running = False + self._run_future = None + self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=2) def _request_queue(self) -> RequestBatchQueue: @@ -189,6 +193,20 @@ class Worker(object): await self._loop.run_in_executor(None, functools.partial(self.cleanup_old, mail)) async def run(self): - while True: + while self._running: async with self._timer.timed(): - await self._run_once() + if self._running: + await self._run_once() + + def start(self): + assert not self._running, "Already running" + self._running = True + + self._run_future = self._loop.create_task(self.run()) + + async def stop(self): + self._running = False + + if self._run_future is not None: + await self._run_future + self._run_future = None