mirror of https://github.com/janLo/punkow
Add first version of the scraper
This commit is contained in:
parent
9c8af634b1
commit
b59f8440fd
|
|
@ -0,0 +1,220 @@
|
||||||
|
import argparse
|
||||||
|
import contextlib
|
||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
BASE_URL = 'https://service.berlin.de'
|
||||||
|
START_URL = '/terminvereinbarung/termin/tag.php?termin=1&dienstleister=122301&anliegen[]=120686&herkunft=1' # anmeldung wohnung pankow
|
||||||
|
# START_URL = '/terminvereinbarung/termin/tag.php?termin=1&dienstleister=327427&anliegen[]=318998&herkunft=1' # einbürgerung pankow
|
||||||
|
USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'
|
||||||
|
BLANK_URL = '/terminvereinbarung/termin/blank.png'
|
||||||
|
REGISTER_URL = "/terminvereinbarung/termin/register/"
|
||||||
|
ABORT_URL = '/terminvereinbarung/termin/abort/'
|
||||||
|
MANAGE_URL = '/terminvereinbarung/termin/manage/'
|
||||||
|
|
||||||
|
|
||||||
|
def print_url(r, *args, **kwargs):
|
||||||
|
logger.debug("loaded: %s - status: %d", r.url, r.status_code)
|
||||||
|
logger.debug("request headers: %s", r.request.headers)
|
||||||
|
logger.debug("response headers: %s", r.headers)
|
||||||
|
|
||||||
|
|
||||||
|
class BookingService(object):
|
||||||
|
def __init__(self, start_url, debug=True):
|
||||||
|
self.session = None
|
||||||
|
self.start_url = start_url
|
||||||
|
self.debug = debug
|
||||||
|
|
||||||
|
self._init_session()
|
||||||
|
|
||||||
|
def _init_session(self):
|
||||||
|
if self.session is not None:
|
||||||
|
self.session.close()
|
||||||
|
|
||||||
|
self.session = requests.session()
|
||||||
|
self.session.headers.update({'User-Agent': USER_AGENT, })
|
||||||
|
if self.debug:
|
||||||
|
self.session.hooks.update({'response': print_url})
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _local_referrer(self):
|
||||||
|
old_referrer = self.session.headers.get('Referrer', None)
|
||||||
|
yield
|
||||||
|
if old_referrer is not None:
|
||||||
|
self.session.headers.update({'Referrer': old_referrer})
|
||||||
|
|
||||||
|
def _print_details(self, zms, depth):
|
||||||
|
meta = {}
|
||||||
|
groups = zms.find_all("div", "collapsible-toggle")
|
||||||
|
for group in groups[:depth]:
|
||||||
|
title = group.find("div", {"class": "collapsible-title"})
|
||||||
|
desc = group.find("div", {"class": "collapsible-description"})
|
||||||
|
|
||||||
|
if title is None or title == -1:
|
||||||
|
break
|
||||||
|
if desc is None or desc == -1:
|
||||||
|
break
|
||||||
|
if desc.text.strip() == "":
|
||||||
|
break
|
||||||
|
|
||||||
|
title = title.text.strip()
|
||||||
|
desc = next(desc.stripped_strings)
|
||||||
|
|
||||||
|
logging.info(" %s: %s", title, desc)
|
||||||
|
meta[title] = desc
|
||||||
|
|
||||||
|
return meta
|
||||||
|
|
||||||
|
def _fetch_soup(self, url):
|
||||||
|
response = self.session.get(BASE_URL + url)
|
||||||
|
|
||||||
|
assert response.status_code == 200, f"Could not get a proper response for {url}"
|
||||||
|
|
||||||
|
self.session.headers.update({'Referrer': response.url})
|
||||||
|
self.session.get(BASE_URL + BLANK_URL)
|
||||||
|
|
||||||
|
return BeautifulSoup(response.content, 'html.parser')
|
||||||
|
|
||||||
|
def _iter_bookable_day_urls(self, start_url):
|
||||||
|
html = self._fetch_soup(start_url)
|
||||||
|
months = html.find_all("div", {"class": "calendar-month-table"})
|
||||||
|
|
||||||
|
zms = html.find("div", {"class": "zms"})
|
||||||
|
self._print_details(zms, 2)
|
||||||
|
|
||||||
|
for m_div in months:
|
||||||
|
month_name = m_div.find("th", {"class": "month"})
|
||||||
|
bookable = m_div.find_all('td', {"class": 'buchbar'})
|
||||||
|
logger.info("Found month %s with %d available days", month_name.text.strip(), len(bookable))
|
||||||
|
|
||||||
|
for day in bookable:
|
||||||
|
day_link = day.find("a")
|
||||||
|
|
||||||
|
if day_link and day_link != -1:
|
||||||
|
logger.info("Search free slots for day %s. %s", day_link.text.strip(), month_name.text.strip())
|
||||||
|
yield day_link.attrs["href"]
|
||||||
|
|
||||||
|
logger.info("No more days with appointments")
|
||||||
|
|
||||||
|
def _iter_bookable_times(self, day_url):
|
||||||
|
html = self._fetch_soup(day_url)
|
||||||
|
|
||||||
|
timetable = html.find("div", {"class": "timetable"})
|
||||||
|
if not timetable or timetable == -1:
|
||||||
|
logger.warning("No timetable found in %s", day_url)
|
||||||
|
return
|
||||||
|
|
||||||
|
for row in timetable.find_all("tr"):
|
||||||
|
head = row.find("th", {"class": "buchbar"})
|
||||||
|
|
||||||
|
if head and head != 1:
|
||||||
|
logger.info("Found timeslot %s", head.text.strip())
|
||||||
|
|
||||||
|
content = row.find("td", {"class": "frei"})
|
||||||
|
if not content or content == -1:
|
||||||
|
logger.warning("No link field for this slot")
|
||||||
|
continue
|
||||||
|
|
||||||
|
link = content.find("a")
|
||||||
|
if not link or link == -1:
|
||||||
|
logger.warning("No Link tag for this slot")
|
||||||
|
continue
|
||||||
|
|
||||||
|
yield link.attrs["href"]
|
||||||
|
|
||||||
|
logger.info("No more free slots for the day.")
|
||||||
|
|
||||||
|
def _book_appointment(self, slot_url, name=None, email=None):
|
||||||
|
html = self._fetch_soup(slot_url)
|
||||||
|
|
||||||
|
zms = html.find("div", {"class": "zms"})
|
||||||
|
|
||||||
|
logger.info("Loaded booking form:")
|
||||||
|
self._print_details(zms, 4)
|
||||||
|
|
||||||
|
formdata = {"familyName": name, "email": email, "form_validate": "1", "agbgelesen": "1"}
|
||||||
|
process = zms.find("input", {"id": "process"})
|
||||||
|
if process is None or process == -1:
|
||||||
|
logger.error("No process id found")
|
||||||
|
self.session.get(BASE_URL + ABORT_URL)
|
||||||
|
return False
|
||||||
|
|
||||||
|
formdata["process"] = process.attrs["value"]
|
||||||
|
|
||||||
|
logger.info("Book with: %s", formdata)
|
||||||
|
|
||||||
|
if self.debug:
|
||||||
|
logger.warning("Not really booked as we're in debug mode!")
|
||||||
|
self.session.get(BASE_URL + ABORT_URL)
|
||||||
|
logger.warning("Aborted!")
|
||||||
|
return True
|
||||||
|
|
||||||
|
response = self.session.post(BASE_URL + REGISTER_URL, data=formdata)
|
||||||
|
if response.status_code != 200:
|
||||||
|
logger.error("Could not book appointment. Status: %d", response.status_code)
|
||||||
|
return False
|
||||||
|
|
||||||
|
register_html = BeautifulSoup(response.content, 'html.parser')
|
||||||
|
success = register_html.find("div", {"class": "submit-success-message"})
|
||||||
|
|
||||||
|
if success is None or success == -1:
|
||||||
|
logger.error("Cannot find success message")
|
||||||
|
return False
|
||||||
|
|
||||||
|
logger.info("Success: %s", success.text.strip())
|
||||||
|
|
||||||
|
result = {key: register_html.find("span", {"class": f"summary_{key}"}).text.strip()
|
||||||
|
for key in ("name", "mail", "authKey", "processId")}
|
||||||
|
logger.info(" Registratred for: %s (%s)", result["name"], result["mail"])
|
||||||
|
logger.info(" To change or cancel use %s with the number %s and the code %s", BASE_URL + MANAGE_URL,
|
||||||
|
result["processId"], result["authKey"])
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def book(self, name, email):
|
||||||
|
logging.info("Look for appontments at %s", BASE_URL + self.start_url)
|
||||||
|
for day_url in self._iter_bookable_day_urls(self.start_url):
|
||||||
|
with self._local_referrer():
|
||||||
|
for slot_url in self._iter_bookable_times(day_url):
|
||||||
|
if self._book_appointment(slot_url, name, email):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="Try to book an appointment at berlin cvil services")
|
||||||
|
parser.add_argument("--name", required=True, help="The name of the applicant")
|
||||||
|
parser.add_argument("--email", required=True, help="The email of the applicant")
|
||||||
|
parser.add_argument("--url", default=START_URL, help="The start orl of the booking process")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s %(levelname)s: %(message)s',
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
url = args.url
|
||||||
|
if url.startswith(BASE_URL):
|
||||||
|
url = url[len(BASE_URL):]
|
||||||
|
logger.info("Use url %s", url)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
logger.info("Try to get an appointment at %s", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
|
||||||
|
svc = BookingService(url, debug=False)
|
||||||
|
if svc.book(name=args.name, email=args.email):
|
||||||
|
break
|
||||||
|
time.sleep(30)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("Got keyboard interrupt - stopping.")
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
logger.exception("Got an exception while booking an appointment")
|
||||||
Loading…
Reference in New Issue