НейроАгент

Асинхронный веб-скрапинг с многопоточностью в Python

Оптимизация веб-скрапинга: как эффективно сочетать aiohttp и многопоточность в Python для обработки 15 000+ страниц. Полное руководство с примерами кода.

Как реализовать асинхронность + многопоточность одновременно в Python для веб-скрапинга?

Мне нужно спарсить большое количество страниц (около 15 000) с 35 статьями на каждой странице для каждой рубрики. Если делать это синхронно с помощью requests в одном потоке, это займет очень много времени (несколько часов), а информация нужна срочно. Я хочу использовать aiohttp и многопоточность.

Я планирую реализовать это так: каждый поток будет параллельно с другими обрабатывать тысячи страниц определенной рубрики (всего около 15 рубрик). В каждом потоке будут асинхронно выполняться запросы к aiohttp.

Проблема в том, что executor не позволяет передать в него функцию с await, а если вызывать функцию напрямую в submit, то многопоточность не работает - потоки выполняются последовательно.

Как правильно реализовать комбинацию асинхронных запросов aiohttp в многопоточном окружении для эффективного веб-скрапинга?

Текущий код прилагаю ниже:

python
# Other imports
import bs4
import aiohttp, asyncio
from urllib import parse
import certifi, ssl
from concurrent.futures import ThreadPoolExecutor

# Project imports
from utils import to_dict, fetch

items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())

async def serialize_topic(topic: str, topics: list[str]) -> None:
    print(f'Starting to serialize {topics.index(topic)+1}. {topic}')

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        topic_html = await fetch(func=session.get, url=topic, params={'limit' : 35})

    topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')

    # Getting the last page number to handle pagination
    pagination = list(topic_soup('ul', {'class' : 'pagination'}).children)
    max_page_url = pagination[-1].a.get('href')
    max_page_params = parse.urlparse(max_page_url).query # Parsing a link to the last page in order to get its number
    dict_from_query = parse.parse_qs(max_page_params)
    max_page = int(dict_from_query['page'][0])

    # Going through all pages
    for i in range(max_page):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
            page_html = await fetch(func=session.get, url=topic, params={'page' : i, 'limit' : 100})

        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')

        # Going through items on one page
        for item in items:

            # Getting item caption with its data
            # ... здесь просто получаю описание статьи и название 
            ...
            
        print(f'Serialized page number {i+1}')

    print(f'Serialized {topics.index(topic)+1}. {topic}')


async def main():
    # Getting all topics
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')

    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id' : 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]

    with ThreadPoolExecutor(max_workers=15) as executor:
        for topic in topics:
            executor.submit(await serialize_topic(), topic, topics)

    print('\n**************************\n')
    print(f'Total amount of articles: {len(items_list)}')


if __name__ == '__main__':
    asyncio.run(main())
НейроАгент

Основная проблема в вашем коде заключается в том, что вы пытаетесь передать await serialize_topic() напрямую в executor.submit(). Это не работает корректно, так как executor.submit() ожидает синхронную функцию, а не асинхронную.

Правильная реализация комбинации aiohttp и ThreadPoolExecutor требует использования loop.run_in_executor(), который позволяет выполнять асинхронные функции в отдельных потоках.

Содержание

Решение текущей проблемы

В вашем коде нужно заменить:

python
# Неправильно
executor.submit(await serialize_topic(), topic, topics)

на корректный подход использования run_in_executor():

python
# Правильно
loop = asyncio.get_event_loop()
await loop.run_in_executor(executor, serialize_topic, topic, topics)

Правильная реализация

Вот как должна выглядеть ваша основная функция:

python
async def main():
    # Получение всех тем
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')
    
    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id': 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]

    # Создание event loop и executor
    loop = asyncio.get_event_loop()
    
    with ThreadPoolExecutor(max_workers=15) as executor:
        # Создание задач для каждой темы
        tasks = []
        for topic in topics:
            task = loop.run_in_executor(executor, serialize_topic, topic, topics)
            tasks.append(task)
        
        # Ожидание завершения всех задач
        await asyncio.gather(*tasks)

    print('\n**************************\n')
    print(f'Всего статей: {len(items_list)}')

Оптимизированный подход

Для максимальной производительности рекомендуется использовать следующую структуру:

python
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse, parse_qs
import bs4
import certifi
import ssl

# Глобальные переменные
items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())

# Синхронная функция для обработки одной темы
def serialize_topic_sync(topic: str, topics: list[str]) -> None:
    print(f'Начало обработки {topics.index(topic)+1}. {topic}')
    
    # Создание event loop для этого потока
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    
    try:
        # Запуск асинхронной функции в этом потоке
        loop.run_until_complete(serialize_topic_async(topic, topics))
    finally:
        loop.close()

