Allow ggracefl stop of consumer
This commit is contained in:
parent
ae19e9a11d
commit
71c34761b2
|
|
@ -1,5 +1,6 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import datetime
|
||||||
|
|
||||||
from logbook import Logger
|
from logbook import Logger
|
||||||
from sqlalchemy.orm import joinedload
|
from sqlalchemy.orm import joinedload
|
||||||
|
|
@ -17,6 +18,8 @@ class Consumer:
|
||||||
self.sender = sender
|
self.sender = sender
|
||||||
self.running = True
|
self.running = True
|
||||||
|
|
||||||
|
self._wait_fut = None # type: asyncio.Future
|
||||||
|
|
||||||
async def fetch_jobs(self, user):
|
async def fetch_jobs(self, user):
|
||||||
logger.debug("Fetch entries for user {}", user.name)
|
logger.debug("Fetch entries for user {}", user.name)
|
||||||
async for entry in self.wallabag.fetch_entries(user):
|
async for entry in self.wallabag.fetch_entries(user):
|
||||||
|
|
@ -31,8 +34,26 @@ class Consumer:
|
||||||
await self.sender.send_mail(job, data)
|
await self.sender.send_mail(job, data)
|
||||||
session.delete(job)
|
session.delete(job)
|
||||||
|
|
||||||
|
async def _wait_since(self, since: datetime.datetime):
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
wait = max(0.0, self.interval - (now - since).total_seconds())
|
||||||
|
|
||||||
|
if not self.running:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._wait_fut = asyncio.ensure_future(asyncio.sleep(wait))
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._wait_fut
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self._wait_fut = None
|
||||||
|
|
||||||
async def consume(self):
|
async def consume(self):
|
||||||
while self.running:
|
while self.running:
|
||||||
|
start = datetime.datetime.utcnow()
|
||||||
|
|
||||||
with self.sessionmaker as session:
|
with self.sessionmaker as session:
|
||||||
logger.debug("Start consume run")
|
logger.debug("Start consume run")
|
||||||
fetches = [self.fetch_jobs(user) for user in session.query(User).filter(User.active == True).all()]
|
fetches = [self.fetch_jobs(user) for user in session.query(User).filter(User.active == True).all()]
|
||||||
|
|
@ -43,5 +64,8 @@ class Consumer:
|
||||||
await asyncio.gather(*jobs)
|
await asyncio.gather(*jobs)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
await asyncio.sleep(self.interval)
|
await self._wait_since(start)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.running = False
|
||||||
|
self._wait_fut.cancel()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue