НейроАгент

Python Kafka не видит заголовки: причины и решения

Проблема с заголовками в Python Kafka: как воспроизвести и решить. Совместимость версий, обходные пути и альтернативные библиотеки.

Python Kafka не видит headers?

Python Kafka не получает заголовки от Apache Kafka. Как воспроизвести проблему?

Я пытаюсь прочитать заголовки через Kafka Consumer:

python
consumer = KafkaConsumer(
    "mz-core-document-manager-responses",
    auto_offset_reset="latest",
    group_id=None,
    api_version=(0, 10, 1),
    bootstrap_servers=[
         ....
    ],
)
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print(
        "%s:%d:%d: key=%s headers=%s"
        % (
            message.topic,
            message.partition,
            message.offset,
            message.key,
            "".join(message.headers),
        )
    )

Kafka упорно получает только payload без заголовков.

После исследования обнаружила, что BrokerConnection отсылает запрос, например:
FetchRequest_v3(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, topics=[(topic=‘my-topic’, partitions=[(partition=0, offset=19857795, max_bytes=1048576), (partition=3, offset=18443170, max_bytes=1048576)])]) для BrokerConnection, который его принимает. Но в ответе уже нет заголовков.

Пример ответа:
b’\x00\x00\x00F\x00\x00\x00\x01\x00\x00\x00\x01\x00"***x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\xfaOc’

Есть ссылка на open source баг в Kafka.

Подскажите, куда копать для решения этой проблемы?

НейроАгент

Python Kafka не видит заголовки из-за проблем с поддержкой сообщений с заголовками в библиотеке confluent-kafka, особенно в версиях до 1.0.0. Это известная проблема, которая может быть воспроизведена путем отправки сообщений с заголовками и попытки их чтения через KafkaConsumer.

Содержание

Основные причины проблемы

Проблема с заголовками в Python Kafka связана с несколькими факторами:

  1. Ограничения библиотеки confluent-kafka: Согласно документации Confluent, “Currently message headers are not supported on the message returned to the callback. The msg.headers() will return None even if the original message had headers set.” источник

  2. Версионная совместимость: Требуется librdkafka >= v0.11.4 и broker версия >= 0.11.0.0 для корректной работы с заголовками источник

  3. Проблемы в реализации: В issue #574 сообщается, что msg.headers() всегда возвращает None даже при установке заголовков в продюсере источник

  4. Формат сообщений: Как вы заметили, запросы FetchRequest_v3 отправляются корректно, но в ответе отсутствуют заголовки, что указывает на проблему на уровне протокола или десериализации.

Как воспроизвести проблему

Вот полный пример кода для воспроизведения проблемы:

python
from confluent_kafka import Producer, Consumer, KafkaException
import json

# Настройка продюсера
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'api.version.request': True,
    'api.version.fallback.ms': 0
}

# Настройка консьюмера
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test-group',
    'auto.offset_reset': 'earliest',
    'enable.auto.commit': False,
    'api.version.request': True,
    'api.version.fallback.ms': 0
}

try:
    # Создаем продюсера
    producer = Producer(producer_conf)
    
    # Создаем консьюмера
    consumer = Consumer(consumer_conf)
    consumer.subscribe(['test-headers-topic'])
    
    # Отправляем сообщение с заголовками
    topic = 'test-headers-topic'
    headers = [('header-key', 'header-value'), ('source', 'python-test')]
    
    print("Отправка сообщения с заголовками...")
    producer.produce(topic, value='test message', headers=headers)
    producer.flush()
    
    print("Попытка получения сообщения...")
    msg = consumer.poll(timeout=10.0)
    
    if msg is None:
        print("Сообщения не получены")
    elif msg.error():
        print(f"Ошибка при получении сообщения: {msg.error()}")
    else:
        print(f"Тема: {msg.topic()}")
        print(f"Партиция: {msg.partition()}")
        print(f"Смещение: {msg.offset()}")
        print(f"Ключ: {msg.key()}")
        print(f"Значение: {msg.value().decode('utf-8')}")
        print(f"Заголовки: {msg.headers()}")  # Должно показать None или пусто
        
finally:
    consumer.close()
    producer.flush()

Ожидаемый результат: msg.headers() вернет None или пустой список, несмотря на то что заголовки были отправлены.

Проверка совместимости версий

Проверьте совместимость версий Kafka:

Компонент Минимальная версия Рекомендуемая версия
Kafka Broker 0.11.0.0 2.8+
librdkafka 0.11.4 1.9+
confluent-kafka 1.0.0 2.0+

Для проверки версий:

python
from confluent_kafka import KafkaException, TopicPartition
import confluent_kafka as ck

# Проверка версии librdkafka
print(f"librdkafka version: {ck.libversion()}")

# Проверка версии API
try:
    admin_client = ck.AdminClient({'bootstrap.servers': 'localhost:9092'})
    cluster_metadata = admin_client.list_topics(timeout=10)
    print(f"Кластер доступен, API версия: {cluster_metadata.orig_broker_api_versions}")
except KafkaException as e:
    print(f"Ошибка подключения: {e}")

Решения и обходные пути

1. Обновление confluent-kafka

bash
pip install --upgrade confluent-kafka

2. Ручная сериализация заголовков

