Другое

Полное решение проблем с памятью при многопоточности в PySpark

Узнайте, как исправлять ошибки OOM при многопоточности в PySpark, когда запросы выполняются успешно, но таблицы не содержат данных. Полное руководство с решениями, лучшими практиками и примерами кода.

Проблема с многопоточностью в PySpark: Запросы сообщают об успехе, но конечная таблица не содержит данных несмотря на ошибки OOM

Я столкнулся с проблемой при использовании многопоточности в PySpark, когда некоторые запросы сообщают об успехе, но в конечной таблице отсутствуют данные. В Spark UI видно, что некоторые задачи в стадиях сообщают об ошибках OOM (Out of Memory), однако общая задача считается успешной.

Вот моя реализация:

python
def run_single_rule(self, log):
    try:
        dataset = self.spark.sql(sql_filter)
        result_count = dataset.count()
        print(
            f"""
              Statement for building rule [{log.rule_id}] result:
              {sql_filter}
              dataset after processed contains {result_count} records
              """
        )
        write_to_uc_table(dataset, self.job_output)

    except Exception:
        logger.warning(
            f"""
                       rule_id = {log.rule_id} has failed with log:
                       {full_error_log}
                       """
        )
    return update_values
python
with ThreadPoolExecutor(max_workers=10) as executor:
    future_job_runs = {
        executor.submit(self.run_single_rule, query): query
        for query in not_processed_log
    }
    wait(future_job_runs)
    for future in as_completed(future_job_runs):
        log = future_job_runs[future]
        try:
            log_records.append(future.result())
        except Exception as exc:
            print(f"rule_id: {log.rule_id}  generated an exception: {exc}")
        else:
            print(
                f"Finist log_id: {log.rule_id} finised with result: {future.result()}"
            )
python
def write_to_uc_table(df, job_output: JobOutput):
    writer = df.write
    if job_output.partition_columns:
        writer = writer.partitionBy(job_output.partition_columns)
    if job_output.write_mode == WriteMode.OVERWRITE_PARTITION:
        writer = writer.option("partitionOverwriteMode", "dynamic")
        writer = writer.mode("overwrite")
    else:
        writer = writer.mode(job_output.write_mode)
    if job_output.options:
        for k, v in job_output.options.items():
            writer = writer.option(k, v)
    writer.saveAsTable(
        f"{job_output.target_catalog}.{job_output.target_schema}.{job_output.target_table}"
    )

Например, один запрос показывает:

Statement for building rule [10030006] result:
SELECT ... FROM ...
dataset after processed contains 650048 records

Но конечный результат показывает:

Finist log_id: 10030006 finised with result: {'job_start_time': datetime.datetime(2025, 10, 31, 1, 7, 2, 469565, tzinfo=datetime.timezone.utc), 'log_id': '5763b7d8-b5ee-11f0-a43c-00163eb2a776', 'result_record_count': None, 'state': 'FAILED', 'job_end_time': datetime.datetime(2025, 10, 31, 1, 7, 25, 763043, tzinfo=datetime.timezone.utc)}

Сталкивался ли кто-нибудь с подобными случаями в PySpark при многопоточности, когда запросы кажутся успешными, но конечная таблица не содержит данных из-за ошибок OOM? Что может вызывать такое поведение, и есть ли какие-либо рекомендуемые решения или обходные пути?

Содержание


Понимание основной причины

Основная проблема связана с параллельным потреблением памяти при одновременном выполнении нескольких операций PySpark. Как объясняет Gupta Akashdeep, “Отправка слишком большого количества заданий одновременно может привести к высокому потреблению памяти драйвера”.

В вашей реализации ThreadPoolExecutor(max_workers=10) создает 10 параллельных потоков, каждый из которых потенциально:

  • Загружает данные в память
  • Обрабатывает SQL-запросы
  • Записывает результаты в таблицы
  • Хранит промежуточные наборы данных

