Python Kafka не видит headers?
Python Kafka не получает заголовки от Apache Kafka. Как воспроизвести проблему?
Я пытаюсь прочитать заголовки через Kafka Consumer:
consumer = KafkaConsumer(
"mz-core-document-manager-responses",
auto_offset_reset="latest",
group_id=None,
api_version=(0, 10, 1),
bootstrap_servers=[
....
],
)
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print(
"%s:%d:%d: key=%s headers=%s"
% (
message.topic,
message.partition,
message.offset,
message.key,
"".join(message.headers),
)
)
Kafka упорно получает только payload без заголовков.
После исследования обнаружила, что BrokerConnection отсылает запрос, например:
FetchRequest_v3(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, topics=[(topic=‘my-topic’, partitions=[(partition=0, offset=19857795, max_bytes=1048576), (partition=3, offset=18443170, max_bytes=1048576)])]) для BrokerConnection, который его принимает. Но в ответе уже нет заголовков.
Пример ответа:
b’\x00\x00\x00F\x00\x00\x00\x01\x00\x00\x00\x01\x00"***x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\xfaOc’
Есть ссылка на open source баг в Kafka.
Подскажите, куда копать для решения этой проблемы?
Python Kafka не видит заголовки из-за проблем с поддержкой сообщений с заголовками в библиотеке confluent-kafka, особенно в версиях до 1.0.0. Это известная проблема, которая может быть воспроизведена путем отправки сообщений с заголовками и попытки их чтения через KafkaConsumer.
Содержание
- Основные причины проблемы
- Как воспроизвести проблему
- Проверка совместимости версий
- Решения и обходные пути
- Альтернативные библиотеки
- Дополнительные рекомендации
Основные причины проблемы
Проблема с заголовками в Python Kafka связана с несколькими факторами:
-
Ограничения библиотеки confluent-kafka: Согласно документации Confluent, “Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.” источник
-
Версионная совместимость: Требуется librdkafka >= v0.11.4 и broker версия >= 0.11.0.0 для корректной работы с заголовками источник
-
Проблемы в реализации: В issue #574 сообщается, что
msg.headers()всегда возвращаетNoneдаже при установке заголовков в продюсере источник -
Формат сообщений: Как вы заметили, запросы FetchRequest_v3 отправляются корректно, но в ответе отсутствуют заголовки, что указывает на проблему на уровне протокола или десериализации.
Как воспроизвести проблему
Вот полный пример кода для воспроизведения проблемы:
from confluent_kafka import Producer, Consumer, KafkaException
import json
# Настройка продюсера
producer_conf = {
'bootstrap.servers': 'localhost:9092',
'api.version.request': True,
'api.version.fallback.ms': 0
}
# Настройка консьюмера
consumer_conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'test-group',
'auto.offset_reset': 'earliest',
'enable.auto.commit': False,
'api.version.request': True,
'api.version.fallback.ms': 0
}
try:
# Создаем продюсера
producer = Producer(producer_conf)
# Создаем консьюмера
consumer = Consumer(consumer_conf)
consumer.subscribe(['test-headers-topic'])
# Отправляем сообщение с заголовками
topic = 'test-headers-topic'
headers = [('header-key', 'header-value'), ('source', 'python-test')]
print("Отправка сообщения с заголовками...")
producer.produce(topic, value='test message', headers=headers)
producer.flush()
print("Попытка получения сообщения...")
msg = consumer.poll(timeout=10.0)
if msg is None:
print("Сообщения не получены")
elif msg.error():
print(f"Ошибка при получении сообщения: {msg.error()}")
else:
print(f"Тема: {msg.topic()}")
print(f"Партиция: {msg.partition()}")
print(f"Смещение: {msg.offset()}")
print(f"Ключ: {msg.key()}")
print(f"Значение: {msg.value().decode('utf-8')}")
print(f"Заголовки: {msg.headers()}") # Должно показать None или пусто
finally:
consumer.close()
producer.flush()
Ожидаемый результат: msg.headers() вернет None или пустой список, несмотря на то что заголовки были отправлены.
Проверка совместимости версий
Проверьте совместимость версий Kafka:
| Компонент | Минимальная версия | Рекомендуемая версия |
|---|---|---|
| Kafka Broker | 0.11.0.0 | 2.8+ |
| librdkafka | 0.11.4 | 1.9+ |
| confluent-kafka | 1.0.0 | 2.0+ |
Для проверки версий:
from confluent_kafka import KafkaException, TopicPartition
import confluent_kafka as ck
# Проверка версии librdkafka
print(f"librdkafka version: {ck.libversion()}")
# Проверка версии API
try:
admin_client = ck.AdminClient({'bootstrap.servers': 'localhost:9092'})
cluster_metadata = admin_client.list_topics(timeout=10)
print(f"Кластер доступен, API версия: {cluster_metadata.orig_broker_api_versions}")
except KafkaException as e:
print(f"Ошибка подключения: {e}")
Решения и обходные пути
1. Обновление confluent-kafka
pip install --upgrade confluent-kafka
2. Ручная сериализация заголовков
Если обновление не помогает, можно использовать обходной путь с сериализацией заголовков в значение:
import json
import base64
# Отправка с кодировкой заголовков в значение
headers_data = {'my-header': 'header-value'}
encoded_headers = base64.b64encode(json.dumps(headers_data).encode()).decode()
producer.produce(
topic,
value=json.dumps({'data': 'message', 'headers': encoded_headers}).encode(),
key='test-key'
)
# Получение и декодирование
msg = consumer.poll()
if msg:
try:
data = json.loads(msg.value().decode())
headers = json.loads(base64.b64decode(data['headers']).decode())
print(f"Заголовки: {headers}")
except (json.JSONDecodeError, KeyError) as e:
print(f"Ошибка декодирования: {e}")
3. Использование Kafka Admin API для диагностики
from confluent_kafka import AdminClient, KafkaException
def check_kafka_features(bootstrap_servers):
admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
try:
cluster_metadata = admin_client.list_topics(timeout=10)
print("Доступные топики:", list(cluster_metadata.topics.keys()))
# Проверка поддержки API
if hasattr(cluster_metadata, 'orig_broker_api_versions'):
print("Версии API брокера:", cluster_metadata.orig_broker_api_versions)
except KafkaException as e:
print(f"Ошибка при проверке: {e}")
check_kafka_features('localhost:9092')
4. Проверка конфигурации консьюмера
consumer_conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'test-group',
'auto.offset_reset': 'earliest',
'enable.auto.commit': False,
'api.version.request': True,
'api.version.fallback.ms': 0,
'fetch.max.bytes': 1048576, # Увеличенный размер для получения заголовков
'fetch.max.wait.ms': 500,
'fetch.min.bytes': 1,
'check.crcs': False, # Может помочь с производительностью
'message.max.bytes': 10485760 # Максимальный размер сообщения
}
Альтернативные библиотеки
Если проблема persists, рассмотрите использование альтернативных библиотек:
1. kafka-python
pip install kafka-python
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'test-headers-topic',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
enable_auto_commit=False,
consumer_timeout_ms=1000
)
for message in consumer:
print(f"Заголовки: {message.headers}")
print(f"Значение: {message.value.decode('utf-8')}")
2. aiokafka (асинхронная работа)
pip install aiokafka
import asyncio
from aiokafka import AIOKafkaConsumer
async def consume_messages():
consumer = AIOKafkaConsumer(
'test-headers-topic',
bootstrap_servers='localhost:9092',
group_id='async-group',
auto_offset_reset='earliest'
)
await consumer.start()
try:
async for msg in consumer:
print(f"Заголовки: {msg.headers}")
print(f"Значение: {msg.value.decode('utf-8')}")
finally:
await consumer.stop()
asyncio.run(consume_messages())
Дополнительные рекомендации
1. Проверка брокера Kafka
Убедитесь, что ваш Kafka брокер поддерживает заголовки:
# Проверка версии Kafka
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# Создание топика с поддержкой заголовков
kafka-topics.sh --create --topic test-headers-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config message.format.version=2.8-IV2
2. Мониторинг сетевого трафика
Используйте Wireshark для анализа сетевого трафика между вашим клиентом и Kafka брокером, чтобы увидеть, действительно ли заголовки отправляются в запросах.
3. Тестирование с консольными утилитами
# Отправка сообщения с заголовками через консоль
kafka-console-producer.sh --broker-list localhost:9092 --topic test-headers-topic --property parse.headers=true --property headers.separator=,
# Получение сообщений
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-headers-topic --from-beginning --property print.headers=true
4. Отчет об ошибке
Если проблема подтверждается, создайте отчет в репозитории confluent-kafka:
-
Соберите информацию о версиях:
- confluent-kafka:
pip show confluent-kafka - librdkafka:
ck.libversion() - Kafka broker:
kafka-broker-api-versions.sh
- confluent-kafka:
-
Минимальный пример воспроизведения
-
Логи ошибок
-
Информация о конфигурации
Эта проблема активно обсуждается в сообществе, и регулярные обновления библиотеки постепенно улучшают поддержку заголовков. Следите за выпусками новых версий confluent-kafka на GitHub.
Источники
- Confluent Kafka Python Documentation - Overview
- GitHub Issue #574 - msg.headers none in the produce callback
- GitHub Issue #287 - Support for Message headers
- Stack Overflow - Kafka python consumer not reading the message from topics
- Confluent Documentation - Supported Versions and Interoperability
- PyPI - confluent-kafka vs kafka-python comparison
Заключение
Проблема с заголовками в Python Kafka является известной особенностью библиотеки confluent-kafka, особенно в старых версиях. Основные рекомендации:
- Обновите confluent-kafka до последней версии (минимум 1.0.0, лучше 2.0+)
- Проверьте совместимость версий Kafka broker и librdkafka
- В качестве обходного пути используйте кодирование заголовков в значение сообщения
- Рассмотрите альтернативные библиотеки (kafka-python, aiokafka) если проблема не решается
- Создайте отчет об ошибке в репозитории confluent-kafka с полным примером воспроизведения
Регулярно проверяйте обновления библиотеки, так как поддержка заголовков постепенно улучшается в новых релизах.