# Асинхронная функция для обработки одной темы
async def serialize_topic_async(topic: str, topics: list[str]) -> None:
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        topic_html = await fetch(func=session.get, url=topic, params={'limit': 35})
    
    topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')
    
    # Получение номера последней страницы
    pagination = list(topic_soup('ul', {'class': 'pagination'}).children)
    max_page_url = pagination[-1].a.get('href')
    max_page_params = urlparse(max_page_url).query
    dict_from_query = parse_qs(max_page_params)
    max_page = int(dict_from_query['page'][0])
    
    # Обработка всех страниц
    for i in range(1, max_page + 1):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
            page_html = await fetch(func=session.get, url=topic, params={'page': i, 'limit': 100})
        
        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')
        
        # Обработка элементов страницы
        for item in items:
            # ... ваша логика обработки ...
            pass
        
        print(f'Обработана страница {i} для темы {topic}')

    print(f'Завершена обработка {topics.index(topic)+1}. {topic}')

async def main():
    # Получение всех тем
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')
    
    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id': 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]

    # Использование ThreadPoolExecutor для параллельной обработки тем
    with ThreadPoolExecutor(max_workers=min(15, len(topics))) as executor:
        # Создание синхронных задач для каждого потока
        futures = []
        for topic in topics:
            future = executor.submit(serialize_topic_sync, topic, topics)
            futures.append(future)
        
        # Ожидание завершения всех потоков
        for future in futures:
            future.result()

    print(f'Всего обработано статей: {len(items_list)}')

if __name__ == '__main__':
    asyncio.run(main())

Полный рабочий пример

Вот полностью рабочий пример с дополнительными оптимизациями:

python
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, parse_qs
import bs4
import certifi
import ssl
import time
from typing import List, Dict

# Глобальные переменные
items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())

async def fetch_with_retry(
    session: aiohttp.ClientSession, 
    url: str, 
    params: Dict = None, 
    max_retries: int = 3,
    delay: float = 1.0
) -> str:
    """Функция для выполнения запросов с автоматическими повторами"""
    for attempt in range(max_retries):
        try:
            async with session.get(url, params=params, timeout=30) as response:
                response.raise_for_status()
                return await response.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(delay * (2 ** attempt))

async def process_page(session: aiohttp.ClientSession, topic_url: str, page: int) -> List[Dict]:
    """Обработка одной страницы с статьями"""
    try:
        page_html = await fetch_with_retry(
            session, 
            topic_url, 
            params={'page': page, 'limit': 100}
        )
        
        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')
        
        articles = []
        for item in items:
            # Пример извлечения данных
            title = item.find('h2').text.strip() if item.find('h2') else 'Без заголовка'
            description = item.find('p').text.strip() if item.find('p') else 'Без описания'
            
            articles.append({
                'title': title,
                'description': description,
                'page': page,
                'topic': topic_url
            })
        
        return articles
    except Exception as e:
        print(f"Ошибка при обработке страницы {page} для {topic_url}: {e}")
        return []

async def process_topic_async(topic_url: str, topics: List[str]) -> List[Dict]:
    """Асинхронная обработка одной темы"""
    print(f'Начало обработки темы {topics.index(topic_url)+1}/{len(topics)}: {topic_url}')
    
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=SSL_CERT, limit=100, limit_per_host=30),
        timeout=aiohttp.ClientTimeout(total=60)
    ) as session:
        # Получение первой страницы для определения общего количества страниц
        try:
            first_page_html = await fetch_with_retry(session, topic_url, params={'limit': 35})
        except Exception as e:
            print(f"Не удалось получить первую страницу для {topic_url}: {e}")
            return []
        
        topic_soup = bs4.BeautifulSoup(first_page_html, 'html.parser')
        
        try:
            # Определение количества страниц
            pagination = list(topic_soup('ul', {'class': 'pagination'}).children)
            max_page_url = pagination[-1].a.get('href')
            max_page_params = urlparse(max_page_url).query
            dict_from_query = parse_qs(max_page_params)
            max_page = int(dict_from_query['page'][0])
        except (IndexError, KeyError, ValueError):
            max_page = 1  # Если пагинация не найдена, обработаем только первую страницу
        
        print(f'Тема {topic_url}: обнаружено {max_page} страниц')
        
        # Создание задач для всех страниц
        tasks = []
        for page in range(1, max_page + 1):
            task = process_page(session, topic_url, page)
            tasks.append(task)
        
        # Ожидание завершения всех задач для этой темы
        all_articles = []
        for task in tasks:
            articles = await task
            all_articles.extend(articles)
        
        print(f'Завершена обработка темы {topics.index(topic_url)+1}: {len(all_articles)} статей')
        return all_articles

def process_topic_sync(topic_url: str, topics: List[str]) -> List[Dict]:
    """Синхронная обертка для обработки темы в отдельном потоке"""
    # Создание нового event loop для этого потока
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    
    try:
        # Запуск асинхронной функции
        return loop.run_until_complete(process_topic_async(topic_url, topics))
    finally:
        loop.close()