Когда емкость памяти драйвера превышена, задачи Spark завершаются с ошибками OOM, но ваша обработка исключений может не перехватывать все сценарии сбоев, что приводит к вводящим в заблуждение сообщениям об “успехе”.

Проблемы управления памятью

Нагрузка на память драйвера

JVM драйвера управляет планированием задач и координирует работу с исполнителями. При 10 параллельных потоках, конкурирующих за память, возникают несколько проблем:

  1. Исчерпание пространства в куче: Как отмечено в анализе ReviewInsights, “Выделение слишком большого количества памяти драйверу снижает ресурсы, доступные другим исполнителям и всей операционной системе, что потенциально может снизить общую производительность.”

  2. Статические границы памяти: Согласно анализу Омкара, “Ключевая проблема статического управления памятью заключалась в жестких границах между выполнением и хранением. Если память для хранения не использовалась полностью, задачи выполнения не могли заимствовать эту память, что приводило к неэффективному использованию памяти.”

  3. Конфигурация памяти исполнителя: Как предупреждает Unravel Data, “Если это значение установлено на более высокое значение без должного учета памяти, исполнители могут завершиться с ошибкой OOM.”

Точки потребления памяти в вашем коде

python
dataset = self.spark.sql(sql_filter)
result_count = dataset.count()
write_to_uc_table(dataset, self.job_output)

Операция count() заставляет данные собираться на драйвере, а write_to_uc_table() может удерживать данные в памяти в процессе записи.

Проблемы параллельной записи

Ваша функция write_to_uc_table с режимом перезаписи разделов создает дополнительные проблемы параллелизма:

python
if job_output.write_mode == WriteMode.OVERWRITE_PARTITION:
    writer = writer.option("partitionOverwriteMode", "dynamic")
    writer = writer.mode("overwrite")

Как предупреждает Gupta Akashdeep, “Будьте осторожны при записи в таблицу одновременно, это может привести к ошибкам ConcurrentModification или неожиданным результатам, особенно когда вы выполняете partitionOverwrite.”

Когда несколько потоков одновременно пытаются записать в одну и ту же таблицу в режиме перезаписи, вы можете получить:

  • Состояние гонки в управлении разделами
  • Частичные записи, которые выглядят завершенными
  • Тихие сбои, которые не распространяются вверх до вашего обработчика исключений

Недостатки обработки исключений

Ваша текущая обработка исключений имеет несколько уязвимостей:

python
except Exception:
    logger.warning(f"""
                       rule_id = {log.rule_id} has failed with log:
                       {full_error_log}
                       """)

Проблема в том, что ошибки OOM в Spark не всегда распространяются как исключения Python. Задачи завершаются сбоем на стороне исполнителя, но драйвер может не обнаружить эти сбои немедленно. Как отмечено в исследовании, “Обработка исключений крайне важна в таких сценариях, то есть обработка исключений при вызове future.result(), так как это упрощает отладку и помогает понять, что именно вызывает проблему.”

Ваш вызов future.result() может возвращать результат без возбуждения исключения, даже когда базовая задача Spark завершилась сбоем.

Решения и лучшие практики

1. Сокращение параллелизма и реализация пакетной обработки

Вместо одновременного выполнения всех запросов реализуйте контролируемый подход:

python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def run_rules_in_batches(self, not_processed_log, batch_size=3):
    """Обработка правил в контролируемых пакетах для управления памятью"""
    for i in range(0, len(not_processed_log), batch_size):
        batch = not_processed_log[i:i + batch_size]
        
        with ThreadPoolExecutor(max_workers=min(batch_size, 5)) as executor:
            future_job_runs = {
                executor.submit(self.run_single_rule, query): query
                for query in batch
            }
            
            for future in as_completed(future_job_runs):
                log = future_job_runs[future]
                try:
                    result = future.result()
                    # Проверяем, что результат действительно содержит данные
                    if result and result.get('result_record_count') == 0:
                        logger.warning(f"Правило {log.rule_id} завершено с 0 записями")
                except Exception as exc:
                    logger.error(f"Правило {log.rule_id} не выполнено: {exc}")
        
        # Добавляем задержку между пакетами для восстановления памяти
        time.sleep(2)

