Implement Batch processing for requests.

Requests for different targets can now processed in parallel. Also, the
requests for a specific target are now cancelled if we did not get a
result in the first place, as its unlikely that we get one for the
second request.
This commit is contained in:
Jan Losinski 2019-01-22 14:10:18 +01:00
parent 9ec6fd60cc
commit b576f4356d
1 changed files with 51 additions and 16 deletions

View File

@ -21,21 +21,37 @@ class _WorkerRequest(typing.NamedTuple):
target: str
class RequestQueue(object):
class RequestBatchQueue(object):
def __init__(self):
self._targets = [] # type: typing.List[str]
self._requests = [] # type: typing.List[_WorkerRequest]
self._requests = {} # type: typing.Dict[str, typing.List[_WorkerRequest]]
def enqueue(self, req: model.Request):
if req.target not in self._requests:
self._targets.append(req.target)
self._requests[req.target] = []
self._requests.append(
self._requests[req.target].append(
_WorkerRequest(req.id, req.data.name, req.data.email, req.target))
def iterate(self) -> typing.Generator[_WorkerRequest, None, None]:
for item in self._requests:
yield item
def batches(self) -> typing.Generator[typing.List[_WorkerRequest], typing.List[_WorkerRequest], None]:
iterators = {key: iter(val) for key, val in self._requests.items()}
exclude = set()
while True:
cur_batch = [req for req
in (next(iterators[target], None)
for target
in self._targets
if target not in exclude)
if req is not None]
if len(cur_batch) == 0:
break
failed = yield cur_batch
if failed is not None:
exclude.update(req.target for req in failed)
def is_empty(self):
return len(self._targets) == 0
@ -46,7 +62,7 @@ class RequestQueue(object):
@property
def request_cnt(self):
return len(self._requests)
return sum(len(queue) for queue in self._requests.values())
def _book(req: _WorkerRequest, debug=False) -> typing.Optional[scraper.BookingResult]:
@ -78,9 +94,9 @@ class Worker(object):
self._timer = tm
self._mailer = mail
self._debug = debug
self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
def _request_queue(self) -> RequestQueue:
def _request_queue(self) -> RequestBatchQueue:
with self._db.make_session_context() as session:
active_targets = session.query(model.Request.target.label("target"),
func.min(model.Request.created).label("min_c")) \
@ -97,7 +113,7 @@ class Worker(object):
.filter(model.Request.data != None) \
.order_by(model.Request.id)
requests = RequestQueue()
requests = RequestBatchQueue()
for req in qry:
requests.enqueue(req)
@ -135,6 +151,15 @@ class Worker(object):
session.commit()
async def _process_request(self, request, mail):
booking = await self._loop.run_in_executor(self._executor,
functools.partial(_book, request, debug=self._debug))
if booking is not None:
mail.send_success_email(request.email, booking)
return request.id
return None
async def _run_once(self):
requests = await self._loop.run_in_executor(None, self._request_queue)
@ -143,12 +168,22 @@ class Worker(object):
booked_ids = []
async with self._mailer.start_queue() as mail:
for req in requests.iterate():
booking = await self._loop.run_in_executor(self._executor,
functools.partial(_book, req, debug=self._debug))
if booking is not None:
booked_ids.append(req.id)
mail.send_success_email(req.email, booking)
request_batch_generator = requests.batches()
request_batch = next(request_batch_generator, None)
while request_batch is not None and len(request_batch) > 0:
booked_result = await asyncio.gather(*[self._process_request(req, mail)
for req in request_batch])
booked_id_set = set([rid for rid in booked_result if rid is not None])
booked_ids.extend(booked_id_set)
try:
request_batch = request_batch_generator.send([req for req
in request_batch
if req.id not in booked_id_set])
except StopIteration:
break
await self._loop.run_in_executor(None, functools.partial(self.cleanup_booked, booked_ids))
await self._loop.run_in_executor(None, functools.partial(self.cleanup_old, mail))