Адаптированный перевод статьи Building a Concurrent Web Scraper with Python and Selenium.
В этом посте мы разберем вопрос, как ускорить выполнение скрипта Python при парсинге информации с веб-страниц, используя модуль concurrent.futures
для реализации многопоточности его выполнения. Также мы разберем сам сценарий и покажем, как проверить корректность его работы с помощью pytest
.
После прочтения этой статьи вы сможете:
- Собирать информацию и перемещаться по страницам веб-сайтов с помощью Selenium и Beautiful Soup
- Настроить
pytest
для проверки функциональности процедур вашего кода. - Запускать парсер для параллельного выполнения запросов, используя модуль
concurrent.futures
. - Настроить драйвер
ChromeDriver
Selenium для работы вheadless
режиме работы.
Содержание
Настройка проекта
Сначала клонируем репозиторий с кодом нашего проекта. Для этого из командной строки выполним следующие команды:
$ git clone [email protected]:testdrivenio/concurrent-web-scraping.git $ cd concurent-web-scraping $ python3.8 -m venv env $ source env/bin/activate (env)$ pip install -r requirements.txt
Приведенные выше команды могут отличаться в зависимости от вашей среды разработки.
Установите ChromeDriver глобально. (Мы же используем версию 85.0.4183.87).
Разбор работы скрипта
Скрипт перебирает и собирает информацию с первых 20 страниц сайта Hacker News о последних статьях, используя Selenium для автоматизации взаимодействия с сайтом и Beautiful Soup для анализа HTML.
import datetime import sys from time import sleep, time from scrapers.scraper import connect_to_base, get_driver, parse_html, write_to_file def run_process(page_number, filename, browser): if connect_to_base(browser, page_number): sleep(2) html = browser.page_source output_list = parse_html(html) write_to_file(output_list, filename) else: print("Error connecting to hacker news") if __name__ == "__main__": # используется headless mod? headless = False if len(sys.argv) > 1: if sys.argv[1] == "headless": print("Running in headless mode") headless = True # устанавливаем значения вспомоготельных переменных start_time = time() current_page = 1 output_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") output_filename = f"output_{output_timestamp}.csv" # инициализируем веб драйвер browser = get_driver(headless=headless) # перебираем страницы и собираем нужную информацию while current_pageНачнем с основного файла. Вначале определим функцию
run_process
в которой будет запущен основной цикл работы нашего скрипта. Затем проверяем будет ли ChromeDriver работать в headless режиме, после чего определим и инициализируем несколько переменных, а также инициализируем сам драйвер с помощью методаget_driver()
, импортированного из файлаscrapers/scraper.py
. В циклеwhile
перебираем страницы сайта с которых будем собирать нужные нам данные. Для этого вызывается функцияrun_process()
, которая непосредственно управляет соединением WebDriver и вызовами, соответствующих функций для сбора информации со страниц. Для этого вrun_process()
передается экземпляр WebDriver, а также номер страницы, которые передаются в функциюconnect_to_base()
, которая импортируется из файлаscraper.py
, код которого приводится ниже.import csv from pathlib import Path import requests from bs4 import BeautifulSoup from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait BASE_DIR = Path(__file__).resolve(strict=True).parent.parent def get_driver(headless): options = webdriver.ChromeOptions() if headless: options.add_argument("--headless") # инициализируем драйвер с нужными опциями driver = webdriver.Chrome(chrome_options=options) return driver def connect_to_base(browser, page_number): base_url = f"https://news.ycombinator.com/news?p={page_number}" connection_attempts = 0 while connection_attemptsФункция
connect_to_base()
пытается подключиться к сайту Hacker News, а затем использует функционал явного ожидания explicit wait Selenium, чтобы убедиться, что элемент сid = 'hnmain'
загружен на страницу, прежде чем продолжить и получить из него нужные нам данные.Просмотрите документацию Selenium для получения дополнительной информации о порядке использования explicit wait.
if connect_to_base(browser, page_number): ######## # здесь может быть код имитирующие поведение пользователя на странице # ######## sleep(2) html = browser.page_source output_list = parse_html(html) write_to_file(output_list, filename) else: print("Error connecting to hacker news")Чтобы подражать человеку-пользователю, вызывается метод
sleep(2)
который вносит задержку на 2 секунды после того, как драйвер подключился к Hacker News. После загрузки страницы и выполненияsleep(2)
, драйвер захватывает HTML код страницы, который затем передается в функциюparse_html()
.def parse_html(html): # создадим новый объект soup soup = BeautifulSoup(html, "html.parser") output_list = [] # ищем в объекте soup object id, rank, score и title статьи tr_blocks = soup.find_all("tr", class_="athing") article = 0 for tr in tr_blocks: article_id = tr.get("id") article_url = tr.find_all("a")[1]["href"] # определяем, что статья новая if "item?id=" in article_url: article_url = f"https://news.ycombinator.com/{article_url}" load_time = get_load_time(article_url) try: score = soup.find(id=f"score_{article_id}").string except Exception as e: print(e) score = "0 points" article_info = { "id": article_id, "load_time": load_time, "rank": tr.span.string, "score": score, "title": tr.find(class_="storylink").string, "url": article_url, } # добавляем информацию о статье в список output_list.append(article_info) article += 1 return output_listВ свою очередь функция
parse_html()
использует возможности библиотеки Beautiful Soup для синтаксического разбора HTML кода страницы для сбора нужной информации со страниц, которая будет помещена в списокdicts
. Эта функция также передает URL адрес статьи в функциюget_load_time()
.def get_load_time(article_url): try: # устанавливаем значения заголовков запроса headers = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36" } # делаем запрос по url статьи article_url response = requests.get( article_url, headers=headers, stream=True, timeout=3.000 ) # получаем время загрузки страницы load_time = response.elapsed.total_seconds() except Exception as e: print(e) load_time = "Loading Error" return load_timeЭта функция загружает страницу статьи по переданному в нее URL адресу
article_url
, а также запоминает время ее загрузки.Полученная информация о статье добавляется в CSV файл после передачи списка с полученными данными о статьях в функцию
write_to_file
.def write_to_file(output_list, filename): for row in output_list: with open(Path(BASE_DIR).joinpath(filename), "a") as csvfile: fieldnames = ["id", "load_time", "rank", "score", "title", "url"] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writerow(row)После запуска скрипта нам потребовалось ожидать около 355 секунд (почти 6 минут) до окончания его работы:
(env)$ python script.py Scraping page #1... Scraping page #2... Scraping page #3... Scraping page #4... Scraping page #5... Scraping page #6... Scraping page #7... Scraping page #8... Scraping page #9... Scraping page #10... Scraping page #11... Scraping page #12... Scraping page #13... Scraping page #14... Scraping page #15... Scraping page #16... Scraping page #17... Scraping page #18... Scraping page #19... Scraping page #20... Elapsed run time: 385.49500608444214 secondsИмейте в виду, что контент может быть не на всех 20 страницах, поэтому прошедшее время может отличаться в вашем случае. Когда этот скрипт запускался, контент присутствовал на 18 страницах (около 530 записей).
Это достаточно большое время. Ну что ж, сначала добавим к нашему коду возможность базового тестирования.
Тестируем наш код
С целью проверить функциональность нашего парсера без запуска браузера и, таким образом, не повторять GET запросы к сайту Hacker News, вы можете загрузить HTML код страницы и сохранить его в папку
test/test.html
, а затем парсить его локальную копию. Это поможет избежать блокировки вашего IP-адреса из-за слишком быстрого выполнения большого количества запросов, в ходе отладки и тестирования функций парсинга данных, этот подход также сэкономит ваше время, поскольку вам не нужно запускать браузер при каждом запуске скрипта.Ниже представлен код файла тестов, который находится в папке test/test_scraper.py:
from pathlib import Path import pytest from scrapers import scraper BASE_DIR = Path(__file__).resolve(strict=True).parent @pytest.fixture(scope="module") def html_output(): with open(Path(BASE_DIR).joinpath("test.html"), encoding="utf-8") as f: html = f.read() yield scraper.parse_html(html) def test_output_is_not_none(html_output): assert html_output def test_output_is_a_list(html_output): assert isinstance(html_output, list) def test_output_is_a_list_of_dicts(html_output): assert all(isinstance(elem, dict) for elem in html_output)Убедимся, что все работает как надо:
(env)$ python -m pytest test/test_scraper.py ================================ test session starts ================================= platform darwin -- Python 3.8.5, pytest-6.0.1, py-1.9.0, pluggy-0.13.1 rootdir: /Users/michael/repos/testdriven/async-web-scraping collected 3 items test/test_scraper.py ... [100%] ================================= 3 passed in 20.10s =================================Код выполнялся всего 20 секунд. Попробуем имитировать работу функции
get_load_time()
, исключив отправку GET запроса.test / test_scraper_mock.py:
from pathlib import Path import pytest from scrapers import scraper BASE_DIR = Path(__file__).resolve(strict=True).parent @pytest.fixture(scope="function") def html_output(monkeypatch): def mock_get_load_time(url): return "mocked!" monkeypatch.setattr(scraper, "get_load_time", mock_get_load_time) with open(Path(BASE_DIR).joinpath("test.html"), encoding="utf-8") as f: html = f.read() yield scraper.parse_html(html) def test_output_is_not_none(html_output): assert html_output def test_output_is_a_list(html_output): assert isinstance(html_output, list) def test_output_is_a_list_of_dicts(html_output): assert all(isinstance(elem, dict) for elem in html_output)Снова тестируем:
(env)$ python -m pytest test/test_scraper_mock.py ================================ test session starts ================================= platform darwin -- Python 3.8.5, pytest-6.0.1, py-1.9.0, pluggy-0.13.1 rootdir: /Users/michael/repos/testdriven/async-web-scraping collected 3 items test/test_scraper.py ... [100%] ================================= 3 passed in 0.37s =================================Настраиваем многопоточность
А теперь самое интересное! Внеся всего лишь несколько изменений в код нашего сценария, мы можем ускорить процесс его выполнения:
import datetime import sys from concurrent.futures import ThreadPoolExecutor, wait from time import sleep, time from scrapers.scraper import connect_to_base, get_driver, parse_html, write_to_file def run_process(page_number, filename, headless): # инициализируем веб драйвер browser = get_driver(headless) if connect_to_base(browser, page_number): sleep(2) html = browser.page_source output_list = parse_html(html) write_to_file(output_list, filename) # закрываем соединие browser.quit() else: print("Error connecting to hacker news") browser.quit() if __name__ == "__main__": # устанавливаем headless mode? headless = False if len(sys.argv) > 1: if sys.argv[1] == "headless": print("Running in headless mode") headless = True # инициализируем переменные start_time = time() output_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") output_filename = f"output_{output_timestamp}.csv" futures = [] # перебираем страницы и получаем информацию о них with ThreadPoolExecutor() as executor: for number in range(1, 21): futures.append( executor.submit(run_process, number, output_filename, headless) ) wait(futures) end_time = time() elapsed_time = end_time - start_time print(f"Elapsed run time: {elapsed_time} seconds")В модуле
concurrent.futures
классThreadPoolExecutor
используется для создания пула потоков и асинхронного выполнения вызовов функцийrun_process()
. Метод submit принимает функцию вместе с аргументами, передаваемыми при ее вызове, и возвращает результат выполнения функцииrun_process
. Метод wait используется для блокировки запущенного асинхронно кода до завершения всех выполняемых им задач.Стоит отметить, что вы можете легко использовать в вашем коде многопроцессорность, используя класс
ProcessPoolExecutor
, поскольку иProcessPoolExecutor
, иThreadPoolExecutor
реализуют для один и тот же интерфейс:# перебираем страницы и получаем информацию о них with ProcessPoolExecutor() as executor: for number in range(1, 21): futures.append( executor.submit(run_process, number, output_filename, headless) )Почему в нашем примере мы используем многопоточность вместо многопроцессорности?
Скарпинг веб-страниц в большей степени связан с выполнением операций ввода-вывода I/O, поскольку получение по сети HTML кода (I/O) происходит медленнее, чем непосредственно его парсинг (ЦП). Чтобы узнать об этом больше, а также о разнице между параллелизмом parallelism (многопроцессорностью) и параллелизмом concurrency (многопоточностью), ознакомьтесь со статьей Speeding Up Python with Concurrency, Parallelism, and asyncio.
Запустим наш усовершенствованный парсер:
(env)$ python script_concurrent.py Elapsed run time: 38.73605298995972 secondsС кодом нашего парсера, работающего в многопоточном режиме, можно ознакомиться по ссылке.
Чтобы еще более ускорить процесс, мы можем запустить Chrome в headless режиме, передав в качестве аргумента в командной строке значение
headless
:(env)$ python script_concurrent.py headless Running in headless mode Elapsed run time: 35.12011382590508 secondsЗаключение
Немного модифицировав исходный код нашего веб-скарпера мы смогли распараллелить его работу, что позволило сократить время выполнения сценария от 385 секунд до чуть более 35 секунд. В нашем случае этот подход позволил увеличить быстродействие скрипта на 90%, что является достаточно эффективным решением.
Надеюсь, материалы этой статьи помогут вам в вашей работе.