Если обновление не помогает, можно использовать обходной путь с сериализацией заголовков в значение:

python
import json
import base64

# Отправка с кодировкой заголовков в значение
headers_data = {'my-header': 'header-value'}
encoded_headers = base64.b64encode(json.dumps(headers_data).encode()).decode()

producer.produce(
    topic,
    value=json.dumps({'data': 'message', 'headers': encoded_headers}).encode(),
    key='test-key'
)

# Получение и декодирование
msg = consumer.poll()
if msg:
    try:
        data = json.loads(msg.value().decode())
        headers = json.loads(base64.b64decode(data['headers']).decode())
        print(f"Заголовки: {headers}")
    except (json.JSONDecodeError, KeyError) as e:
        print(f"Ошибка декодирования: {e}")

3. Использование Kafka Admin API для диагностики

python
from confluent_kafka import AdminClient, KafkaException

def check_kafka_features(bootstrap_servers):
    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
    
    try:
        cluster_metadata = admin_client.list_topics(timeout=10)
        print("Доступные топики:", list(cluster_metadata.topics.keys()))
        
        # Проверка поддержки API
        if hasattr(cluster_metadata, 'orig_broker_api_versions'):
            print("Версии API брокера:", cluster_metadata.orig_broker_api_versions)
        
    except KafkaException as e:
        print(f"Ошибка при проверке: {e}")

check_kafka_features('localhost:9092')

4. Проверка конфигурации консьюмера

python
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test-group',
    'auto.offset_reset': 'earliest',
    'enable.auto.commit': False,
    'api.version.request': True,
    'api.version.fallback.ms': 0,
    'fetch.max.bytes': 1048576,  # Увеличенный размер для получения заголовков
    'fetch.max.wait.ms': 500,
    'fetch.min.bytes': 1,
    'check.crcs': False,  # Может помочь с производительностью
    'message.max.bytes': 10485760  # Максимальный размер сообщения
}

Альтернативные библиотеки

Если проблема persists, рассмотрите использование альтернативных библиотек:

1. kafka-python

bash
pip install kafka-python
python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test-headers-topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)

for message in consumer:
    print(f"Заголовки: {message.headers}")
    print(f"Значение: {message.value.decode('utf-8')}")

2. aiokafka (асинхронная работа)

bash
pip install aiokafka
python
import asyncio
from aiokafka import AIOKafkaConsumer

async def consume_messages():
    consumer = AIOKafkaConsumer(
        'test-headers-topic',
        bootstrap_servers='localhost:9092',
        group_id='async-group',
        auto_offset_reset='earliest'
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(f"Заголовки: {msg.headers}")
            print(f"Значение: {msg.value.decode('utf-8')}")
    finally:
        await consumer.stop()

asyncio.run(consume_messages())

Дополнительные рекомендации

1. Проверка брокера Kafka

Убедитесь, что ваш Kafka брокер поддерживает заголовки:

bash
# Проверка версии Kafka
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# Создание топика с поддержкой заголовков
kafka-topics.sh --create --topic test-headers-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config message.format.version=2.8-IV2

2. Мониторинг сетевого трафика

Используйте Wireshark для анализа сетевого трафика между вашим клиентом и Kafka брокером, чтобы увидеть, действительно ли заголовки отправляются в запросах.

3. Тестирование с консольными утилитами

bash
# Отправка сообщения с заголовками через консоль
kafka-console-producer.sh --broker-list localhost:9092 --topic test-headers-topic --property parse.headers=true --property headers.separator=,

# Получение сообщений
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-headers-topic --from-beginning --property print.headers=true

4. Отчет об ошибке

Если проблема подтверждается, создайте отчет в репозитории confluent-kafka:

  1. Соберите информацию о версиях:

    • confluent-kafka: pip show confluent-kafka
    • librdkafka: ck.libversion()
    • Kafka broker: kafka-broker-api-versions.sh
  2. Минимальный пример воспроизведения

  3. Логи ошибок

  4. Информация о конфигурации

Эта проблема активно обсуждается в сообществе, и регулярные обновления библиотеки постепенно улучшают поддержку заголовков. Следите за выпусками новых версий confluent-kafka на GitHub.

Источники

  1. Confluent Kafka Python Documentation - Overview
  2. GitHub Issue #574 - msg.headers none in the produce callback
  3. GitHub Issue #287 - Support for Message headers
  4. Stack Overflow - Kafka python consumer not reading the message from topics
  5. Confluent Documentation - Supported Versions and Interoperability
  6. PyPI - confluent-kafka vs kafka-python comparison

Заключение

Проблема с заголовками в Python Kafka является известной особенностью библиотеки confluent-kafka, особенно в старых версиях. Основные рекомендации:

  1. Обновите confluent-kafka до последней версии (минимум 1.0.0, лучше 2.0+)
  2. Проверьте совместимость версий Kafka broker и librdkafka
  3. В качестве обходного пути используйте кодирование заголовков в значение сообщения
  4. Рассмотрите альтернативные библиотеки (kafka-python, aiokafka) если проблема не решается
  5. Создайте отчет об ошибке в репозитории confluent-kafka с полным примером воспроизведения

Регулярно проверяйте обновления библиотеки, так как поддержка заголовков постепенно улучшается в новых релизах.