Flask: рассылка без ошибок соединения с несколькими доменами
Узнайте, как отправлять массовые письма в Flask через несколько доменов без ошибок соединения и блокировки портов. Руководство с кодом и практиками доставки.
Как отправлять массовые письма в Flask, используя несколько доменов электронной почты, не сталкиваясь с ошибками соединения и блокировкой портов? Я реализую систему для отправки 140 000 писем с четырьмя разными доменами, но при одновременном использовании всех доменов я получаю ошибки «Connection refused» и блокировку портов. Ниже приведена моя текущая реализация:
import os
import re
import time
from datetime import datetime
from smtplib import SMTPRecipientsRefused
import pandas as pd
from celery import Celery
from celery.schedules import crontab
from flask import Flask, render_template, render_template_string
import mysql.connector
from flask_mail import Mail, Message
flask_app = Flask(__name__)
flask_app.template_folder = 'templates'
celery = Celery(
'tasks',
broker='redis://localhost:6380/0',
backend='redis://localhost:6380/0'
)
celery.conf.timezone = 'Asia/Kolkata'
celery.conf.beat_schedule = {
'send_worker1': {
'task': 'tasks.send_worker1',
'schedule': crontab(minute=27, hour=15),
},
'send_worker2': {
'task': 'tasks.send_worker2',
'schedule': crontab(minute=27, hour=15),
},
'send_worker3': {
'task': 'tasks.send_worker3',
'schedule': crontab(minute=15, hour=15),
},
'send_worker4': {
'task': 'tasks.send_worker4',
'schedule': crontab(minute=22, hour=16),
},
'send_mail_emp': {
'task': 'tasks.send_mail_emp',
'schedule': crontab(minute=25, hour=11),
}
}
mail_configs = {
"viskohr1": {
"MAIL_SERVER": "mail.example.com",
"MAIL_USERNAME": "example@example.com",
"MAIL_PASSWORD": "example@example",
"MAIL_DEFAULT_SENDER": "example@example.com",
"MAIL_USE_TLS": True,
"MAIL_PORT": 587
},
"viskohr2": {
"MAIL_SERVER": "mail.example.in",
"MAIL_USERNAME": "example@example.in",
"MAIL_PASSWORD": "example@example",
"MAIL_DEFAULT_SENDER": "example@example.in",
"MAIL_USE_TLS": True,
"MAIL_PORT": 587
},
"viskohr3": {
"MAIL_SERVER": "mail.example.group",
"MAIL_USERNAME": "example@example.group",
"MAIL_PASSWORD": "example@example",
"MAIL_DEFAULT_SENDER": "example@example.group",
"MAIL_USE_TLS": True,
"MAIL_PORT": 587
},
"viskohr4": {
"MAIL_SERVER": "mail.example-marketing.com",
"MAIL_USERNAME": "example@example-marketing.com",
"MAIL_PASSWORD": "example@example",
"MAIL_DEFAULT_SENDER": "example@example-marketing.com",
"MAIL_USE_TLS": True,
"MAIL_PORT": 587
},
}
# Database connection function
def get_db_connection(database):
"""Return MySQL connection."""
return mysql.connector.connect(
host="xhgvjgdegfjsdfgjsdfsdjf.amazonaws.com",
database=database,
user="msdfgndsgsdg",
password="zmfld,xgmlxmfdg",
port=3306
)
def apply_mail_config(config_name):
"""Apply mail config and return Mail object."""
config = mail_configs.get(config_name)
if not config:
raise ValueError(f"Invalid mail config name: {config_name}")
for key, value in config.items():
flask_app.config[key] = value
mail = Mail(flask_app)
mail.debug = False
return mail
def get_email_candidate():
"""Read Excel and return valid email list."""
current_dir = os.getcwd()
path = os.path.join(current_dir, "Gujarat.xlsx")
if not os.path.exists(path):
raise FileNotFoundError("Excel file 'Gujarat.xlsx' not found in current directory.")
target_sheets = ["Sheet1(2)", "Sheet1"]
pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
valid_records = []
for sheet in target_sheets:
try:
df = pd.read_excel(path, sheet_name=sheet)
except Exception as e:
print(f"⚠️ Could not read sheet '{sheet}': {e}")
continue
if 'Email' not in df.columns or 'Name' not in df.columns:
print(f"⚠️ Missing required columns in '{sheet}'")
continue
for _, row in df.iterrows():
email = str(row['Email']).strip()
name = str(row['Name']).strip()
if pattern.fullmatch(email):
valid_records.append({"name": name, "email": email})
all_unique_valid = {rec['email']: rec for rec in valid_records}
unique_valid = sorted(all_unique_valid.values(), key=lambda x: x['email'])
print(f"✅ Total valid unique emails: {len(unique_valid)}")
return list(unique_valid)
def send_message_safe(mail, msg, max_retries=5):
"""Send email with retry & rate limit handling."""
attempt = 0
while attempt < max_retries:
try:
mail.send(msg)
print("✅ Email sent successfully.")
return True
except SMTPRecipientsRefused as e:
print(f"⚠️ SMTP limit hit: {e}. Waiting 1 hour...")
time.sleep(3600)
return False
except OSError as e:
win_err = getattr(e, "winerror", None)
if win_err == 10061:
attempt += 1
wait_time = min(30, 5 * attempt)
remaining = max_retries - attempt
print(
f"⚠️ Connection refused (WinError 10061). Retrying in {wait_time}s "
f"(attempt {attempt}/{max_retries}, {remaining} retries left)..."
)
time.sleep(wait_time)
continue
print(f"⚠️ OS error while sending email: {e}. Retrying in 10s...")
time.sleep(10)
attempt += 1
except Exception as e:
attempt += 1
wait_time = min(60, 5 * attempt)
print(f"⚠️ Error while sending email: {e}. Retrying in {wait_time}s...")
time.sleep(wait_time)
print("❌ Failed to send email after multiple retries.")
return
def send_emails(unique_emails, worker_name, inst):
print(f"\n🚀 Starting {worker_name} with config: {inst}")
mail = apply_mail_config(inst)
with flask_app.app_context():
db = get_db_connection("xyz")
cursor = db.cursor(dictionary=True)
try:
cursor.execute("SELECT * FROM email_templates WHERE et_name = 'JobFair-1'")
template = cursor.fetchone()
if not template:
print("❌ No email template found!")
return
print(f"📄 Using template ID: {template['et_id']}")
for user in unique_emails:
name = user["name"]
email = user["email"]
query = """select * from email_record where er_user_email=%s and er_template_id=%s"""
cursor.execute(query, (email, template["et_id"]))
existing_record = cursor.fetchone()
if existing_record:
continue
email_body = render_template_string(
template['et_template'],
name=name
)
msg = Message(
subject="Gujarat's Biggest Online Job Fair is Live. Register Now",
recipients=[email],
html=email_body
)
status=send_message_safe(mail, msg)
if status:
cursor.execute("""
INSERT INTO email_record (er_user_email, er_template_id, er_email_send_status)
VALUES (%s, %s, %s)
""", (email, template["et_id"], 1))
db.commit()
print(f"✅ [{worker_name}] Sent: {email}")
except Exception as e:
print(f"❌ [{worker_name}] Error: {e}")
finally:
cursor.close()
db.close()
@celery.task
def send_worker1():
all_emails = get_email_candidate()
send_emails(all_emails[:40000], "Worker 1", "viskohr1")
print("✅ Worker 1 completed.")
@celery.task
def send_worker2():
all_emails = get_email_candidate()
send_emails(all_emails[40000:80000], "Worker 2", "viskohr2")
print("✅ Worker 2 completed.")
@celery.task
def send_worker3():
all_emails = get_email_candidate()
send_emails(all_emails[80000:120000], "Worker 3", "viskohr3")
print("✅ Worker 3 completed.")
@celery.task
def send_worker4():
all_emails = get_email_candidate()
send_emails(all_emails[120000:], "Worker 4", "viskohr4")
print("✅ Worker 4 completed.")
Какие лучшие практики и решения помогут избежать ошибок соединения и блокировки портов при использовании нескольких доменов электронной почты для массовой рассылки в Flask?
Содержание
- [Понимание корневых причин](#понимание-корневых-ц причин)
- Лучшие практики управления соединениями
- Стратегия балансировки нагрузки по доменам
- Надёжное управление ошибками и механизмами повторных попыток
- Конфигурация сервера и доступ к портам
- Улучшения мониторинга и логирования
- Полностью пересмотренная реализация
Понимание корневых причин
Ваша текущая реализация сталкивается с несколькими проблемами, которые приводят к ошибкам соединения и блокировке портов:
-
Перегрузка одновременных соединений: Когда все четыре воркера Celery запускаются одновременно, они создают несколько SMTP‑соединений к разным доменам электронной почты, перегружая доступные порты сервера и вызывая защиту межсетевого экрана.
-
Ограничения скорости поставщиков почты: Большинство почтовых сервисов (включая ваши собственные домены) имеют строгие лимиты отправки, которые, скорее всего, вы превышаете, что приводит к временным или постоянным блокировкам.
-
Блокировка портов: Как отмечено в обсуждениях Reddit, облачные провайдеры часто блокируют порты SMTP (587, 465) по умолчанию по соображениям безопасности.
-
Исчерпание соединений: Без надлежащего пула соединений каждый попытка отправки письма создаёт новое соединение, быстро потребляя доступные порты.
Лучшие практики управления соединениями
Реализуйте пул соединений и постоянные соединения, чтобы избежать исчерпания портов:
from flask_mail import Mail, Message
import threading
from contextlib import contextmanager
class PooledMailManager:
def __init__(self, app, max_connections=5):
self.app = app
self.max_connections = max_connections
self.connections = {}
self.lock = threading.Lock()
@contextmanager
def get_connection(self, config_name):
with self.lock:
if config_name not in self.connections:
config = mail_configs[config_name]
for key, value in config.items():
self.app.config[key] = value
mail = Mail(self.app)
self.connections[config_name] = {
'mail': mail,
'connection': None,
'in_use': 0
}
conn_info = self.connections[config_name]
conn_info['in_use'] += 1
try:
if conn_info['connection'] is None:
conn_info['mail'] = Mail(self.app)
conn_info['connection'] = conn_info['mail'].connect()
yield conn_info['connection']
finally:
conn_info['in_use'] -= 1
if conn_info['in_use'] == 0 and conn_info['connection']:
conn_info['connection'].close()
conn_info['connection'] = None
# Пример использования
mail_manager = PooledMailManager(flask_app, max_connections=3)
Эта реализация создаёт пул соединений, который переиспользует SMTP‑соединения и правильно управляет их жизненным циклом.
Стратегия балансировки нагрузки по доменам
Вместо назначения фиксированных диапазонов адресов каждому домену, реализуйте динамическую балансировку:
class EmailDomainBalancer:
def __init__(self, domains_config, max_emails_per_hour=5000):
self.domains = list(domains_config.keys())
self.config = domains_config
self.max_emails_per_hour = max_emails_per_hour
self.domain_stats = {domain: {
'emails_sent': 0,
'last_reset': datetime.now(),
'consecutive_failures': 0,
'is_available': True
} for domain in self.domains}
def get_next_available_domain(self):
current_time = datetime.now()
# Сброс счётчиков по часам
for domain in self.domains:
stats = self.domain_stats[domain]
if (current_time - stats['last_reset']).seconds >= 3600:
stats['emails_sent'] = 0
stats['last_reset'] = current_time
stats['consecutive_failures'] = 0
stats['is_available'] = True
# Найти лучший доступный домен
available_domains = [
domain for domain in self.domains
if (self.domain_stats[domain]['is_available'] and
self.domain_stats[domain]['emails_sent'] < self.max_emails_per_hour and
self.domain_stats[domain]['consecutive_failures'] < 3)
]
if not available_domains:
raise Exception("No domains available - all exceeded limits or failed")
# Round‑robin с учётом нагрузки
selected = min(
available_domains,
key=lambda d: self.domain_stats[d]['emails_sent']
)
return selected
def record_success(self, domain):
self.domain_stats[domain]['emails_sent'] += 1
self.domain_stats[domain]['consecutive_failures'] = 0
def record_failure(self, domain):
self.domain_stats[domain]['consecutive_failures'] += 1
if self.domain_stats[domain]['consecutive_failures'] >= 3:
self.domain_stats[domain]['is_available'] = False
domain_balancer = EmailDomainBalancer(mail_configs)
Этот подход распределяет нагрузку более равномерно и автоматически исключает неработающие домены из ротации.
Надёжное управление ошибками и механизмами повторных попыток
Улучшите обработку ошибок с экспоненциальным откатом и доменными стратегиями:
import smtplib
from email.utils import formataddr
from socket import error as socket_error
def send_email_with_retry(mail_manager, balancer, msg, max_retries=3):
"""Send email with intelligent retry logic and domain balancing."""
last_exception = None
for attempt in range(max_retries):
try:
# Получить доступный домен
domain = balancer.get_next_available_domain()
# Получить соединение из пула
with mail_manager.get_connection(domain) as connection:
connection.send(msg)
# Записать успех
balancer.record_success(domain)
return True
except (smtplib.SMTPServerDisconnected, ConnectionRefusedError) as e:
last_exception = e
print(f"⚠️ Connection error with domain {domain}: {e}")
balancer.record_failure(domain)
# Экспоненциальный откат
wait_time = min(300, 2 ** attempt) # Макс 5 минут
print(f"⚠️ Retrying in {wait_time} seconds...")
time.sleep(wait_time)
except smtplib.SMTPRecipientsRefused as e:
print(f"⚠️ SMTP limit hit with domain {domain}: {e}")
balancer.record_failure(domain)
# Долгий пауза при лимитах SMTP
print("⚠️ Waiting 2 hours due to SMTP limit...")
time.sleep(7200)
return False
except Exception as e:
last_exception = e
print(f"⚠️ Unexpected error with domain {domain}: {e}")
balancer.record_failure(domain)
# Пауза и повтор
wait_time = min(60, 5 * attempt)
print(f"⚠️ Retrying in {wait_time} seconds...")
time.sleep(wait_time)
print(f"❌ Failed after {max_retries} attempts. Last error: {last_exception}")
return False
Конфигурация сервера и доступ к портам
Решите проблемы с блокировкой портов:
- Проверка доступа к порту: Убедитесь, что ваш сервер может подключиться к SMTP‑портам:
telnet smtp.gmail.com 587 telnet your-mail-server.com 587
-
Свяжитесь с вашим хостинг‑провайдером: Как упомянуто в ответах Stack Overflow, вам может понадобиться:
- Запросить разблокировку портов SMTP
- Настроить корректные записи SPF/DKIM
- Получить выделенные IP‑адреса для каждого домена
-
Альтернативные порты: Рассмотрите возможность использования разных портов для разных доменов, если они доступны.
-
Конфигурация межсетевого экрана: Убедитесь, что ваш сервер разрешает исходящий SMTP‑трафик:
# Allow outbound SMTP traffic
sudo ufw allow out 587
sudo ufw allow out 465
Улучшения мониторинга и логирования
Реализуйте всесторонний мониторинг для отслеживания производительности доставки почты:
import logging
from datetime import datetime
class EmailMonitor:
def __init__(self):
self.logger = logging.getLogger('email_monitor')
self.logger.setLevel(logging.INFO)
# Создать файловый обработчик
handler = logging.FileHandler('email_delivery.log')
handler.setLevel(logging.INFO)
# Создать форматтер
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_email_attempt(self, email, domain, success, error=None):
status = "SUCCESS" if success else "FAILED"
message = f"[{status}] Email: {email}, Domain: {domain}"
if error:
message += f", Error: {error}"
self.logger.info(message)
def log_domain_performance(self, domain, emails_sent, failures):
performance = f"Domain {domain}: {emails_sent} sent, {failures} failures"
self.logger.info(performance)
def get_daily_summary(self):
# Парсинг логов и генерация ежедневного отчёта
# Реализация зависит от формата логов
pass
email_monitor = EmailMonitor()
Полностью пересмотренная реализация
Ниже приведена полностью пересмотренная реализация, включающая все лучшие практики:
import os
import re
import time
import logging
import threading
from datetime import datetime
from smtplib import SMTPRecipientsRefused
from contextlib import contextmanager
from email.utils import formataddr
import pandas as pd
from celery import Celery
from celery.schedules import crontab
from flask import Flask, render_template, render_template_string
import mysql.connector
from flask_mail import Mail, Message
# Инициализация Flask‑приложения
flask_app = Flask(__name__)
flask_app.template_folder = 'templates'
# Конфигурация Celery
celery = Celery(
'tasks',
broker='redis://localhost:6380/0',
backend='redis://localhost:6380/0'
)
celery.conf.timezone = 'Asia/Kolkata'
# Планировщик Beat с отложенным запуском
celery.conf.beat_schedule = {
'send_worker1': {
'task': 'tasks.send_worker1',
'schedule': crontab(minute=27, hour=15),
},
'send_worker2': {
'task': 'tasks.send_worker2',
'schedule': crontab(minute=30, hour=15), # через 3 минуты
},
'send_worker3': {
'task': 'tasks.send_worker3',
'schedule': crontab(minute=33, hour=15), # через 6 минут
},
'send_worker4': {
'task': 'tasks.send_worker4',
'schedule': crontab(minute=36, hour=15), # через 9 минут
},
'send_mail_emp': {
'task': 'tasks.send_mail_emp',
'schedule': crontab(minute=25, hour=11),
}
}
# Конфигурации почты
mail_configs = {
"viskohr1": {
"MAIL_SERVER": "mail.example.com",
"MAIL_USERNAME": "example@example.com",
"MAIL_PASSWORD": "example@example",
"MAIL_DEFAULT_SENDER": "example@example.com",
"MAIL_USE_TLS": True,
"MAIL_PORT": 587
},
"viskohr2": {
"MAIL_SERVER": "mail.example.in",
"MAIL_USERNAME": "example@example.in",
"MAIL_PASSWORD": "example@example",
"MAIL_DEFAULT_SENDER": "example@example.in",
"MAIL_USE_TLS": True,
"MAIL_PORT": 587
},
"viskohr3": {
"MAIL_SERVER": "mail.example.group",
"MAIL_USERNAME": "example@example.group",
"MAIL_PASSWORD": "example@example",
"MAIL_DEFAULT_SENDER": "example@example.group",
"MAIL_USE_TLS": True,
"MAIL_PORT": 587
},
"viskohr4": {
"MAIL_SERVER": "mail.example-marketing.com",
"MAIL_USERNAME": "example@example-marketing.com",
"MAIL_PASSWORD": "example@example",
"MAIL_DEFAULT_SENDER": "example@example-marketing.com",
"MAIL_USE_TLS": True,
"MAIL_PORT": 587
},
}
# Функция подключения к базе данных
def get_db_connection(database):
"""Return MySQL connection."""
return mysql.connector.connect(
host="xhgvjgdegfjsdfgjsdfsdjf.amazonaws.com",
database=database,
user="msdfgndsgsdg",
password="zmfld,xgmlxmfdg",
port=3306
)
# Классы управления почтой
class PooledMailManager:
def __init__(self, app, max_connections=3):
self.app = app
self.max_connections = max_connections
self.connections = {}
self.lock = threading.Lock()
@contextmanager
def get_connection(self, config_name):
with self.lock:
if config_name not in self.connections:
config = mail_configs[config_name]
for key, value in config.items():
self.app.config[key] = value
mail = Mail(self.app)
self.connections[config_name] = {
'mail': mail,
'connection': None,
'in_use': 0
}
conn_info = self.connections[config_name]
conn_info['in_use'] += 1
try:
if conn_info['connection'] is None:
conn_info['mail'] = Mail(self.app)
conn_info['connection'] = conn_info['mail'].connect()
yield conn_info['connection']
finally:
conn_info['in_use'] -= 1
if conn_info['in_use'] == 0 and conn_info['connection']:
conn_info['connection'].close()
conn_info['connection'] = None
class EmailDomainBalancer:
def __init__(self, domains_config, max_emails_per_hour=3000):
self.domains = list(domains_config.keys())
self.config = domains_config
self.max_emails_per_hour = max_emails_per_hour
self.domain_stats = {domain: {
'emails_sent': 0,
'last_reset': datetime.now(),
'consecutive_failures': 0,
'is_available': True
} for domain in self.domains}
def get_next_available_domain(self):
current_time = datetime.now()
# Сброс счётчиков по часам
for domain in self.domains:
stats = self.domain_stats[domain]
if (current_time - stats['last_reset']).seconds >= 3600:
stats['emails_sent'] = 0
stats['last_reset'] = current_time
stats['consecutive_failures'] = 0
stats['is_available'] = True
# Найти лучший доступный домен
available_domains = [
domain for domain in self.domains
if (self.domain_stats[domain]['is_available'] and
self.domain_stats[domain]['emails_sent'] < self.max_emails_per_hour and
self.domain_stats[domain]['consecutive_failures'] < 3)
]
if not available_domains:
raise Exception("No domains available - all exceeded limits or failed")
# Round‑robin с учётом нагрузки
selected = min(
available_domains,
key=lambda d: self.domain_stats[d]['emails_sent']
)
return selected
def record_success(self, domain):
self.domain_stats[domain]['emails_sent'] += 1
self.domain_stats[domain]['consecutive_failures'] = 0
def record_failure(self, domain):
self.domain_stats[domain]['consecutive_failures'] += 1
if self.domain_stats[domain]['consecutive_failures'] >= 3:
self.domain_stats[domain]['is_available'] = False
class EmailMonitor:
def __init__(self):
self.logger = logging.getLogger('email_monitor')
self.logger.setLevel(logging.INFO)
# Создать файловый обработчик
handler = logging.FileHandler('email_delivery.log')
handler.setLevel(logging.INFO)
# Создать форматтер
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_email_attempt(self, email, domain, success, error=None):
status = "SUCCESS" if success else "FAILED"
message = f"[{status}] Email: {email}, Domain: {domain}"
if error:
message += f", Error: {error}"
self.logger.info(message)
def log_domain_performance(self, domain, emails_sent, failures):
performance = f"Domain {domain}: {emails_sent} sent, {failures} failures"
self.logger.info(performance)
# Инициализация менеджеров
mail_manager = PooledMailManager(flask_app, max_connections=3)
domain_balancer = EmailDomainBalancer(mail_configs, max_emails_per_hour=3000)
email_monitor = EmailMonitor()
# Функции отправки почты
def get_email_candidate():
"""Read Excel and return valid email list."""
current_dir = os.getcwd()
path = os.path.join(current_dir, "Gujarat.xlsx")
if not os.path.exists(path):
raise FileNotFoundError("Excel file 'Gujarat.xlsx' not found in current directory.")
target_sheets = ["Sheet1(2)", "Sheet1"]
pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
valid_records = []
for sheet in target_sheets:
try:
df = pd.read_excel(path, sheet_name=sheet)
except Exception as e:
print(f"⚠️ Could not read sheet '{sheet}': {e}")
continue
if 'Email' not in df.columns or 'Name' not in df.columns:
print(f"⚠️ Missing required columns in '{sheet}'")
continue
for _, row in df.iterrows():
email = str(row['Email']).strip()
name = str(row['Name']).strip()
if pattern.fullmatch(email):
valid_records.append({"name": name, "email": email})
all_unique_valid = {rec['email']: rec for rec in valid_records}
unique_valid = sorted(all_unique_valid.values(), key=lambda x: x['email'])
print(f"✅ Total valid unique emails: {len(unique_valid)}")
return list(unique_valid)
def send_email_with_retry(msg, max_retries=3):
"""Send email with intelligent retry logic and domain balancing."""
last_exception = None
for attempt in range(max_retries):
try:
# Получить доступный домен
domain = domain_balancer.get_next_available_domain()
# Получить соединение из пула
with mail_manager.get_connection(domain) as connection:
connection.send(msg)
# Записать успех
domain_balancer.record_success(domain)
email_monitor.log_email_attempt(msg.recipients[0], domain, True)
return True
except (smtplib.SMTPServerDisconnected, ConnectionRefusedError) as e:
last_exception = e
print(f"⚠️ Connection error with domain {domain}: {e}")
domain_balancer.record_failure(domain)
email_monitor.log_email_attempt(msg.recipients[0], domain, False, str(e))
# Экспоненциальный откат
wait_time = min(300, 2 ** attempt) # Макс 5 минут
print(f"⚠️ Retrying in {wait_time} seconds...")
time.sleep(wait_time)
except smtplib.SMTPRecipientsRefused as e:
print(f"⚠️ SMTP limit hit with domain {domain}: {e}")
domain_balancer.record_failure(domain)
email_monitor.log_email_attempt(msg.recipients[0], domain, False, str(e))
# Долгий пауза при лимитах SMTP
print("⚠️ Waiting 2 hours due to SMTP limit...")
time.sleep(7200)
return False
except Exception as e:
last_exception = e
print(f"⚠️ Unexpected error with domain {domain}: {e}")
domain_balancer.record_failure(domain)
email_monitor.log_email_attempt(msg.recipients[0], domain, False, str(e))
# Пауза и повтор
wait_time = min(60, 5 * attempt)
print(f"⚠️ Retrying in {wait_time} seconds...")
time.sleep(wait_time)
print(f"❌ Failed after {max_retries} attempts. Last error: {last_exception}")
email_monitor.log_email_attempt(msg.recipients[0], domain, False, str(last_exception))
return False
def send_emails_batch(emails_batch, worker_name, batch_size=100):
"""Send emails in batches with rate limiting."""
print(f"\n🚀 Starting {worker_name} with {len(emails_batch)} emails")
with flask_app.app_context():
db = get_db_connection("xyz")
cursor = db.cursor(dictionary=True)
try:
cursor.execute("SELECT * FROM email_templates WHERE et_name = 'JobFair-1'")
template = cursor.fetchone()
if not template:
print("❌ No email template found!")
return
print(f"📄 Using template ID: {template['et_id']}")
# Отправка пакетами, чтобы не перегрузить
for i in range(0, len(emails_batch), batch_size):
batch = emails_batch[i:i + batch_size]
print(f"📦 Processing batch {i//batch_size + 1}/{(len(emails_batch)-1)//batch_size + 1}")
for user in batch:
name = user["name"]
email = user["email"]
# Проверить, уже отправлено ли письмо
query = """select * from email_record where er_user_email=%s and er_template_id=%s"""
cursor.execute(query, (email, template["et_id"]))
existing_record = cursor.fetchone()
if existing_record:
continue
# Подготовить и отправить письмо
email_body = render_template_string(
template['et_template'],
name=name
)
msg = Message(
subject="Gujarat's Biggest Online Job Fair is Live. Register Now",
recipients=[email],
html=email_body
)
# Отправка с логикой повторов
success = send_email_with_retry(msg)
if success:
cursor.execute("""
INSERT INTO email_record (er_user_email, er_template_id, er_email_send_status)
VALUES (%s, %s, %s)
""", (email, template["et_id"], 1))
db.commit()
# Ограничение скорости между письмами
time.sleep(0.1) # 100 мс между письмами
# Длинная пауза между пакетами
if i + batch_size < len(emails_batch):
print("⏸️ Pausing between batches...")
time.sleep(30) # 30 секунд между пакетами
print(f"✅ {worker_name} completed.")
except Exception as e:
print(f"❌ [{worker_name}] Error: {e}")
finally:
cursor.close()
db.close()
# Celery‑задачи с отложенным запуском
@celery.task(bind=True)
def send_worker1(self):
all_emails = get_email_candidate()
send_emails_batch(all_emails[:35000], "Worker 1", batch_size=100)
print("✅ Worker 1 completed.")
@celery.task(bind=True)
def send_worker2(self):
all_emails = get_email_candidate()
send_emails_batch(all_emails[35000:70000], "Worker 2", batch_size=100)
print("✅ Worker 2 completed.")
@celery.task(bind=True)
def send_worker3(self):
all_emails = get_email_candidate()
send_emails_batch(all_emails[70000:105000], "Worker 3", batch_size=100)
print("✅ Worker 3 completed.")
@celery.task(bind=True)
def send_worker4(self):
all_emails = get_email_candidate()
send_emails_batch(all_emails[105000:], "Worker 4", batch_size=100)
print("✅ Worker 4 completed.")
Ключевые улучшения
- Пул соединений: переиспользует SMTP‑соединения, предотвращая исчерпание портов
- Динамическая балансировка: автоматически распределяет нагрузку между доступными доменами
- Отложенный запуск: воркеры запускаются с интервалом в 3 минуты, чтобы избежать одновременных соединений
- Улучшенная обработка ошибок: доменные стратегии повторов с экспоненциальным откатом
- Ограничение скорости: корректные задержки между письмами и пакетами
- Полное логирование: подробные записи для отладки и анализа
- Меньший размер пакета: 100 писем с длительными паузами между пакетами
Источники
- Flask-Mail Tutorial with Code Snippets - Mailtrap.io
- Reddit Discussion on Flask-Mail Connection Issues
- Stack Overflow: Flask-Mail Connection Timeout Issues
- Stack Overflow: Flask-Mail Connection Closed During Bulk Email
- Stack Overflow: Flask-Mail Gmail Connection Refused
- Flask Mega-Tutorial: Email Support
Заключение
Успешная реализация массовой рассылки в Flask с несколькими доменами требует комплексного подхода, охватывающего управление соединениями, балансировку нагрузки, обработку ошибок и ограничение скорости. Ключевые выводы:
- Пул соединений: переиспользуйте SMTP‑соединения для предотвращения исчерпания портов и повышения производительности.
- Динамическая балансировка: автоматически распределяйте нагрузку между доступными доменами, учитывая их текущую производительность и лимиты.
- Отложенный запуск: избегайте одновременного запуска всех воркеров, чтобы не перегружать ресурсы сервера.
- Интеллектуальная логика повторов: используйте экспоненциальный откат и доменные стратегии для разных сценариев отказов.
- Ограничение скорости: включайте задержки между письмами и пакетами, чтобы оставаться в пределах лимитов поставщиков почты.
- Мониторинг и логирование: отслеживайте производительность доставки и ошибки для постоянного улучшения и отладки.
- Тестирование конфигурации сервера: убедитесь, что ваш хостинг‑провайдер разрешает исходящий SMTP‑трафик и рассмотрите выделенные IP‑адреса для каждого домена.
Внедрив эти лучшие практики, вы сможете надёжно отправлять 140 000 писем через несколько доменов без ошибок соединения и блокировок портов.