2. Улучшенная конфигурация памяти

Настройте параметры памяти Spark соответствующим образом:

python
# Конфигурируем параметры памяти перед созданием Spark сессии
spark.conf.set("spark.driver.memory", "4g")
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.memoryOverhead", "1g")
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

3. Улучшенная обработка исключений с проверкой задач Spark

python
def run_single_rule_with_validation(self, log):
    try:
        dataset = self.spark.sql(sql_filter)
        result_count = dataset.count()
        
        # Дополнительная проверка
        if result_count == 0:
            logger.warning(f"Правило {log.rule_id} вернуло 0 записей")
            return None
            
        print(f"""
        Инструкция для построения правила [{log.rule_id}] результат:
        {sql_filter}
        набор данных после обработки содержит {result_count} записей
        """)
        
        write_to_uc_table(dataset, self.job_output)
        
        # Проверяем, что запись действительно удалась
        verify_write_success = self.verify_table_write(
            job_output.target_table,
            expected_count=result_count
        )
        
        if not verify_write_success:
            logger.error(f"Проверка записи правила {log.rule_id} не удалась")
            return None
            
        return update_values
        
    except Exception as e:
        logger.error(f"Правило {log.rule_id} не выполнено с исключением: {str(e)}")
        return None

def verify_table_write(self, table_name, expected_count):
    """Проверяем, что запись в таблицу действительно удалась"""
    try:
        actual_count = self.spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
        return actual_count == expected_count
    except Exception:
        return False

4. Последовательная альтернатива для критических операций

Для операций, которые должны гарантированно выполняться успешно, рассмотрите последовательный подход:

python
def run_rules_sequentially(self, not_processed_log):
    """Последовательная обработка для целостности данных"""
    successful_logs = []
    
    for log in not_processed_log:
        try:
            result = self.run_single_rule_with_validation(log)
            if result:
                successful_logs.append(result)
            else:
                logger.error(f"Обработка правила {log.rule_id} не удалась")
        except Exception as e:
            logger.error(f"Правило {log.rule_id} столкнулось с исключением: {e}")
    
    return successful_logs

5. Мониторинг и логирование событий OOM

Добавьте комплексное логирование для обнаружения событий OOM:

python
import logging

# Настраиваем детальное логирование
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

class SparkOOMMonitor:
    def __init__(self, spark):
        self.spark = spark
        
    def check_oom_events(self):
        """Проверяем наличие событий OOM в Spark UI"""
        try:
            # Получаем статус приложения
            app_status = self.spark.sparkContext.statusTracker().getActiveJobsIds()
            # В реальной реализации вы будете анализировать логи Spark UI
            # или использовать инструменты мониторинга для обнаружения событий OOM
            return self._parse_ui_logs_for_oom()
        except Exception:
            return False
            
    def _parse_ui_logs_for_oom(self):
        """Анализируем логи на наличие индикаторов OOM"""
        # Реализация будет зависеть от вашей настройки логирования
        # Это заглушка для фактической реализации
        return False

Рекомендуемые изменения в реализации

На основе анализа, вот измененная версия вашего кода, которая решает указанные проблемы:

python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

