From 71c34761b2f266ab8f8eda96eb5055883e2c214d Mon Sep 17 00:00:00 2001 From: Jan Losinski Date: Sat, 26 Jan 2019 21:49:59 +0100 Subject: [PATCH] Allow ggracefl stop of consumer --- wallabag_kindle_consumer/consumer.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/wallabag_kindle_consumer/consumer.py b/wallabag_kindle_consumer/consumer.py index ae676a3..d11f92c 100644 --- a/wallabag_kindle_consumer/consumer.py +++ b/wallabag_kindle_consumer/consumer.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import asyncio +import datetime from logbook import Logger from sqlalchemy.orm import joinedload @@ -17,6 +18,8 @@ class Consumer: self.sender = sender self.running = True + self._wait_fut = None # type: asyncio.Future + async def fetch_jobs(self, user): logger.debug("Fetch entries for user {}", user.name) async for entry in self.wallabag.fetch_entries(user): @@ -31,8 +34,26 @@ class Consumer: await self.sender.send_mail(job, data) session.delete(job) + async def _wait_since(self, since: datetime.datetime): + now = datetime.datetime.utcnow() + wait = max(0.0, self.interval - (now - since).total_seconds()) + + if not self.running: + return + + self._wait_fut = asyncio.ensure_future(asyncio.sleep(wait)) + + try: + await self._wait_fut + except asyncio.CancelledError: + pass + finally: + self._wait_fut = None + async def consume(self): while self.running: + start = datetime.datetime.utcnow() + with self.sessionmaker as session: logger.debug("Start consume run") fetches = [self.fetch_jobs(user) for user in session.query(User).filter(User.active == True).all()] @@ -43,5 +64,8 @@ class Consumer: await asyncio.gather(*jobs) session.commit() - await asyncio.sleep(self.interval) + await self._wait_since(start) + def stop(self): + self.running = False + self._wait_fut.cancel()