confluentinc/confluent-kafka-python

AvroDeserializer reader schema cannot ignore values required in writer schema

Open

#887 opened on May 29, 2020

Labels: component:schema-registry, component:serdes, good first issue


Description

I'm using AvroDeserializer to deserialize records from a topic, and I'm running into an error when confluent-kafka-python calls fastavro in its deserialization function:

obj_dict = schemaless_reader(payload,
    writer_schema,
    self._reader_schema)

(link to the source code line)
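For context, the payload handed to `schemaless_reader` is what remains after the deserializer strips the Confluent wire-format header: one magic byte (`0`) followed by a 4-byte big-endian schema ID that identifies the writer schema in Schema Registry. Below is a minimal stdlib sketch of that step, assuming a plain dict (`schema_cache`, a hypothetical name) stands in for the Schema Registry client lookup; `split_confluent_frame` is likewise an illustrative helper, not part of confluent-kafka-python.

```python
import io
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(data, schema_cache):
    """Return (writer_schema, payload) for a Confluent-framed Avro message.

    schema_cache is a hypothetical dict mapping schema ID -> parsed schema,
    standing in for a Schema Registry lookup.
    """
    if len(data) < 5:
        raise ValueError("message too short for Confluent framing")
    # ">bI": big-endian signed byte (magic) + unsigned 32-bit schema ID
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    writer_schema = schema_cache[schema_id]
    # The remaining bytes are the schemaless Avro body for schemaless_reader
    return writer_schema, io.BytesIO(data[5:])
```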

The reader_schema that I pass in omits a field that is present in the writer_schema because I'm not interested in reading it. However, this results in fastavro raising the following exception:

  File "fastavro/_read.pyx", line 835, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 846, in fastavro._read.schemaless_reader
  File "fastavro/_read.pyx", line 561, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 472, in fastavro._read.read_record
  File "fastavro/_read.pyx", line 559, in fastavro._read._read_data
  File "fastavro/_read.pyx", line 413, in fastavro._read.read_union
  File "fastavro/_read.pyx", line 73, in fastavro._read.match_types
  File "fastavro/_read.pyx", line 127, in fastavro._read.match_schemas
fastavro._read_common.SchemaResolutionError: Schema mismatch: {'avro.java.string': 'String', 'type': 'string'} is not null

I suppose this makes sense from fastavro's point of view, because the purpose of the schemaless_reader reader_schema is "schema migration" (see here), and a required field cannot simply be dropped. From a Kafka topic reader's point of view, however, it doesn't make sense, since a reader might only be interested in a subset of the data.
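If a record is decoded with the writer schema alone, narrowing it to the fields a consumer cares about can be done in plain Python afterwards. A minimal sketch, assuming `record` is the dict produced by decoding; `project_record` is a hypothetical helper, not part of confluent-kafka-python or fastavro:

```python
def project_record(record, wanted_fields):
    """Keep only the wanted top-level fields of a decoded Avro record."""
    # Fields absent from the record are skipped rather than raising KeyError
    return {name: record[name] for name in wanted_fields if name in record}
```

For example, `project_record({"id": 1, "name": "a", "note": "x"}, ["id", "name"])` returns `{"id": 1, "name": "a"}`.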

My issue would be solved if AvroDeserializer accepted None for reader_schema and, in that case, ignored the reader schema completely by calling fastavro like this:

obj_dict = schemaless_reader(payload,
    writer_schema,
    None)
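The proposed behavior can be sketched as a small wrapper that passes the reader schema through unchanged, so that `None` means "decode with the writer schema". This is a hypothetical class, not the confluent-kafka-python API; the `reader` parameter is an assumption added so the decoding function can be swapped out, and it defaults to `fastavro.schemaless_reader`, whose `reader_schema` argument already accepts `None`.

```python
class OptionalReaderSchemaDeserializer:
    """Hypothetical sketch: reader_schema=None falls back to the writer schema."""

    def __init__(self, reader_schema=None, reader=None):
        if reader is None:
            # Imported lazily so the sketch can be exercised without fastavro
            from fastavro import schemaless_reader as reader
        self._reader_schema = reader_schema
        self._reader = reader

    def decode(self, payload, writer_schema):
        # With reader_schema=None, no schema resolution is requested and the
        # record is returned exactly as the writer schema describes it.
        return self._reader(payload, writer_schema, self._reader_schema)
```

Keeping `None` as the default means existing callers that pass a reader schema see no change, while consumers that only want the writer's view simply omit it.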

Or do you think this should be addressed in fastavro instead?

How to reproduce

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): latest (1.4.2)
  • Apache Kafka broker version: Confluent Cloud
  • Client configuration: {...}
  • Operating system: Ubuntu 18.04 (under WSL1)
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
