Как реализовать асинхронность + многопоточность одновременно в Python для веб-скрапинга?
Мне нужно спарсить большое количество страниц (около 15 000) с 35 статьями на каждой странице для каждой рубрики. Если делать это синхронно с помощью requests в одном потоке, это займет очень много времени (несколько часов), а информация нужна срочно. Я хочу использовать aiohttp и многопоточность.
Я планирую реализовать это так: каждый поток будет параллельно с другими обрабатывать тысячи страниц определенной рубрики (всего около 15 рубрик). В каждом потоке будут асинхронно выполняться запросы к aiohttp.
Проблема в том, что executor не позволяет передать в него функцию с await, а если вызывать функцию напрямую в submit, то многопоточность не работает - потоки выполняются последовательно.
Как правильно реализовать комбинацию асинхронных запросов aiohttp в многопоточном окружении для эффективного веб-скрапинга?
Текущий код прилагаю ниже:
# Other imports
import bs4
import aiohttp, asyncio
from urllib import parse
import certifi, ssl
from concurrent.futures import ThreadPoolExecutor
# Project imports
from utils import to_dict, fetch
items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())
async def serialize_topic(topic: str, topics: list[str]) -> None:
print(f'Starting to serialize {topics.index(topic)+1}. {topic}')
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
topic_html = await fetch(func=session.get, url=topic, params={'limit' : 35})
topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')
# Getting the last page number to handle pagination
pagination = list(topic_soup('ul', {'class' : 'pagination'}).children)
max_page_url = pagination[-1].a.get('href')
max_page_params = parse.urlparse(max_page_url).query # Parsing a link to the last page in order to get its number
dict_from_query = parse.parse_qs(max_page_params)
max_page = int(dict_from_query['page'][0])
# Going through all pages
for i in range(max_page):
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
page_html = await fetch(func=session.get, url=topic, params={'page' : i, 'limit' : 100})
page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
items = page_soup.find_all(class_='article')
# Going through items on one page
for item in items:
# Getting item caption with its data
# ... здесь просто получаю описание статьи и название
...
print(f'Serialized page number {i+1}')
print(f'Serialized {topics.index(topic)+1}. {topic}')
async def main():
# Getting all topics
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
html = await fetch(func=session.get, url='')
soup = bs4.BeautifulSoup(html, 'html.parser')
topics = soup.find('div', {'id' : 'content'}).find_all(class_='topic')
topics = [item.a.get('href') for item in topics]
with ThreadPoolExecutor(max_workers=15) as executor:
for topic in topics:
executor.submit(await serialize_topic(), topic, topics)
print('\n**************************\n')
print(f'Total amount of articles: {len(items_list)}')
if __name__ == '__main__':
asyncio.run(main())
Основная проблема в вашем коде заключается в том, что вы пытаетесь передать await serialize_topic() напрямую в executor.submit(). Это не работает корректно, так как executor.submit() ожидает синхронную функцию, а не асинхронную.
Правильная реализация комбинации aiohttp и ThreadPoolExecutor требует использования loop.run_in_executor(), который позволяет выполнять асинхронные функции в отдельных потоках.
Содержание
- Решение текущей проблемы
- Правильная реализация
- Оптимизированный подход
- Полный рабочий пример
- Дополнительные оптимизации
- Рекомендации по производительности
Решение текущей проблемы
В вашем коде нужно заменить:
# Неправильно
executor.submit(await serialize_topic(), topic, topics)
на корректный подход использования run_in_executor():
# Правильно
loop = asyncio.get_event_loop()
await loop.run_in_executor(executor, serialize_topic, topic, topics)
Правильная реализация
Вот как должна выглядеть ваша основная функция:
async def main():
# Получение всех тем
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
html = await fetch(func=session.get, url='')
soup = bs4.BeautifulSoup(html, 'html.parser')
topics = soup.find('div', {'id': 'content'}).find_all(class_='topic')
topics = [item.a.get('href') for item in topics]
# Создание event loop и executor
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=15) as executor:
# Создание задач для каждой темы
tasks = []
for topic in topics:
task = loop.run_in_executor(executor, serialize_topic, topic, topics)
tasks.append(task)
# Ожидание завершения всех задач
await asyncio.gather(*tasks)
print('\n**************************\n')
print(f'Всего статей: {len(items_list)}')
Оптимизированный подход
Для максимальной производительности рекомендуется использовать следующую структуру:
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse, parse_qs
import bs4
import certifi
import ssl
# Глобальные переменные
items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())
# Синхронная функция для обработки одной темы
def serialize_topic_sync(topic: str, topics: list[str]) -> None:
print(f'Начало обработки {topics.index(topic)+1}. {topic}')
# Создание event loop для этого потока
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Запуск асинхронной функции в этом потоке
loop.run_until_complete(serialize_topic_async(topic, topics))
finally:
loop.close()
# Асинхронная функция для обработки одной темы
async def serialize_topic_async(topic: str, topics: list[str]) -> None:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
topic_html = await fetch(func=session.get, url=topic, params={'limit': 35})
topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')
# Получение номера последней страницы
pagination = list(topic_soup('ul', {'class': 'pagination'}).children)
max_page_url = pagination[-1].a.get('href')
max_page_params = urlparse(max_page_url).query
dict_from_query = parse_qs(max_page_params)
max_page = int(dict_from_query['page'][0])
# Обработка всех страниц
for i in range(1, max_page + 1):
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
page_html = await fetch(func=session.get, url=topic, params={'page': i, 'limit': 100})
page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
items = page_soup.find_all(class_='article')
# Обработка элементов страницы
for item in items:
# ... ваша логика обработки ...
pass
print(f'Обработана страница {i} для темы {topic}')
print(f'Завершена обработка {topics.index(topic)+1}. {topic}')
async def main():
# Получение всех тем
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
html = await fetch(func=session.get, url='')
soup = bs4.BeautifulSoup(html, 'html.parser')
topics = soup.find('div', {'id': 'content'}).find_all(class_='topic')
topics = [item.a.get('href') for item in topics]
# Использование ThreadPoolExecutor для параллельной обработки тем
with ThreadPoolExecutor(max_workers=min(15, len(topics))) as executor:
# Создание синхронных задач для каждого потока
futures = []
for topic in topics:
future = executor.submit(serialize_topic_sync, topic, topics)
futures.append(future)
# Ожидание завершения всех потоков
for future in futures:
future.result()
print(f'Всего обработано статей: {len(items_list)}')
if __name__ == '__main__':
asyncio.run(main())
Полный рабочий пример
Вот полностью рабочий пример с дополнительными оптимизациями:
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, parse_qs
import bs4
import certifi
import ssl
import time
from typing import List, Dict
# Глобальные переменные
items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
params: Dict = None,
max_retries: int = 3,
delay: float = 1.0
) -> str:
"""Функция для выполнения запросов с автоматическими повторами"""
for attempt in range(max_retries):
try:
async with session.get(url, params=params, timeout=30) as response:
response.raise_for_status()
return await response.text()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(delay * (2 ** attempt))
async def process_page(session: aiohttp.ClientSession, topic_url: str, page: int) -> List[Dict]:
"""Обработка одной страницы с статьями"""
try:
page_html = await fetch_with_retry(
session,
topic_url,
params={'page': page, 'limit': 100}
)
page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
items = page_soup.find_all(class_='article')
articles = []
for item in items:
# Пример извлечения данных
title = item.find('h2').text.strip() if item.find('h2') else 'Без заголовка'
description = item.find('p').text.strip() if item.find('p') else 'Без описания'
articles.append({
'title': title,
'description': description,
'page': page,
'topic': topic_url
})
return articles
except Exception as e:
print(f"Ошибка при обработке страницы {page} для {topic_url}: {e}")
return []
async def process_topic_async(topic_url: str, topics: List[str]) -> List[Dict]:
"""Асинхронная обработка одной темы"""
print(f'Начало обработки темы {topics.index(topic_url)+1}/{len(topics)}: {topic_url}')
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=SSL_CERT, limit=100, limit_per_host=30),
timeout=aiohttp.ClientTimeout(total=60)
) as session:
# Получение первой страницы для определения общего количества страниц
try:
first_page_html = await fetch_with_retry(session, topic_url, params={'limit': 35})
except Exception as e:
print(f"Не удалось получить первую страницу для {topic_url}: {e}")
return []
topic_soup = bs4.BeautifulSoup(first_page_html, 'html.parser')
try:
# Определение количества страниц
pagination = list(topic_soup('ul', {'class': 'pagination'}).children)
max_page_url = pagination[-1].a.get('href')
max_page_params = urlparse(max_page_url).query
dict_from_query = parse_qs(max_page_params)
max_page = int(dict_from_query['page'][0])
except (IndexError, KeyError, ValueError):
max_page = 1 # Если пагинация не найдена, обработаем только первую страницу
print(f'Тема {topic_url}: обнаружено {max_page} страниц')
# Создание задач для всех страниц
tasks = []
for page in range(1, max_page + 1):
task = process_page(session, topic_url, page)
tasks.append(task)
# Ожидание завершения всех задач для этой темы
all_articles = []
for task in tasks:
articles = await task
all_articles.extend(articles)
print(f'Завершена обработка темы {topics.index(topic_url)+1}: {len(all_articles)} статей')
return all_articles
def process_topic_sync(topic_url: str, topics: List[str]) -> List[Dict]:
"""Синхронная обертка для обработки темы в отдельном потоке"""
# Создание нового event loop для этого потока
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Запуск асинхронной функции
return loop.run_until_complete(process_topic_async(topic_url, topics))
finally:
loop.close()
async def main():
# Получение списка всех тем
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=SSL_CERT),
timeout=aiohttp.ClientTimeout(total=30)
) as session:
try:
html = await fetch_with_retry(session, '')
except Exception as e:
print(f"Не удалось получить список тем: {e}")
return
soup = bs4.BeautifulSoup(html, 'html.parser')
topics_elements = soup.find('div', {'id': 'content'}).find_all(class_='topic')
topics = [item.a.get('href') for item in topics_elements]
print(f'Найдено {len(topics)} тем для обработки')
# Определение оптимального количества рабочих потоков
max_workers = min(15, len(topics))
print(f'Используется {max_workers} рабочих потоков')
# Запуск обработки в ThreadPoolExecutor
start_time = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Отправка задач в потоки
futures = []
for topic in topics:
future = executor.submit(process_topic_sync, topic, topics)
futures.append(future)
# Сбор результатов по мере завершения
total_articles = 0
for future in as_completed(futures):
try:
articles = future.result()
items_list.extend(articles)
total_articles += len(articles)
print(f'Всего обработано статей: {total_articles}')
except Exception as e:
print(f'Ошибка при обработке темы: {e}')
end_time = time.time()
print('\n**************************')
print(f'Всего обработано статей: {len(items_list)}')
print(f'Время работы: {end_time - start_time:.2f} секунд')
print(f'Средняя скорость: {len(items_list)/(end_time - start_time):.2f} статей/секунду')
if __name__ == '__main__':
asyncio.run(main())
Дополнительные оптимизации
-
Управление лимитами соединений:
pythonconnector = aiohttp.TCPConnector( ssl=SSL_CERT, limit=100, # Максимальное количество соединений limit_per_host=30, # Максимальное количество соединений на хост force_close=True # Закрывать неиспользуемые соединения ) -
Добавление задержек между запросами:
pythonimport random # Внутри цикла обработки страниц await asyncio.sleep(random.uniform(0.5, 2.0)) # Случайная задержка -
Использование семафора для ограничения параллельных запросов:
pythonsemaphore = asyncio.Semaphore(50) # Максимум 50 одновременных запросов async def fetch_with_semaphore(session, url, params=None): async with semaphore: return await fetch_with_retry(session, url, params) -
Сохранение результатов по мере обработки:
pythonimport json from pathlib import Path def save_articles(articles: List[Dict], filename: str): with open(filename, 'w', encoding='utf-8') as f: json.dump(articles, f, ensure_ascii=False, indent=2) # В основной функции save_articles(items_list, 'articles.json')
Рекомендации по производительности
-
Определение оптимального количества потоков:
pythonimport multiprocessing def get_optimal_workers(): cpu_count = multiprocessing.cpu_count() return min(15, max(1, cpu_count - 1)) -
Использование асинхронного сохранения данных:
pythonasync def save_articles_async(articles: List[Dict], filename: str): loop = asyncio.get_event_loop() await loop.run_in_executor(None, save_articles, articles, filename) -
Мониторинг прогресса:
pythonfrom tqdm import tqdm # В ThreadPoolExecutor with ThreadPoolExecutor(max_workers=max_workers) as executor: with tqdm(total=len(topics), desc="Обработка тем") as pbar: for topic in topics: executor.submit(process_topic_sync, topic, topics) pbar.update(1) -
Обработка ошибок и повторные попытки:
pythonasync def fetch_with_retry( session: aiohttp.ClientSession, url: str, params: Dict = None, max_retries: int = 5, base_delay: float = 1.0 ) -> str: for attempt in range(max_retries): try: async with session.get(url, params=params, timeout=30) as response: response.raise_for_status() return await response.text() except (aiohttp.ClientError, asyncio.TimeoutError) as e: if attempt == max_retries - 1: raise delay = base_delay * (2 ** attempt) # Экспоненциальный рост задержки print(f"Повторная попытка {attempt + 1}/{max_retries} после {delay}с...") await asyncio.sleep(delay)
Такая реализация позволит вам эффективно обрабатывать 15 000 страниц с оптимальным использованием системных ресурсов и высокой скоростью выполнения.
Источники
- Web Scraping with asyncio - Python Free-Threading Guide
- Optimizing Web Scraping Speed in Python - Techniques and Best Practices
- Asynchronous Web Scraping in Python using concurrent module
- AsyncIO and concurrent.futures.ThreadPoolExecutor - Stack Overflow
- Threaded workers with AIOHTTP
- Event Loop — Python 3.14.0 documentation
- Combining Coroutines with Threads and Processes — PyMOTW 3