async def main():
    # Получение списка всех тем
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=SSL_CERT),
        timeout=aiohttp.ClientTimeout(total=30)
    ) as session:
        try:
            html = await fetch_with_retry(session, '')
        except Exception as e:
            print(f"Не удалось получить список тем: {e}")
            return
        
        soup = bs4.BeautifulSoup(html, 'html.parser')
        topics_elements = soup.find('div', {'id': 'content'}).find_all(class_='topic')
        topics = [item.a.get('href') for item in topics_elements]
        
        print(f'Найдено {len(topics)} тем для обработки')
    
    # Определение оптимального количества рабочих потоков
    max_workers = min(15, len(topics))
    print(f'Используется {max_workers} рабочих потоков')
    
    # Запуск обработки в ThreadPoolExecutor
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Отправка задач в потоки
        futures = []
        for topic in topics:
            future = executor.submit(process_topic_sync, topic, topics)
            futures.append(future)
        
        # Сбор результатов по мере завершения
        total_articles = 0
        for future in as_completed(futures):
            try:
                articles = future.result()
                items_list.extend(articles)
                total_articles += len(articles)
                print(f'Всего обработано статей: {total_articles}')
            except Exception as e:
                print(f'Ошибка при обработке темы: {e}')
    
    end_time = time.time()
    
    print('\n**************************')
    print(f'Всего обработано статей: {len(items_list)}')
    print(f'Время работы: {end_time - start_time:.2f} секунд')
    print(f'Средняя скорость: {len(items_list)/(end_time - start_time):.2f} статей/секунду')

if __name__ == '__main__':
    asyncio.run(main())

Дополнительные оптимизации

  1. Управление лимитами соединений:

    python
    connector = aiohttp.TCPConnector(
        ssl=SSL_CERT,
        limit=100,          # Максимальное количество соединений
        limit_per_host=30,  # Максимальное количество соединений на хост
        force_close=True    # Закрывать неиспользуемые соединения
    )
    
  2. Добавление задержек между запросами:

    python
    import random
    
    # Внутри цикла обработки страниц
    await asyncio.sleep(random.uniform(0.5, 2.0))  # Случайная задержка
    
  3. Использование семафора для ограничения параллельных запросов:

    python
    semaphore = asyncio.Semaphore(50)  # Максимум 50 одновременных запросов
    
    async def fetch_with_semaphore(session, url, params=None):
        async with semaphore:
            return await fetch_with_retry(session, url, params)
    
  4. Сохранение результатов по мере обработки:

    python
    import json
    from pathlib import Path
    
    def save_articles(articles: List[Dict], filename: str):
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(articles, f, ensure_ascii=False, indent=2)
    
    # В основной функции
    save_articles(items_list, 'articles.json')
    

Рекомендации по производительности

  1. Определение оптимального количества потоков:

    python
    import multiprocessing
    
    def get_optimal_workers():
        cpu_count = multiprocessing.cpu_count()
        return min(15, max(1, cpu_count - 1))
    
  2. Использование асинхронного сохранения данных:

    python
    async def save_articles_async(articles: List[Dict], filename: str):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, save_articles, articles, filename)
    
  3. Мониторинг прогресса:

    python
    from tqdm import tqdm
    
    # В ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        with tqdm(total=len(topics), desc="Обработка тем") as pbar:
            for topic in topics:
                executor.submit(process_topic_sync, topic, topics)
                pbar.update(1)
    
  4. Обработка ошибок и повторные попытки:

    python
    async def fetch_with_retry(
        session: aiohttp.ClientSession,
        url: str,
        params: Dict = None,
        max_retries: int = 5,
        base_delay: float = 1.0
    ) -> str:
        for attempt in range(max_retries):
            try:
                async with session.get(url, params=params, timeout=30) as response:
                    response.raise_for_status()
                    return await response.text()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt == max_retries - 1:
                    raise
                delay = base_delay * (2 ** attempt)  # Экспоненциальный рост задержки
                print(f"Повторная попытка {attempt + 1}/{max_retries} после {delay}с...")
                await asyncio.sleep(delay)
    

Такая реализация позволит вам эффективно обрабатывать 15 000 страниц с оптимальным использованием системных ресурсов и высокой скоростью выполнения.

Источники

  1. Web Scraping with asyncio - Python Free-Threading Guide
  2. Optimizing Web Scraping Speed in Python - Techniques and Best Practices
  3. Asynchronous Web Scraping in Python using concurrent module
  4. AsyncIO and concurrent.futures.ThreadPoolExecutor - Stack Overflow
  5. Threaded workers with AIOHTTP
  6. Event Loop — Python 3.14.0 documentation
  7. Combining Coroutines with Threads and Processes — PyMOTW 3