Advanced Web Scraping: Part I

Web scraping requires a lot of analytical skill, and it can take some pretty complex code. I was given the task of gathering property information from a website. Easy, right? Hmm, no. There were over a million properties on the site, along with all sorts of other issues. I needed to be able to scan the site to decide whether pages were worth scraping, while also dealing with a less than straightforward search engine.

I’m going to walk through what I did for this site: the analysis I had to do and the code that was needed to do the job.

Part I: The Search Engine.

The search engine for this site worked on geographical locations. I was in luck here, since the customer only wanted properties from a given set of regions. Well, sort of in luck: the list was on the large side. I converted each of these regions into a URL starting point for searching the site. Easy so far.

I now faced my first set of problems. The site's search pages returned a maximum of 350 properties, and no pagination was available; to see more properties, you needed to zoom in on the map or filter the results.

There was also a second issue. Some of the data was displayed on the page, but the majority of the search results data was hidden in various tooltips and pop-ups created by JavaScript routines. Many at this point would turn to Selenium or similar to gather the data, but I took a much closer look at the page.

All the data on the properties was passed to the page in a blob of JSON. Great, you're saying, but no. The JSON data was actually part of a JavaScript script, escaped so it could be passed as part of the code. Yuk!

So to complete this first step, I needed to load the pages in such a way as to get fewer than 350 properties per request, and I then had to extract the data from an escaped mess in the middle of a load of JavaScript.
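To make that second part concrete, here is a minimal, self-contained sketch of pulling escaped JSON out of a JavaScript blob. The `script_text` sample and its `homes` key are stand-ins for what the real page embeds; the article's actual extraction code appears later.

```python
import json
import re

# Hypothetical script body, shaped like the escaped JSON the site embeds.
script_text = 'root.ServerState = parse("{\\"homes\\":[{\\"id\\":1,\\"price\\":250}]}");'

# Undo the escaping so the blob becomes valid JSON again.
unescaped = script_text.replace('\\"', '"')

# Pull the JSON object out of the surrounding JavaScript.
match = re.search(r'\{"homes":\[.+?\]\}', unescaped)
homes = json.loads(match.group(0))["homes"]
print(homes)  # [{'id': 1, 'price': 250}]
```

The real escaping on any given site will differ, but the pattern is the same: unescape, isolate the JSON with a regex, then hand it to `json.loads`.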

Plus, let's not forget, I had to make a LOT of these calls to see all the properties.

Asyncio is a MUST

Let's talk about the number of calls we would need to make, and of course the time spent waiting for responses. Anyone who has done work like this knows that the time waiting for a response is far longer than the time the rest of the code takes to run, i.e., we are sitting there waiting on IO. Asyncio to the rescue.
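A quick illustration of why this matters, using `asyncio.sleep` as a stand-in for network latency: five "requests" of 0.2 seconds each complete in roughly 0.2 seconds total, not one second, because the event loop overlaps the waiting.

```python
import asyncio
import time

async def fake_fetch(url: str) -> str:
    # asyncio.sleep stands in for the time spent waiting on a real HTTP response
    await asyncio.sleep(0.2)
    return f"response from {url}"

async def main() -> list:
    urls = [f"https://example.com/search/{i}" for i in range(5)]
    # gather runs all five coroutines concurrently on the one event loop
    return await asyncio.gather(*(fake_fetch(u) for u in urls))

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(len(results), f"{elapsed:.1f}s")  # five responses in roughly 0.2s
```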

We are going to need to run multiple async loops so that we can make multiple calls and process the results while others are still waiting for a response. It would also be a good idea to make use of the other CPUs on our server or instance. An async event loop is just a single thread and will therefore only run on one CPU, so to use more than one, we have to take a look at multiprocessing.

Getting All the Properties

Proper research into the site and its search page showed that filter options were sent in the URL as a GET query. This is a rather important point, as it means we can write URLs with the filter options included, without having to click on items on the page.
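Since the filters travel in the URL itself, building a search URL is just string assembly. Here is a minimal sketch; the base URL and filter names are illustrative, and the article's real `create_url` appears further down.

```python
def build_filter_url(base: str, filters: dict) -> str:
    # This site encodes filters as comma-joined key=value pairs in the URL path
    parts = [f"{key}={value}" for key, value in filters.items()]
    return f"{base.rstrip('/')}/filter/{','.join(parts)}"

url = build_filter_url("https://example.com/city/1_Denver",
                       {"min-price": "100k", "max-price": "250k", "status": "active"})
print(url)  # https://example.com/city/1_Denver/filter/min-price=100k,max-price=250k,status=active
```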

The obvious value to filter on is the price: a nice numerical value that we can work with. On top of this, I created default filter options based on the property types.

So to get all the properties, I used the following routine:

  1. Create a queue and populate it with a list of search page URLs.
  2. Each worker pulls a URL and fetches the page.
  3. Searching the page, we get the total number of properties for that search.
  4. If the total is ≥ 350:
     4.1. Get the high and low filter values from the search URL.
     4.2. Create new URLs with new high/low values by splitting the range into 2–3 parts.
     4.3. Add the newly created URLs to the queue holding all the search URLs.
  5. If the total is < 350:
     5.1. Search the HTML to find the script tags that hold the property data.
     5.2. Use a recursive function or similar to extract the data from the JavaScript.
     5.3. Save the specific property URL to a table in the database.

In the flow above, you can see that I started with a single process, which I used to spawn four processes via the multiprocessing library. I created the queue with the URLs and passed it to all the new processes. Each process in turn started a set of async tasks. Creating the coroutines as Tasks is required to make sure they run concurrently.

The original process just sits there and waits for all processes and their tasks to complete, which happens when the search URL queue is empty.

Main Code

import asyncio
import multiprocessing
from multiprocessing import cpu_count
from dotenv import dotenv_values, find_dotenv
from .populate_queues import populate_search_queues
from .search_worker import search_worker

system_params = dotenv_values(find_dotenv(".data_collector_config"))

"""
Most sites will expect you to have a header identifying your browser type. Setting the
user agent is required.
"""

headers = {
    'User-Agent':
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36'}


async def worker(queue: multiprocessing.Queue):
    """
    :param queue: multiprocessing.Queue object from which the worker will receive search data.
    :return: None

    This function starts an async coroutine for each search record it obtains from the
    incoming multiprocessing queue.
    """
    while not queue.empty():
        search_data = queue.get()
        await search_worker(search_data, headers)


def process_tasks(queue: multiprocessing.Queue):
    """
    :param queue:
    :return:

    This is the function that represents the process started by the multiprocessing library.
    """
    asyncio.run(run_workers(queue))


async def run_workers(queue: multiprocessing.Queue):
    """
    :param queue:
    :return:

    This function starts the async coroutines that take search records off the queue and
    process them. These coroutines run in each process, so remember that if you have
    started 4 processes and 6 async tasks each, you will have 24 workers running. This is
    important, as you need to monitor the number of concurrent connections you are making.
    """
    tasks = [asyncio.create_task(worker(queue)) for _ in range(6)]
    await asyncio.gather(*tasks)


def search_scanner() -> None:
    """
    This function creates the multiprocessing queue and requests for it to be populated.
    Once populated, it starts a given number of multiprocessing processes and waits for
    all of them to complete before it ends.
    :return:
    """
    url_queue = multiprocessing.Queue()
    headers_local = dict(headers)  # copy, so the module-level headers are not mutated
    headers_local["X-API-Key"] = system_params["X-API-KEY"]
    populate_search_queues(url_queue, headers=headers_local)
    tasks = [multiprocessing.Process(target=process_tasks, args=(url_queue,)) for _ in range(cpu_count())]
    [i.start() for i in tasks]
    [i.join() for i in tasks]


if __name__ == '__main__':
    search_scanner()

Here is the bulk of the code that starts the processes and async workers. For good coding practice, remember to keep your functions reasonably small; there is no need for huge code elements. Also note that I use “dotenv”. It’s always good practice in ANY application to ensure that your security is done right.

I’ve also wrapped database access in a small Django/Django-Ninja server. Given the distributed nature of this app, it’s not a good idea to code database access and connections into each worker.

Search Worker Code

from bs4 import BeautifulSoup
import re
import json
import aiohttp
from .populate_queues import generate_ranges


async def search_worker(search_info, headers) -> None:
    search_page_url = await create_url(search_info)
    async with aiohttp.ClientSession() as session:
        async with session.get(search_page_url, headers=headers) as resp:
            if resp.status == 200:
                results = await resp.text()
                soup = BeautifulSoup(results, "html.parser")
                count_text = soup.find("div", attrs={"data-rf-test-id": "homes-description"}).text
                property_count = int(count_text.strip().replace(",", "").split()[0])
                if property_count > 350:
                    await new_search(search_info, headers)
                else:
                    await search_extract(soup, search_info, headers)


async def new_search(search_info, headers) -> None:
    step = divide_whole(search_info["start_price"], search_info["end_price"])
    if step > 1:
        rg = generate_ranges(search_info["start_price"], search_info["end_price"], step)
        for val in rg:
            search_info["start_price"] = val[0]
            search_info["end_price"] = val[1]
            await search_worker(search_info, headers)


def divide_whole(n1, n2, dv=5):
    if n2 - n1 < 1:
        return 0
    res = (n2 - n1) // dv
    if res < 2 and dv > 1:
        return divide_whole(n1, n2, dv - 1)
    return res


async def search_extract(soup, search_info, headers):
    save_property_url = "http://localhost/api/save/property/url"
    try:
        scripts = soup.find_all("script")
        homes = []
        for sc in scripts:
            sc_data = sc.get_text()
            if 'ServerState.InitialContext' in sc_data:
                test_data = re.findall(r'\{"homes":\[.+]', sc_data.replace('\\', '').replace("u002F", "/"))
                test_data3 = test_data[0].split(',"dataSources":')[0]
                homes = json.loads(test_data3.split('"homes":')[1])
        if homes:
            for home in homes:
                data = home
                data['status'] = search_info['status']
                data['search_url'] = search_info['url']
                async with aiohttp.ClientSession() as session:
                    async with session.post(save_property_url, json=data, headers=headers) as resp:
                        if resp.status > 350:
                            print(resp.status)
    except Exception as e:
        print("search page extract failed")
        print(e)


async def create_url(record: dict) -> str:
    filters = [
        "property-type=house+condo+townhouse+multifamily", "min-beds=1",
        "min-baths=1",
        f"min-price={record['start_price']}k", f"max-price={record['end_price']}k"
    ]
    if record["status"] == "active":
        filters.append("status=active")
    elif record["status"] == "pending":
        filters.append("status=pending")
    elif record["status"] == "contingent":
        filters.append("status=contingent")
    elif record["status"] == "sold":
        filters.append("include=sold-6mo")
    if record["url"][-1] != "/":
        url = f"{record['url']}/filter/{','.join(filters)}"
    else:
        url = f"{record['url']}filter/{','.join(filters)}"
    return url

In this code, you can see me fetching the pages, then extracting and testing the data. BeautifulSoup and regex are required to obtain the needed data. Getting as much done with a single library as possible is always best, but it's also a good idea not to tie yourself in knots trying to do so.

Populating Queues

import requests


def generate_ranges(start, stop, step):
    return [(i, i + step - 1) for i in range(start, stop + 1, step)]


def populate_search_queues(search_queue, headers):
    """
    This function gets a list of starter points for the website, for the search system to
    begin with. As this is a property site that will return a lot of properties, it creates
    multiple search records for each record obtained from Django. The filter options are
    based on the property status and the price range. generate_ranges just generates a set
    of tuples with a start and end price. We create a record for each price range and for
    the four main statuses the customer is interested in.

    Note that the headers passed with the request include an API validation key for the
    Django server. This is quite easy to set up in Django using Django-Ninja, but some
    sort of security should always be in place.
    :param search_queue:
    :param headers:
    :return:
    """
    price_ranges = generate_ranges(1, 4000, 100)
    resp = requests.get('http://127.0.0.1/api/search/urls', headers=headers)
    if resp.status_code == 200:
        data = resp.json()
        for url in data["urls"]:
            for val in price_ranges:
                for status in ["active", "pending", "sold", "contingent"]:
                    search_queue.put({"url": url, "start_price": val[0],
                                      "end_price": val[1], "status": status})

Here is the last code snippet. This gets the URLs via the API server. Note that I'm using requests here instead of aiohttp, since I'm not working inside a coroutine.
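To see what `generate_ranges` actually produces, here is the same function run standalone (prices are in thousands, matching the `k` suffix used in `create_url`):

```python
def generate_ranges(start, stop, step):
    # contiguous (low, high) tuples covering start..stop in steps of `step`
    return [(i, i + step - 1) for i in range(start, stop + 1, step)]

print(generate_ranges(1, 400, 100))  # [(1, 100), (101, 200), (201, 300), (301, 400)]
```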

Final Words

This routine allows for running 500 or more requests at any one time, which is a great way to get yourself banned. A rotating proxy is almost essential for using this, and I also recommend throttling back the number of processes and tasks being run.
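One simple way to throttle, independent of the process and task counts, is an `asyncio.Semaphore` around each request. This sketch uses `asyncio.sleep` in place of the real aiohttp call, and `MAX_CONCURRENT` is a value you would tune to what the site and your proxy pool tolerate:

```python
import asyncio

MAX_CONCURRENT = 20  # tune to the target site and your proxy pool

async def throttled_fetch(url: str, sem: asyncio.Semaphore) -> str:
    async with sem:  # at most MAX_CONCURRENT coroutines pass this point at once
        await asyncio.sleep(0.01)  # placeholder for the real aiohttp request
        return url

async def main() -> list:
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    return await asyncio.gather(*(throttled_fetch(u, sem) for u in urls))

results = asyncio.run(main())
print(len(results))  # 100
```

Within a single process, aiohttp can also cap connections per session via `aiohttp.TCPConnector(limit=...)`, but a semaphore gives you one knob that works regardless of how the sessions are created.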

You will note that very little of this code is actually considered part of the web scraping toolkit. It goes to show that web scraping needs a broad range of skills and is far from simple when it comes to larger websites.