Use context based session.

Signed-off-by: Jan Losinski <losinski@wh2.tu-dresden.de>
This commit is contained in:
Jan Losinski 2018-03-21 23:50:25 +01:00
parent 8ea47aafa4
commit 57a8851a55
2 changed files with 25 additions and 22 deletions

View File

@ -4,7 +4,7 @@ import asyncio
from logbook import Logger from logbook import Logger
from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload
from wallabag_kindle_consumer.models import User, Job, session_maker from wallabag_kindle_consumer.models import User, Job, context_session
logger = Logger(__name__) logger = Logger(__name__)
@ -12,7 +12,7 @@ logger = Logger(__name__)
class Consumer: class Consumer:
def __init__(self, wallabag, cfg, sender): def __init__(self, wallabag, cfg, sender):
self.wallabag = wallabag self.wallabag = wallabag
self.session = session_maker(cfg.db_uri)() self.sessionmaker = context_session(cfg)
self.interval = cfg.consume_interval self.interval = cfg.consume_interval
self.sender = sender self.sender = sender
self.running = True self.running = True
@ -25,22 +25,23 @@ class Consumer:
user.jobs.append(job) user.jobs.append(job)
await self.wallabag.remove_tag(user, entry) await self.wallabag.remove_tag(user, entry)
async def process_job(self, job): async def process_job(self, job, session):
logger.info("Process export for job {id} ({format})", id=job.article, format=job.format) logger.info("Process export for job {id} ({format})", id=job.article, format=job.format)
data = await self.wallabag.export_article(job.user, job.article, job.format) data = await self.wallabag.export_article(job.user, job.article, job.format)
await self.sender.send_mail(job, data) await self.sender.send_mail(job, data)
self.session.delete(job) session.delete(job)
async def consume(self): async def consume(self):
while self.running: while self.running:
logger.info("Start consume run") with self.sessionmaker as session:
fetches = [self.fetch_jobs(user) for user in self.session.query(User).all()] logger.info("Start consume run")
await asyncio.gather(*fetches) fetches = [self.fetch_jobs(user) for user in session.query(User).all()]
self.session.commit() await asyncio.gather(*fetches)
session.commit()
jobs = [self.process_job(job) for job in self.session.query(Job).options(joinedload('user'))] jobs = [self.process_job(job, session) for job in session.query(Job).options(joinedload('user'))]
await asyncio.gather(*jobs) await asyncio.gather(*jobs)
self.session.commit() session.commit()
await asyncio.sleep(self.interval) await asyncio.sleep(self.interval)

View File

@ -3,19 +3,20 @@ from datetime import datetime, timedelta
from logbook import Logger from logbook import Logger
from sqlalchemy import func from sqlalchemy import func
from .models import User, session_maker
from .models import User, context_session
logger = Logger(__name__) logger = Logger(__name__)
class Refresher: class Refresher:
def __init__(self, config, wallabag): def __init__(self, config, wallabag):
self.session = session_maker(config.db_uri)() self.sessionmaker = context_session(config)
self.wallabag = wallabag self.wallabag = wallabag
self.grace = config.refresh_grace self.grace = config.refresh_grace
def _wait_time(self): def _wait_time(self, session):
next = self.session.query(func.min(User.token_valid).label("min")).first() next = session.query(func.min(User.token_valid).label("min")).first()
if next is None or next.min is None: if next is None or next.min is None:
return 3 return 3
delta = next.min - datetime.utcnow() delta = next.min - datetime.utcnow()
@ -27,15 +28,16 @@ class Refresher:
async def refresh(self): async def refresh(self):
while True: while True:
await asyncio.sleep(self._wait_time()) with self.sessionmaker as session:
await asyncio.sleep(self._wait_time(session))
ts = datetime.utcnow() + timedelta(seconds=self.grace) ts = datetime.utcnow() + timedelta(seconds=self.grace)
refreshes = [self._refresh_user(user) for user refreshes = [self._refresh_user(user) for user
in self.session.query(User).filter(User.token_valid < ts).all()] in session.query(User).filter(User.token_valid < ts).all()]
await asyncio.gather(*refreshes) await asyncio.gather(*refreshes)
self.session.commit() session.commit()
self.session.remove() session.remove()
async def _refresh_user(self, user): async def _refresh_user(self, user):
logger.info("Refresh token for {}", user.name) logger.info("Refresh token for {}", user.name)