From b576f4356d6f4f70ceceb0cc941a75a5ebb7b1b0 Mon Sep 17 00:00:00 2001 From: Jan Losinski Date: Tue, 22 Jan 2019 14:10:18 +0100 Subject: [PATCH] Implement Batch processing for requests. Requests for different targets can now processed in parallel. Also, the requests for a specific target are now cancelled if we did not get a result in the first place, as its unlikely that we get one for the second request. --- punkow/service/worker.py | 67 ++++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 16 deletions(-) diff --git a/punkow/service/worker.py b/punkow/service/worker.py index d72a01d..53fb56d 100644 --- a/punkow/service/worker.py +++ b/punkow/service/worker.py @@ -21,21 +21,37 @@ class _WorkerRequest(typing.NamedTuple): target: str -class RequestQueue(object): +class RequestBatchQueue(object): def __init__(self): self._targets = [] # type: typing.List[str] - self._requests = [] # type: typing.List[_WorkerRequest] + self._requests = {} # type: typing.Dict[str, typing.List[_WorkerRequest]] def enqueue(self, req: model.Request): if req.target not in self._requests: self._targets.append(req.target) + self._requests[req.target] = [] - self._requests.append( + self._requests[req.target].append( _WorkerRequest(req.id, req.data.name, req.data.email, req.target)) - def iterate(self) -> typing.Generator[_WorkerRequest, None, None]: - for item in self._requests: - yield item + def batches(self) -> typing.Generator[typing.List[_WorkerRequest], typing.List[_WorkerRequest], None]: + iterators = {key: iter(val) for key, val in self._requests.items()} + + exclude = set() + + while True: + cur_batch = [req for req + in (next(iterators[target], None) + for target + in self._targets + if target not in exclude) + if req is not None] + if len(cur_batch) == 0: + break + + failed = yield cur_batch + if failed is not None: + exclude.update(req.target for req in failed) def is_empty(self): return len(self._targets) == 0 @@ -46,7 +62,7 @@ class RequestQueue(object): @property def request_cnt(self): - return len(self._requests) + return sum(len(queue) for queue in self._requests.values()) def _book(req: _WorkerRequest, debug=False) -> typing.Optional[scraper.BookingResult]: @@ -78,9 +94,9 @@ class Worker(object): self._timer = tm self._mailer = mail self._debug = debug - self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=1) + self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=2) - def _request_queue(self) -> RequestQueue: + def _request_queue(self) -> RequestBatchQueue: with self._db.make_session_context() as session: active_targets = session.query(model.Request.target.label("target"), func.min(model.Request.created).label("min_c")) \ @@ -97,7 +113,7 @@ class Worker(object): .filter(model.Request.data != None) \ .order_by(model.Request.id) - requests = RequestQueue() + requests = RequestBatchQueue() for req in qry: requests.enqueue(req) @@ -135,6 +151,15 @@ class Worker(object): session.commit() + async def _process_request(self, request, mail): + booking = await self._loop.run_in_executor(self._executor, + functools.partial(_book, request, debug=self._debug)) + if booking is not None: + mail.send_success_email(request.email, booking) + return request.id + + return None + async def _run_once(self): requests = await self._loop.run_in_executor(None, self._request_queue) @@ -143,12 +168,22 @@ class Worker(object): booked_ids = [] async with self._mailer.start_queue() as mail: - for req in requests.iterate(): - booking = await self._loop.run_in_executor(self._executor, - functools.partial(_book, req, debug=self._debug)) - if booking is not None: - booked_ids.append(req.id) - mail.send_success_email(req.email, booking) + request_batch_generator = requests.batches() + request_batch = next(request_batch_generator, None) + + while request_batch is not None and len(request_batch) > 0: + + booked_result = await asyncio.gather(*[self._process_request(req, mail) + for req in request_batch]) + + booked_id_set = set([rid for rid in booked_result if rid is not None]) + booked_ids.extend(booked_id_set) + try: + request_batch = request_batch_generator.send([req for req + in request_batch + if req.id not in booked_id_set]) + except StopIteration: + break await self._loop.run_in_executor(None, functools.partial(self.cleanup_booked, booked_ids)) await self._loop.run_in_executor(None, functools.partial(self.cleanup_old, mail))