
Union in AvroMapper.map: Not in Union??

nir_zamir
Hi,

I have a small M/R job that, for troubleshooting purposes, just reads a single Avro file and passes its records to an AvroMapper. To handle different schemas, I set the mapper to work with a union schema.
For simplicity, the union contains only one schema, taken from the Avro file. This is my run function:

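        public int run(String[] args) throws Exception {
                // Build a union from the schema stored in the input file
                // (readSchema, inputDir and conf are defined elsewhere in the class, not shown here).
                List<Schema> schemas = new ArrayList<Schema>();
                Schema schema = readSchema(new Path(inputDir), conf);
                schemas.add(schema);
                Schema unionSchema = Schema.createUnion(schemas);

                // Use the union as the input schema; the map emits (int, union) pairs.
                AvroJob.setInputSchema(conf, unionSchema);
                AvroJob.setMapOutputSchema(conf,
                                Pair.getPairSchema(Schema.create(Type.INT), unionSchema));

                JobClient.runJob(conf);
                return 0;
        }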

And my mapper is:

        public static class MyMap extends AvroMapper<GenericRecord, Pair<Integer, GenericRecord>> {
                @Override
                public void map(GenericRecord datum,
                                AvroCollector<Pair<Integer, GenericRecord>> collector,
                                Reporter reporter) throws IOException {
                        // Emit each input record unchanged, keyed by conversion_id
                        // (conversion_id is not declared in this snippet).
                        collector.collect(new Pair<Integer, GenericRecord>(conversion_id, datum));
                }
        }

Side note: when I extract the schema manually from the Avro file and call GenericData.get().resolveUnion, the schema is correctly resolved and found in the union schema (the method returns 0).


THE PROBLEM:
The 'collect' method throws an exception saying my datum's schema is not in the union:

org.apache.avro.UnresolvedUnionException: Not in union [{"type":"record","name":"path_to_conversions_raw","namespace":"org.apache.avro.mapred","doc":"Sqoop import of path_to_conversions_raw","fields":[{"name":"conversion_id","type":["long","null"],"columnName":"conversion_id","sqlType":"-5"},{"name":"conversion_time","type":["long","null"],"columnName":"conversion_time","sqlType":"93"},{"name":"derived_time","type":["long","null"],"columnName":"derived_time","sqlType":"93"},{"name":"position","type":["int","null"],"columnName":"position","sqlType":"5"},{"name":"event_time","type":["long","null"],"columnName":"event_time","sqlType":"93"},{"name":"event_type_id","type":["int","null"],"columnName":"event_type_id","sqlType":"5"},{"name":"entity_id","type":["int","null"],"columnName":"entity_id","sqlType":"4"},{"name":"entity_type_id","type":["int","null"],"columnName":"entity_type_id","sqlType":"4"},{"name":"sv_click_type","type":["int","null"],"columnName":"sv_click_type","sqlType":"5"},{"name":"placement_id","type":["int","null"],"columnName":"placement_id","sqlType":"4"},{"name":"site_id","type":["int","null"],"columnName":"site_id","sqlType":"4"},{"name":"campaign_id","type":["int","null"],"columnName":"campaign_id","sqlType":"4"},{"name":"days_before_conversion","type":["int","null"],"columnName":"days_before_conversion","sqlType":"5"},{"name":"version_id","type":["int","null"],"columnName":"version_id","sqlType":"4"},{"name":"target_audience_id","type":["int","null"],"columnName":"target_audience_id","sqlType":"4"},{"name":"ip_attributes_id","type":["long","null"],"columnName":"ip_attributes_id","sqlType":"-5"},{"name":"batch_id","type":["long","null"],"columnName":"batch_id","sqlType":"-5"},{"name":"ccs_ad_id","type":["long","null"],"columnName":"ccs_ad_id","sqlType":"-5"}],"tableName":"path_to_conversions_raw"}]: {"conversion_id": 552804, "conversion_time": 1325451223000, "derived_time": 1330639180000, "position": 2, "event_time": 1325401200000, 
"event_type_id": 1, "entity_id": 4035315, "entity_type_id": 1, "sv_click_type": 2, "placement_id": 1978266, "site_id": 9103, "campaign_id": 129001, "days_before_conversion": 1, "version_id": 73721, "target_audience_id": -1, "ip_attributes_id": 3598, "batch_id": 4601, "ccs_ad_id": null}
        at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:542)
        at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:137)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:70)
        at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:57)
        at org.apache.avro.mapred.AvroSerialization$AvroWrapperSerializer.serialize(AvroSerialization.java:127)
        at org.apache.avro.mapred.AvroSerialization$AvroWrapperSerializer.serialize(AvroSerialization.java:110)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:916)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:501)
        at org.apache.avro.mapred.HadoopMapper$MapCollector.collect(HadoopMapper.java:69)
        at example.avro.ColorCount$MyMap.map(ColorCount.java:51)


Any help would be appreciated.

Thanks!
Nir
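Why can the same datum resolve in a standalone GenericData.get().resolveUnion call but fail inside collect? For record values, Avro's union lookup is by the record's full name (namespace plus name), not by comparing fields. The sketch below is a simplified JDK-only model of that lookup, not Avro's actual code. The names come from the exception above; that the datum's own schema has no namespace is an assumption.

```java
import java.util.List;

public class UnionLookupModel {
    // Full name as defined by Avro: "namespace.name", or just "name" when there is no namespace.
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    // Simplified model of union resolution for records: return the index of the
    // branch whose full name equals the datum's full name; otherwise fail with a
    // message shaped like Avro's UnresolvedUnionException.
    static int resolveUnion(List<String> branchFullNames, String datumFullName) {
        int index = branchFullNames.indexOf(datumFullName);
        if (index < 0) {
            throw new IllegalArgumentException(
                    "Not in union " + branchFullNames + ": " + datumFullName);
        }
        return index;
    }

    public static void main(String[] args) {
        // Assumption: the record schema read from the Sqoop file has no namespace.
        String datum = fullName(null, "path_to_conversions_raw");

        // Union as in the side note's manual check: built from the file's schema, names agree.
        List<String> manualCheck = List.of(fullName(null, "path_to_conversions_raw"));
        System.out.println(resolveUnion(manualCheck, datum)); // prints 0

        // Union as printed in the exception: the record now carries a namespace.
        List<String> seenByCollect =
                List.of(fullName("org.apache.avro.mapred", "path_to_conversions_raw"));
        try {
            resolveUnion(seenByCollect, datum);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the stack trace, the failing resolveUnion call comes from AvroSerialization$AvroWrapperSerializer, i.e. from serializing the map output. So the union being checked is presumably the one in the map output schema the job was configured with, not necessarily the unionSchema object built in run.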

Re: Union in AvroMapper.map: Not in Union??

nir_zamir
Help please...

Re: Union in AvroMapper.map: Not in Union??

Martin Kleppmann
In reply to this post by nir_zamir
Hi Nir,

I'm not an expert with the avro.mapred APIs, but as far as I know,
AvroJob does not perform schema evolution, so the schema you provide
to AvroJob.setInputSchema has to be the exact same schema with which
your input files to the mappers are encoded. So if your input isn't
actually a union type, but your schema says that it is, decoding of
the input file will fail.

If you want to consume several different input directories with
different schemas in the same MapReduce job, you might be able to
build that using Hadoop's MultipleInputs, though I don't know how well
that plays with Avro. Alternatively, you could take a look at
http://crunch.apache.org/ which provides higher-level MapReduce APIs
and is designed to play nicely with Avro.

Hope that helps,
Martin

On 24 March 2013 03:23, nir_zamir <[hidden email]> wrote:

Re: Union in AvroMapper.map: Not in Union??

nir_zamir
Hi Martin,

Thanks.

According to this, using the union should work:
http://apache-avro.679487.n3.nabble.com/Multiple-input-schemas-in-MapReduce-td2928590.html

BTW, I had a chance to debug the failing code; the cause is described in a different thread I posted:
http://apache-avro.679487.n3.nabble.com/Parsing-a-Pair-s-value-inherited-namespace-td4026810.html

Thanks,
Nir
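The linked thread's title points at namespace inheritance. Under the Avro specification, a named schema without a "namespace" attribute inherits the namespace of the most tightly enclosing named schema. Pair lives in org.apache.avro.mapred, which is the namespace shown on the record in the exception in the first post. If the record's JSON is re-parsed nested inside the Pair schema without a namespace attribute, it picks up that namespace, and the by-name union lookup no longer matches the datum. The sketch below is a simplified JDK-only model of the naming rule, not Avro's parser. The assumption that the map output schema goes through such a JSON round trip in the job configuration is ours. Treating "" as "no namespace" follows what recent versions of the specification allow.

```java
public class NamespaceInheritanceModel {
    // Simplified model of how a parser assigns a full name to a named schema:
    //  - a dotted name is already a full name;
    //  - otherwise an explicit "namespace" attribute is used ("" meaning no namespace);
    //  - otherwise the namespace of the enclosing named schema is inherited.
    // explicitNamespace is null when the JSON has no "namespace" attribute.
    static String resolveFullName(String name, String explicitNamespace, String enclosingNamespace) {
        if (name.contains(".")) {
            return name;
        }
        String namespace = (explicitNamespace != null) ? explicitNamespace : enclosingNamespace;
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    public static void main(String[] args) {
        String pairNamespace = "org.apache.avro.mapred"; // package of the Pair class

        // Assumption: the Sqoop-written record is top-level with no "namespace" attribute.
        String asWritten = resolveFullName("path_to_conversions_raw", null, null);

        // The same JSON nested inside the Pair schema and parsed again: namespace is inherited.
        String asReparsed = resolveFullName("path_to_conversions_raw", null, pairNamespace);

        // In this model, an explicit empty namespace keeps the original full name.
        String withEmptyNamespace = resolveFullName("path_to_conversions_raw", "", pairNamespace);

        System.out.println(asWritten);          // path_to_conversions_raw
        System.out.println(asReparsed);         // org.apache.avro.mapred.path_to_conversions_raw
        System.out.println(withEmptyNamespace); // path_to_conversions_raw
    }
}
```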

Re: Union in AvroMapper.map: Not in Union??

Doug Cutting
In reply to this post by Martin Kleppmann

On Tue, Apr 9, 2013 at 8:55 PM, Martin Kleppmann <[hidden email]> wrote:
> AvroJob does not perform schema evolution, so the schema you provide
> to AvroJob.setInputSchema has to be the exact same schema with which
> your input files to the mappers are encoded.

Actually the schema you provide is used as the "reader" schema and need not exactly match the schema used to write the data.  The schema used when writing is stored in the data files.

Doug
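Doug's point is the standard Avro reader/writer split. The writer schema comes from the data file header. The schema passed to AvroJob.setInputSchema is only the reader schema it is resolved against. When the reader schema is a union and the writer schema is a single record, the Avro specification resolves the record against the first union branch that matches it. The JDK-only sketch below models that step, simplifying "matches" to "has the same full name"; the specification and the Java implementation have additional rules not modeled here. other_table_raw is a hypothetical second input type.

```java
import java.util.List;

public class ReaderUnionModel {
    // Simplified model of resolving a writer record against a reader union:
    // pick the first reader branch that matches the writer's record. Here "matches"
    // is reduced to "same full name"; the Avro spec has further rules not modeled.
    static int chooseReaderBranch(List<String> readerUnion, String writerRecord) {
        for (int i = 0; i < readerUnion.size(); i++) {
            if (readerUnion.get(i).equals(writerRecord)) {
                return i;
            }
        }
        throw new IllegalStateException(
                "No branch of reader union " + readerUnion + " matches writer " + writerRecord);
    }

    public static void main(String[] args) {
        // Reader schema: the union passed to AvroJob.setInputSchema.
        // "other_table_raw" is a hypothetical second input type.
        List<String> readerUnion = List.of("path_to_conversions_raw", "other_table_raw");

        // Writer schema: the record name read from each data file's header.
        System.out.println(chooseReaderBranch(readerUnion, "path_to_conversions_raw")); // 0
        System.out.println(chooseReaderBranch(readerUnion, "other_table_raw"));          // 1
    }
}
```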

Re: Union in AvroMapper.map: Not in Union??

Martin Kleppmann
On 10 April 2013 11:47, Doug Cutting <[hidden email]> wrote:
> On Tue, Apr 9, 2013 at 8:55 PM, Martin Kleppmann <[hidden email]> wrote:
>> AvroJob does not perform schema evolution, so the schema you provide
>> to AvroJob.setInputSchema has to be the exact same schema with which
>> your input files to the mappers are encoded.
>
> Actually the schema you provide is used as the "reader" schema and need not exactly match the schema used to write the data.  The schema used when writing is stored in the data files.

Oh yes — sorry, I misread the code. (I overlooked the fact that DataFileStream.initialize mutates the reader to use the writer's schema.)

Martin

Re: Union in AvroMapper.map: Not in Union??

nir_zamir
In reply to this post by nir_zamir
Hi,

Update:
The issue was resolved by an Avro patch posted by Doug.
Thread: http://apache-avro.679487.n3.nabble.com/Parsing-a-Pair-s-value-inherited-namespace-td4026810.html
Issue: https://issues.apache.org/jira/browse/AVRO-1295

Thanks,
Nir