Allow graceful exit on SIGTERM and SIGINT

This commit is contained in:
Jan Losinski 2019-01-22 14:36:12 +01:00
parent b576f4356d
commit 046bd77008
4 changed files with 52 additions and 6 deletions

View File

@ -2,6 +2,7 @@
import asyncio
import logging
import signal
import uvloop
import click
@ -51,11 +52,21 @@ def main(host, port, db, interval, debug, tz, special,
tm = timer.Timer(interval=interval, special_times=special, time_zone=tz)
wrk = worker.Worker(loop, db_mngr, tm=tm, mail=mail, debug=debug)
loop.create_task(wrk.run())
wrk.start()
app = interface.App(db_mngr, mail, base_url=url)
loop.create_task(app.register_server(host, port))
def stop():
async def _do_stop():
await asyncio.gather(wrk.stop(), app.stop())
loop.stop()
loop.create_task(_do_stop())
loop.add_signal_handler(signal.SIGINT, stop)
loop.add_signal_handler(signal.SIGTERM, stop)
loop.run_forever()

View File

@ -242,6 +242,7 @@ class App(object):
self.mail = mail
self.base_url = base_url
self.app = web.Application()
self.site = None # type: web.TCPSite
self.setup_app()
self.setup_routes()
@ -271,5 +272,9 @@ class App(object):
async def register_server(self, host: str = None, port: int = None):
app_runner = web.AppRunner(self.app, access_log=logger)
await app_runner.setup()
site = web.TCPSite(app_runner, host, port)
await site.start()
self.site = web.TCPSite(app_runner, host, port)
await self.site.start()
async def stop(self):
if self.site is not None:
await self.site.stop()

View File

@ -48,6 +48,8 @@ class Timer(object):
self._special_times = [] # type: typing.List[TimeStream]
self._time_zone = pytz.timezone(time_zone)
self._sleep_coro = None # type: asyncio.Future
now = self._now()
for line in special_times:
@ -76,7 +78,17 @@ class Timer(object):
elapsed = (end - start).total_seconds()
sleep = max(0.0, self._wait_time(end) - elapsed)
logger.debug("Booking run completed in %0.2f seconds - now sleep for %0.2f seconds", elapsed, sleep)
await asyncio.sleep(sleep)
self._sleep_coro = asyncio.sleep(sleep)
try:
await self._sleep_coro
except asyncio.CancelledError:
pass
finally:
self._sleep_coro = None
def cancel(self):
if self._sleep_coro is not None:
self._sleep_coro.cancel()
if __name__ == '__main__':

View File

@ -94,6 +94,10 @@ class Worker(object):
self._timer = tm
self._mailer = mail
self._debug = debug
self._running = False
self._run_future = None
self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
def _request_queue(self) -> RequestBatchQueue:
@ -189,6 +193,20 @@ class Worker(object):
await self._loop.run_in_executor(None, functools.partial(self.cleanup_old, mail))
async def run(self):
while True:
while self._running:
async with self._timer.timed():
await self._run_once()
if self._running:
await self._run_once()
def start(self):
assert not self._running, "Already running"
self._running = True
self._run_future = self._loop.create_task(self.run())
async def stop(self):
self._running = False
if self._run_future is not None:
await self._run_future
self._run_future = None