class PySparkRuleProcessor:
    def __init__(self, spark, max_concurrent_jobs=3):
        self.spark = spark
        self.max_concurrent_jobs = max_concurrent_jobs
        
    def process_rules(self, not_processed_log):
        """Обработка правил с контролируемым параллелизмом и лучшей обработкой ошибок"""
        successful_results = []
        
        # Обрабатываем пакетами для управления памятью
        for i in range(0, len(not_processed_log), self.max_concurrent_jobs):
            batch = not_processed_log[i:i + self.max_concurrent_jobs]
            
            with ThreadPoolExecutor(max_workers=len(batch)) as executor:
                future_to_log = {
                    executor.submit(self._safe_run_rule, log): log 
                    for log in batch
                }
                
                for future in as_completed(future_to_log):
                    log = future_to_log[future]
                    try:
                        result = future.result(timeout=300)  # таймаут 5 минут
                        if result:
                            successful_results.append(result)
                            print(f"Правило {log.rule_id} успешно обработано")
                        else:
                            print(f"Обработка правила {log.rule_id} не удалась (нет результата)")
                    except Exception as exc:
                        print(f"Правило {log.rule_id} не выполнено с исключением: {exc}")
            
            # Позволяем восстановление памяти между пакетами
            time.sleep(1)
            
        return successful_results
    
    def _safe_run_rule(self, log):
        """Безопасное выполнение одного правила с комплексной обработкой ошибок"""
        try:
            sql_filter = self._get_sql_filter(log)
            dataset = self.spark.sql(sql_filter)
            
            # Проверяем набор данных перед продолжением
            if dataset.rdd.isEmpty():
                print(f"Правило {log.rule_id}: Возвращен пустой набор данных")
                return None
                
            result_count = dataset.count()
            print(f"Правило {log.rule_id}: обработано {result_count} записей")
            
            # Запись с обработкой ошибок
            try:
                write_to_uc_table(dataset, self.job_output)
                
                # Проверяем, что запись действительно удалась
                if not self._verify_table_write(self.job_output.target_table, result_count):
                    print(f"Правило {log.rule_id}: Проверка записи не удалась")
                    return None
                    
                return self._update_values_with_success(log, result_count)
                
            except Exception as write_exc:
                print(f"Правило {log.rule_id}: Запись не удалась - {write_exc}")
                return None
                
        except Exception as exc:
            print(f"Правило {log.rule_id}: Обработка не удалась - {exc}")
            return None
    
    def _verify_table_write(self, table_name, expected_count):
        """Проверяем, что запись в таблицу действительно удалась"""
        try:
            actual_count = self.spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
            return actual_count == expected_count
        except Exception:
            return False
    
    def _update_values_with_success(self, log, result_count):
        """Обновляем значения с информацией об успешной обработке"""
        return {
            'job_start_time': datetime.datetime.now(datetime.timezone.utc),
            'log_id': log.log_id,
            'result_record_count': result_count,
            'state': 'SUCCESS',
            'job_end_time': datetime.datetime.now(datetime.timezone.utc)
        }

Источники

  1. Enhancing Spark Job Performance with Multithreading - Gupta Akashdeep
  2. Why Memory Management is Causing Your Spark Apps To Be Slow or Fail - Unravel Data
  3. Fix PySpark ‘Java Heap Space’ OOM Errors: Optimal Driver Memory Setup - ReviewInsights
  4. Understanding & Fixing Spark Driver and Executor Out of Memory (OOM) Errors - Omkar
  5. Tackling Out Of Memory (OOM) Errors In PySpark - B V Sarath Chandra

Заключение

Проблема с многопоточностью PySpark, когда запросы сообщают об успехе, но конечные таблицы не содержат данных несмотря на ошибки OOM, в основном вызвана истощением памяти во время параллельных операций и недостаточной обработкой исключений. Ключевые рекомендации включают:

  1. Сокращение параллелизма путем обработки заданий в меньших пакетах (3-5 заданий за раз)
  2. Реализация правильной конфигурации памяти с соответствующими настройками драйвера и исполнителя
  3. Добавление проверки записи для подтверждения, что данные действительно достигают целевой таблицы
  4. Улучшение обработки исключений для обнаружения и правильного отчета о событиях OOM
  5. Рассмотрение последовательной обработки для критических операций с данными

Реализуя эти изменения, вы сможете правильно обнаруживать и обрабатывать ошибки OOM, гарантируя, что успешные операции действительно производят ожидаемые результаты в ваших целевых таблицах.

Авторы
Проверено модерацией
Модерация