From b59f8440fd95ab2d36900833613b953ca55a07dd Mon Sep 17 00:00:00 2001 From: Jan Losinski Date: Sun, 6 Jan 2019 03:09:15 +0100 Subject: [PATCH] Add first version of the scraper --- punkow/scraper.py | 220 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 punkow/scraper.py diff --git a/punkow/scraper.py b/punkow/scraper.py new file mode 100644 index 0000000..8376549 --- /dev/null +++ b/punkow/scraper.py @@ -0,0 +1,220 @@ +import argparse +import contextlib +import datetime +import logging +import time + +import requests + +from bs4 import BeautifulSoup + +logger = logging.getLogger(__name__) + +BASE_URL = 'https://service.berlin.de' +START_URL = '/terminvereinbarung/termin/tag.php?termin=1&dienstleister=122301&anliegen[]=120686&herkunft=1' # anmeldung wohnung pankow +# START_URL = '/terminvereinbarung/termin/tag.php?termin=1&dienstleister=327427&anliegen[]=318998&herkunft=1' # einbürgerung pankow +USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36' +BLANK_URL = '/terminvereinbarung/termin/blank.png' +REGISTER_URL = "/terminvereinbarung/termin/register/" +ABORT_URL = '/terminvereinbarung/termin/abort/' +MANAGE_URL = '/terminvereinbarung/termin/manage/' + + +def print_url(r, *args, **kwargs): + logger.debug("loaded: %s - status: %d", r.url, r.status_code) + logger.debug("request headers: %s", r.request.headers) + logger.debug("response headers: %s", r.headers) + + +class BookingService(object): + def __init__(self, start_url, debug=True): + self.session = None + self.start_url = start_url + self.debug = debug + + self._init_session() + + def _init_session(self): + if self.session is not None: + self.session.close() + + self.session = requests.session() + self.session.headers.update({'User-Agent': USER_AGENT, }) + if self.debug: + self.session.hooks.update({'response': print_url}) + + @contextlib.contextmanager + def _local_referrer(self): + old_referrer = self.session.headers.get('Referrer', None) + yield + if old_referrer is not None: + self.session.headers.update({'Referrer': old_referrer}) + + def _print_details(self, zms, depth): + meta = {} + groups = zms.find_all("div", "collapsible-toggle") + for group in groups[:depth]: + title = group.find("div", {"class": "collapsible-title"}) + desc = group.find("div", {"class": "collapsible-description"}) + + if title is None or title == -1: + break + if desc is None or desc == -1: + break + if desc.text.strip() == "": + break + + title = title.text.strip() + desc = next(desc.stripped_strings) + + logging.info(" %s: %s", title, desc) + meta[title] = desc + + return meta + + def _fetch_soup(self, url): + response = self.session.get(BASE_URL + url) + + assert response.status_code == 200, f"Could not get a proper response for {url}" + + self.session.headers.update({'Referrer': response.url}) + self.session.get(BASE_URL + BLANK_URL) + + return BeautifulSoup(response.content, 'html.parser') + + def _iter_bookable_day_urls(self, start_url): + html = self._fetch_soup(start_url) + months = html.find_all("div", {"class": "calendar-month-table"}) + + zms = html.find("div", {"class": "zms"}) + self._print_details(zms, 2) + + for m_div in months: + month_name = m_div.find("th", {"class": "month"}) + bookable = m_div.find_all('td', {"class": 'buchbar'}) + logger.info("Found month %s with %d available days", month_name.text.strip(), len(bookable)) + + for day in bookable: + day_link = day.find("a") + + if day_link and day_link != -1: + logger.info("Search free slots for day %s. %s", day_link.text.strip(), month_name.text.strip()) + yield day_link.attrs["href"] + + logger.info("No more days with appointments") + + def _iter_bookable_times(self, day_url): + html = self._fetch_soup(day_url) + + timetable = html.find("div", {"class": "timetable"}) + if not timetable or timetable == -1: + logger.warning("No timetable found in %s", day_url) + return + + for row in timetable.find_all("tr"): + head = row.find("th", {"class": "buchbar"}) + + if head and head != 1: + logger.info("Found timeslot %s", head.text.strip()) + + content = row.find("td", {"class": "frei"}) + if not content or content == -1: + logger.warning("No link field for this slot") + continue + + link = content.find("a") + if not link or link == -1: + logger.warning("No Link tag for this slot") + continue + + yield link.attrs["href"] + + logger.info("No more free slots for the day.") + + def _book_appointment(self, slot_url, name=None, email=None): + html = self._fetch_soup(slot_url) + + zms = html.find("div", {"class": "zms"}) + + logger.info("Loaded booking form:") + self._print_details(zms, 4) + + formdata = {"familyName": name, "email": email, "form_validate": "1", "agbgelesen": "1"} + process = zms.find("input", {"id": "process"}) + if process is None or process == -1: + logger.error("No process id found") + self.session.get(BASE_URL + ABORT_URL) + return False + + formdata["process"] = process.attrs["value"] + + logger.info("Book with: %s", formdata) + + if self.debug: + logger.warning("Not really booked as we're in debug mode!") + self.session.get(BASE_URL + ABORT_URL) + logger.warning("Aborted!") + return True + + response = self.session.post(BASE_URL + REGISTER_URL, data=formdata) + if response.status_code != 200: + logger.error("Could not book appointment. Status: %d", response.status_code) + return False + + register_html = BeautifulSoup(response.content, 'html.parser') + success = register_html.find("div", {"class": "submit-success-message"}) + + if success is None or success == -1: + logger.error("Cannot find success message") + return False + + logger.info("Success: %s", success.text.strip()) + + result = {key: register_html.find("span", {"class": f"summary_{key}"}).text.strip() + for key in ("name", "mail", "authKey", "processId")} + logger.info(" Registratred for: %s (%s)", result["name"], result["mail"]) + logger.info(" To change or cancel use %s with the number %s and the code %s", BASE_URL + MANAGE_URL, + result["processId"], result["authKey"]) + + return True + + def book(self, name, email): + logging.info("Look for appontments at %s", BASE_URL + self.start_url) + for day_url in self._iter_bookable_day_urls(self.start_url): + with self._local_referrer(): + for slot_url in self._iter_bookable_times(day_url): + if self._book_appointment(slot_url, name, email): + return True + return False + + +parser = argparse.ArgumentParser(description="Try to book an appointment at berlin cvil services") +parser.add_argument("--name", required=True, help="The name of the applicant") +parser.add_argument("--email", required=True, help="The email of the applicant") +parser.add_argument("--url", default=START_URL, help="The start orl of the booking process") + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + + args = parser.parse_args() + + url = args.url + if url.startswith(BASE_URL): + url = url[len(BASE_URL):] + logger.info("Use url %s", url) + + while True: + try: + logger.info("Try to get an appointment at %s", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + svc = BookingService(url, debug=False) + if svc.book(name=args.name, email=args.email): + break + time.sleep(30) + except KeyboardInterrupt: + logger.info("Got keyboard interrupt - stopping.") + break + except: + logger.exception("Got an exception while booking an appointment")