Полное решение проблем с памятью при многопоточности в PySpark
Узнайте, как исправлять ошибки OOM при многопоточности в PySpark, когда запросы выполняются успешно, но таблицы не содержат данных. Полное руководство с решениями, лучшими практиками и примерами кода.
Проблема с многопоточностью в PySpark: Запросы сообщают об успехе, но конечная таблица не содержит данных несмотря на ошибки OOM
Я столкнулся с проблемой при использовании многопоточности в PySpark, когда некоторые запросы сообщают об успехе, но в конечной таблице отсутствуют данные. В Spark UI видно, что некоторые задачи в стадиях сообщают об ошибках OOM (Out of Memory), однако общая задача считается успешной.
Вот моя реализация:
def run_single_rule(self, log):
try:
dataset = self.spark.sql(sql_filter)
result_count = dataset.count()
print(
f"""
Statement for building rule [{log.rule_id}] result:
{sql_filter}
dataset after processed contains {result_count} records
"""
)
write_to_uc_table(dataset, self.job_output)
except Exception:
logger.warning(
f"""
rule_id = {log.rule_id} has failed with log:
{full_error_log}
"""
)
return update_values
with ThreadPoolExecutor(max_workers=10) as executor:
future_job_runs = {
executor.submit(self.run_single_rule, query): query
for query in not_processed_log
}
wait(future_job_runs)
for future in as_completed(future_job_runs):
log = future_job_runs[future]
try:
log_records.append(future.result())
except Exception as exc:
print(f"rule_id: {log.rule_id} generated an exception: {exc}")
else:
print(
f"Finist log_id: {log.rule_id} finised with result: {future.result()}"
)
def write_to_uc_table(df, job_output: JobOutput):
writer = df.write
if job_output.partition_columns:
writer = writer.partitionBy(job_output.partition_columns)
if job_output.write_mode == WriteMode.OVERWRITE_PARTITION:
writer = writer.option("partitionOverwriteMode", "dynamic")
writer = writer.mode("overwrite")
else:
writer = writer.mode(job_output.write_mode)
if job_output.options:
for k, v in job_output.options.items():
writer = writer.option(k, v)
writer.saveAsTable(
f"{job_output.target_catalog}.{job_output.target_schema}.{job_output.target_table}"
)
Например, один запрос показывает:
Statement for building rule [10030006] result:
SELECT ... FROM ...
dataset after processed contains 650048 records
Но конечный результат показывает:
Finist log_id: 10030006 finised with result: {'job_start_time': datetime.datetime(2025, 10, 31, 1, 7, 2, 469565, tzinfo=datetime.timezone.utc), 'log_id': '5763b7d8-b5ee-11f0-a43c-00163eb2a776', 'result_record_count': None, 'state': 'FAILED', 'job_end_time': datetime.datetime(2025, 10, 31, 1, 7, 25, 763043, tzinfo=datetime.timezone.utc)}
Сталкивался ли кто-нибудь с подобными случаями в PySpark при многопоточности, когда запросы кажутся успешными, но конечная таблица не содержит данных из-за ошибок OOM? Что может вызывать такое поведение, и есть ли какие-либо рекомендуемые решения или обходные пути?
Содержание
- Понимание основной причины
- Проблемы управления памятью
- Проблемы параллельной записи
- Недостатки обработки исключений
- Решения и лучшие практики
- Рекомендуемые изменения в реализации
Понимание основной причины
Основная проблема связана с параллельным потреблением памяти при одновременном выполнении нескольких операций PySpark. Как объясняет Gupta Akashdeep, “Отправка слишком большого количества заданий одновременно может привести к высокому потреблению памяти драйвера”.
В вашей реализации ThreadPoolExecutor(max_workers=10) создает 10 параллельных потоков, каждый из которых потенциально:
- Загружает данные в память
- Обрабатывает SQL-запросы
- Записывает результаты в таблицы
- Хранит промежуточные наборы данных
Когда емкость памяти драйвера превышена, задачи Spark завершаются с ошибками OOM, но ваша обработка исключений может не перехватывать все сценарии сбоев, что приводит к вводящим в заблуждение сообщениям об “успехе”.
Проблемы управления памятью
Нагрузка на память драйвера
JVM драйвера управляет планированием задач и координирует работу с исполнителями. При 10 параллельных потоках, конкурирующих за память, возникают несколько проблем:
-
Исчерпание пространства в куче: Как отмечено в анализе ReviewInsights, “Выделение слишком большого количества памяти драйверу снижает ресурсы, доступные другим исполнителям и всей операционной системе, что потенциально может снизить общую производительность.”
-
Статические границы памяти: Согласно анализу Омкара, “Ключевая проблема статического управления памятью заключалась в жестких границах между выполнением и хранением. Если память для хранения не использовалась полностью, задачи выполнения не могли заимствовать эту память, что приводило к неэффективному использованию памяти.”
-
Конфигурация памяти исполнителя: Как предупреждает Unravel Data, “Если это значение установлено на более высокое значение без должного учета памяти, исполнители могут завершиться с ошибкой OOM.”
Точки потребления памяти в вашем коде
dataset = self.spark.sql(sql_filter)
result_count = dataset.count()
write_to_uc_table(dataset, self.job_output)
Операция count() заставляет данные собираться на драйвере, а write_to_uc_table() может удерживать данные в памяти в процессе записи.
Проблемы параллельной записи
Ваша функция write_to_uc_table с режимом перезаписи разделов создает дополнительные проблемы параллелизма:
if job_output.write_mode == WriteMode.OVERWRITE_PARTITION:
writer = writer.option("partitionOverwriteMode", "dynamic")
writer = writer.mode("overwrite")
Как предупреждает Gupta Akashdeep, “Будьте осторожны при записи в таблицу одновременно, это может привести к ошибкам ConcurrentModification или неожиданным результатам, особенно когда вы выполняете partitionOverwrite.”
Когда несколько потоков одновременно пытаются записать в одну и ту же таблицу в режиме перезаписи, вы можете получить:
- Состояние гонки в управлении разделами
- Частичные записи, которые выглядят завершенными
- Тихие сбои, которые не распространяются вверх до вашего обработчика исключений
Недостатки обработки исключений
Ваша текущая обработка исключений имеет несколько уязвимостей:
except Exception:
logger.warning(f"""
rule_id = {log.rule_id} has failed with log:
{full_error_log}
""")
Проблема в том, что ошибки OOM в Spark не всегда распространяются как исключения Python. Задачи завершаются сбоем на стороне исполнителя, но драйвер может не обнаружить эти сбои немедленно. Как отмечено в исследовании, “Обработка исключений крайне важна в таких сценариях, то есть обработка исключений при вызове future.result(), так как это упрощает отладку и помогает понять, что именно вызывает проблему.”
Ваш вызов future.result() может возвращать результат без возбуждения исключения, даже когда базовая задача Spark завершилась сбоем.
Решения и лучшие практики
1. Сокращение параллелизма и реализация пакетной обработки
Вместо одновременного выполнения всех запросов реализуйте контролируемый подход:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def run_rules_in_batches(self, not_processed_log, batch_size=3):
"""Обработка правил в контролируемых пакетах для управления памятью"""
for i in range(0, len(not_processed_log), batch_size):
batch = not_processed_log[i:i + batch_size]
with ThreadPoolExecutor(max_workers=min(batch_size, 5)) as executor:
future_job_runs = {
executor.submit(self.run_single_rule, query): query
for query in batch
}
for future in as_completed(future_job_runs):
log = future_job_runs[future]
try:
result = future.result()
# Проверяем, что результат действительно содержит данные
if result and result.get('result_record_count') == 0:
logger.warning(f"Правило {log.rule_id} завершено с 0 записями")
except Exception as exc:
logger.error(f"Правило {log.rule_id} не выполнено: {exc}")
# Добавляем задержку между пакетами для восстановления памяти
time.sleep(2)
2. Улучшенная конфигурация памяти
Настройте параметры памяти Spark соответствующим образом:
# Конфигурируем параметры памяти перед созданием Spark сессии
spark.conf.set("spark.driver.memory", "4g")
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.memoryOverhead", "1g")
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
3. Улучшенная обработка исключений с проверкой задач Spark
def run_single_rule_with_validation(self, log):
try:
dataset = self.spark.sql(sql_filter)
result_count = dataset.count()
# Дополнительная проверка
if result_count == 0:
logger.warning(f"Правило {log.rule_id} вернуло 0 записей")
return None
print(f"""
Инструкция для построения правила [{log.rule_id}] результат:
{sql_filter}
набор данных после обработки содержит {result_count} записей
""")
write_to_uc_table(dataset, self.job_output)
# Проверяем, что запись действительно удалась
verify_write_success = self.verify_table_write(
job_output.target_table,
expected_count=result_count
)
if not verify_write_success:
logger.error(f"Проверка записи правила {log.rule_id} не удалась")
return None
return update_values
except Exception as e:
logger.error(f"Правило {log.rule_id} не выполнено с исключением: {str(e)}")
return None
def verify_table_write(self, table_name, expected_count):
"""Проверяем, что запись в таблицу действительно удалась"""
try:
actual_count = self.spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
return actual_count == expected_count
except Exception:
return False
4. Последовательная альтернатива для критических операций
Для операций, которые должны гарантированно выполняться успешно, рассмотрите последовательный подход:
def run_rules_sequentially(self, not_processed_log):
"""Последовательная обработка для целостности данных"""
successful_logs = []
for log in not_processed_log:
try:
result = self.run_single_rule_with_validation(log)
if result:
successful_logs.append(result)
else:
logger.error(f"Обработка правила {log.rule_id} не удалась")
except Exception as e:
logger.error(f"Правило {log.rule_id} столкнулось с исключением: {e}")
return successful_logs
5. Мониторинг и логирование событий OOM
Добавьте комплексное логирование для обнаружения событий OOM:
import logging
# Настраиваем детальное логирование
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
class SparkOOMMonitor:
def __init__(self, spark):
self.spark = spark
def check_oom_events(self):
"""Проверяем наличие событий OOM в Spark UI"""
try:
# Получаем статус приложения
app_status = self.spark.sparkContext.statusTracker().getActiveJobsIds()
# В реальной реализации вы будете анализировать логи Spark UI
# или использовать инструменты мониторинга для обнаружения событий OOM
return self._parse_ui_logs_for_oom()
except Exception:
return False
def _parse_ui_logs_for_oom(self):
"""Анализируем логи на наличие индикаторов OOM"""
# Реализация будет зависеть от вашей настройки логирования
# Это заглушка для фактической реализации
return False
Рекомендуемые изменения в реализации
На основе анализа, вот измененная версия вашего кода, которая решает указанные проблемы:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
class PySparkRuleProcessor:
def __init__(self, spark, max_concurrent_jobs=3):
self.spark = spark
self.max_concurrent_jobs = max_concurrent_jobs
def process_rules(self, not_processed_log):
"""Обработка правил с контролируемым параллелизмом и лучшей обработкой ошибок"""
successful_results = []
# Обрабатываем пакетами для управления памятью
for i in range(0, len(not_processed_log), self.max_concurrent_jobs):
batch = not_processed_log[i:i + self.max_concurrent_jobs]
with ThreadPoolExecutor(max_workers=len(batch)) as executor:
future_to_log = {
executor.submit(self._safe_run_rule, log): log
for log in batch
}
for future in as_completed(future_to_log):
log = future_to_log[future]
try:
result = future.result(timeout=300) # таймаут 5 минут
if result:
successful_results.append(result)
print(f"Правило {log.rule_id} успешно обработано")
else:
print(f"Обработка правила {log.rule_id} не удалась (нет результата)")
except Exception as exc:
print(f"Правило {log.rule_id} не выполнено с исключением: {exc}")
# Позволяем восстановление памяти между пакетами
time.sleep(1)
return successful_results
def _safe_run_rule(self, log):
"""Безопасное выполнение одного правила с комплексной обработкой ошибок"""
try:
sql_filter = self._get_sql_filter(log)
dataset = self.spark.sql(sql_filter)
# Проверяем набор данных перед продолжением
if dataset.rdd.isEmpty():
print(f"Правило {log.rule_id}: Возвращен пустой набор данных")
return None
result_count = dataset.count()
print(f"Правило {log.rule_id}: обработано {result_count} записей")
# Запись с обработкой ошибок
try:
write_to_uc_table(dataset, self.job_output)
# Проверяем, что запись действительно удалась
if not self._verify_table_write(self.job_output.target_table, result_count):
print(f"Правило {log.rule_id}: Проверка записи не удалась")
return None
return self._update_values_with_success(log, result_count)
except Exception as write_exc:
print(f"Правило {log.rule_id}: Запись не удалась - {write_exc}")
return None
except Exception as exc:
print(f"Правило {log.rule_id}: Обработка не удалась - {exc}")
return None
def _verify_table_write(self, table_name, expected_count):
"""Проверяем, что запись в таблицу действительно удалась"""
try:
actual_count = self.spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
return actual_count == expected_count
except Exception:
return False
def _update_values_with_success(self, log, result_count):
"""Обновляем значения с информацией об успешной обработке"""
return {
'job_start_time': datetime.datetime.now(datetime.timezone.utc),
'log_id': log.log_id,
'result_record_count': result_count,
'state': 'SUCCESS',
'job_end_time': datetime.datetime.now(datetime.timezone.utc)
}
Источники
- Enhancing Spark Job Performance with Multithreading - Gupta Akashdeep
- Why Memory Management is Causing Your Spark Apps To Be Slow or Fail - Unravel Data
- Fix PySpark ‘Java Heap Space’ OOM Errors: Optimal Driver Memory Setup - ReviewInsights
- Understanding & Fixing Spark Driver and Executor Out of Memory (OOM) Errors - Omkar
- Tackling Out Of Memory (OOM) Errors In PySpark - B V Sarath Chandra
Заключение
Проблема с многопоточностью PySpark, когда запросы сообщают об успехе, но конечные таблицы не содержат данных несмотря на ошибки OOM, в основном вызвана истощением памяти во время параллельных операций и недостаточной обработкой исключений. Ключевые рекомендации включают:
- Сокращение параллелизма путем обработки заданий в меньших пакетах (3-5 заданий за раз)
- Реализация правильной конфигурации памяти с соответствующими настройками драйвера и исполнителя
- Добавление проверки записи для подтверждения, что данные действительно достигают целевой таблицы
- Улучшение обработки исключений для обнаружения и правильного отчета о событиях OOM
- Рассмотрение последовательной обработки для критических операций с данными
Реализуя эти изменения, вы сможете правильно обнаруживать и обрабатывать ошибки OOM, гарантируя, что успешные операции действительно производят ожидаемые результаты в ваших целевых таблицах.