Convert GenericRecord objects between schemas

Convert GenericRecord objects between schemas

Brian Barefoot Burns
Hi,

I am trying to multiplex GenericRecord objects to a set of DataFileWriter objects. Each DataFileWriter expects its own schema, and the GenericRecord objects were written with a "superset" schema that is compatible with the schemas assigned to the DataFileWriters. Is there a way to convert individual GenericRecord objects between schemas without re-reading them from their source (in this case a DataFileStream)?

If I simply call append() on a DataFileWriter with such a GenericRecord, I get an exception complaining that the schema of the GenericRecord does not match that of the DataFileWriter. If I instead set the reader schema of the DataFileStream to the schema expected by a given DataFileWriter, I do not get these errors, but I would prefer not to have to open the stream multiple times just to write the records out to the different writers. I feel like there should be some obvious tool that leverages schema evolution to do this, but I cannot find it if it exists.
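
For reference, a rough sketch of the workaround I mentioned: re-opening the source once per target writer, with the reader schema set to that writer's schema (names like copyWithReaderSchema, source, and sink are placeholders, not my real code):

import java.io.InputStream;
import java.io.OutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Re-open the source for one target writer, letting Avro's schema resolution
// convert each record to the writer's schema as it is read.
static void copyWithReaderSchema(InputStream source, Schema targetSchema, OutputStream sink)
        throws Exception {
    GenericDatumReader<GenericRecord> datumReader =
        new GenericDatumReader<GenericRecord>(null, targetSchema); // writer schema comes from the stream
    DataFileStream<GenericRecord> in = new DataFileStream<GenericRecord>(source, datumReader);
    DataFileWriter<GenericRecord> out =
        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(targetSchema));
    out.create(targetSchema, sink);
    for (GenericRecord record : in) {
        out.append(record); // record already carries targetSchema, so append succeeds
    }
    out.close();
    in.close();
}

Doing this once per destination is exactly the repeated re-reading I am trying to avoid.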

Thank you!

Re: Convert GenericRecord objects between schemas

Brian Barefoot Burns
I have some more info to share regarding my problem. I investigated further why I was getting errors when appending GenericRecord objects to certain DataFileWriters. I expected the DataFileWriter to correctly resolve the schemas between the GenericRecord I try to append and the schema expected by the DataFileWriter itself. Although the schema expected by the DataFileWriter is a strict subset of the schema of the GenericRecord in terms of the named fields, the write path in GenericDatumWriter (writeWithoutConversion and the code it calls) retrieves field values by index position rather than by field name, which causes problems when the writer and reader schemas do not list the fields in the same order. Consider the two schemas below, where the first schema ("superset") is the master schema and the second ("subset") contains fields cherry-picked from it. Notice that "subset" is backward-compatible with "superset": that is, data written with "superset" can be read with "subset" according to the Avro specification.

{
    "name":"superset",
    "type":"record","fields":[
        {"name":"version","type":["int", "null"],"default":1},
        {"name":"hostname","type":["string", "null"],"default":"null"},
        {"name":"id","type":"int"},
        {"name":"name","type":"string"},
        {"name":"timestamp","type":"long"},
        {"name":"uptime","type":["int", "null"]},
        {"name":"lat","type":["double", "null"]},
        {"name":"long","type":["double", "null"]},
        {"name":"alt","type":["int", "null"]},
        {"name":"vel","type":["int", "null"]},
        {"name":"status","type":["int", "null"]},
        {"name":"state","type":["string", "null"]}
    ]
}

{
    "name":"subset",
    "type":"record","fields":[
        {"name":"version","type":["int", "null"],"default":1},
        {"name":"id","type":"int"},
        {"name":"name","type":"string"},
        {"name":"timestamp","type":"long"},
        {"name":"state","type":["string", "null"]}
    ]
}
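
(As a side note, the compatibility claim can be checked programmatically. Something like the sketch below, using org.apache.avro.SchemaCompatibility from recent Avro releases, should report this reader/writer pair as compatible; SUPERSET_JSON and SUBSET_JSON are just placeholders for the two schema definitions above.)

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

// Parse the two schema definitions above (placeholder JSON string constants).
Schema superset = new Schema.Parser().parse(SUPERSET_JSON);
Schema subset = new Schema.Parser().parse(SUBSET_JSON);

// Check whether a reader using "subset" can read data written with "superset".
SchemaCompatibility.SchemaPairCompatibility result =
    SchemaCompatibility.checkReaderWriterCompatibility(subset, superset);
System.out.println(result.getType()); // expected: COMPATIBLE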

If I have a GenericRecord with schema "superset" and I wish to append it using a DataFileWriter with schema "subset", the GenericDatumWriter of the DataFileWriter will traverse "subset", pick each field from the GenericRecord (which carries schema "superset"), and encode it to the output stream. The first field, "version", works correctly, but when the GenericDatumWriter gets to the second field, "id", it uses index position 1 (zero-indexed) and retrieves the datum at index 1 of the GenericRecord. In this case it retrieves the value assigned to "hostname" in the superset schema and tries to encode it as the "id" field of the subset schema. This causes a type mismatch, since a string cannot be coerced into an int, and raises a runtime exception. I may be misunderstanding the behavior of the code in GenericDatumWriter, but the explanation I have given here is consistent with my observations of the code behavior and logging output.
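
A minimal sketch of how I reproduce this, assuming supersetSchema and subsetSchema are the two schemas above already parsed with Schema.Parser (the field values are made up):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

static void reproduce(Schema supersetSchema, Schema subsetSchema) throws IOException {
    // Build a record against "superset"...
    GenericRecord record = new GenericData.Record(supersetSchema);
    record.put("version", 1);
    record.put("hostname", "host-1");
    record.put("id", 42);
    record.put("name", "example");
    record.put("timestamp", 0L);
    record.put("state", "ok");

    // ...and append it to a writer created with "subset". The datum writer walks
    // "subset" by field position, so for "id" (index 1 in subset) it pulls index 1
    // of the record, which is "hostname" in superset, and the append fails.
    DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
        new GenericDatumWriter<GenericRecord>(subsetSchema));
    writer.create(subsetSchema, new ByteArrayOutputStream());
    writer.append(record); // raises a runtime exception (type mismatch on "id")
    writer.close();
}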

It is not clear to me whether this behavior is a bug with respect to the Avro specification or whether it is expected given the implementation of the Java API. Certainly the "subset" schema in my example is compatible with "superset" according to the Avro specification, and reading files written with "superset" while setting the reader schema to "subset" works as expected.
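
As a sanity check, the resolution does work if I push a record through a normal write/read cycle: a per-record round-trip through a binary encoder and decoder, as sketched below, yields a record carrying the "subset" schema. It is just a sketch, and it re-serializes every record, which is most of the cost I am trying to avoid.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Serialize the record with its own (writer) schema, then read the bytes back
// with the target (reader) schema; Avro resolves the fields by name.
static GenericRecord resolveViaBytes(GenericRecord record, Schema readerSchema)
        throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
    new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
    encoder.flush();

    Decoder decoder = DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null);
    return new GenericDatumReader<GenericRecord>(record.getSchema(), readerSchema)
        .read(null, decoder);
}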

I want to achieve the expected behavior on a per-record basis without re-opening byte streams multiple times. In my use case I do not have files on disk but am working with streams over the network, and re-opening the streams in order to translate and send a single record to multiple destinations is too expensive. In the interim I have written a simple function to do the conversion/translation/evolution work I want. It is far from ideal and is only safe because of other assertions I can make about my system and software environment as a whole. The function is given below and may illustrate my intention more clearly.

GenericRecord translateRecord(GenericRecord record, Schema targetSchema) {

    // Build a new record against the target schema, copying each target
    // field by name from the source record.
    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(targetSchema);

    for (Schema.Field f : targetSchema.getFields()) {
        recordBuilder.set(f, record.get(f.name()));
    }

    return recordBuilder.build();
}
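
For completeness, the multiplexing loop then looks roughly like this; stream and writersBySchema are placeholders for my single DataFileStream and my own map from each target Schema to its DataFileWriter:

// Read the stream once and fan each record out, translated per destination.
for (GenericRecord record : stream) {
    for (Map.Entry<Schema, DataFileWriter<GenericRecord>> entry : writersBySchema.entrySet()) {
        entry.getValue().append(translateRecord(record, entry.getKey()));
    }
}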