[prev in list] [next in list] [prev in thread] [next in thread] 

List:       avro-commits
Subject:    svn commit: r1350810 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/
From:       cutting () apache ! org
Date:       2012-06-15 21:29:13
Message-ID: 20120615212913.B916123889BB () eris ! apache ! org
[Download RAW message or body]

Author: cutting
Date: Fri Jun 15 21:29:12 2012
New Revision: 1350810

URL: http://svn.apache.org/viewvc?rev=1350810&view=rev
Log:
AVRO-1108. Java: Add support for reflect API to newer mapreduce API.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java


Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1350810&r1=1350809&r2=1350810&view=diff
 ==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jun 15 21:29:12 2012
@@ -10,6 +10,9 @@ Avro 1.7.1 (unreleased)
     AVRO-1112. Java: Add support for Snappy codec to newer mapreduce API.
     (Matt Mead via cutting)
 
+    AVRO-1108. Java: Add support for reflect API to newer mapreduce API.
+    (cutting)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java?rev=1350810&r1=1350809&r2=1350810&view=diff
 ==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java Fri Jun 15 21:29:12 2012
@@ -26,7 +26,7 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.hadoop.io.serializer.Deserializer;
 
 /**
@@ -66,7 +66,7 @@ public abstract class AvroDeserializer<T
   protected AvroDeserializer(Schema writerSchema, Schema readerSchema) {
     mWriterSchema = writerSchema;
     mReaderSchema = null != readerSchema ? readerSchema : writerSchema;
-    mAvroDatumReader = new SpecificDatumReader<D>(mWriterSchema, mReaderSchema);
+    mAvroDatumReader = new ReflectDatumReader<D>(mWriterSchema, mReaderSchema);
   }
 
   /**

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java?rev=1350810&r1=1350809&r2=1350810&view=diff
 ==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java Fri Jun 15 21:29:12 2012
@@ -22,7 +22,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.RawComparator;
@@ -57,6 +57,6 @@ public class AvroKeyComparator<T> extend
   /** {@inheritDoc} */
   @Override
   public int compare(AvroKey<T> x, AvroKey<T> y) {
-    return SpecificData.get().compare(x.datum(), y.datum(), mSchema);
+    return ReflectData.get().compare(x.datum(), y.datum(), mSchema);
   }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java?rev=1350810&r1=1350809&r2=1350810&view=diff
 ==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java Fri Jun 15 21:29:12 2012
@@ -26,7 +26,7 @@ import org.apache.avro.io.EncoderFactory
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.io.serializer.Serializer;
 
 /**
@@ -79,7 +79,7 @@ public class AvroSerializer<T> implement
       throw new IllegalArgumentException("Writer schema may not be null");
     }
     mWriterSchema = writerSchema;
-    mAvroDatumWriter = new SpecificDatumWriter<T>(writerSchema);
+    mAvroDatumWriter = new ReflectDatumWriter<T>(writerSchema);
   }
 
   /**

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java?rev=1350810&r1=1350809&r2=1350810&view=diff
 ==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java Fri Jun 15 21:29:12 2012
@@ -25,7 +25,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -50,7 +50,7 @@ public class AvroKeyRecordWriter<T> exte
   public AvroKeyRecordWriter(Schema writerSchema, CodecFactory compressionCodec,
       OutputStream outputStream) throws IOException {
     // Create an Avro container file and a writer to it.
-    mAvroFileWriter = new DataFileWriter<T>(new SpecificDatumWriter<T>(writerSchema));
+    mAvroFileWriter = new DataFileWriter<T>(new ReflectDatumWriter<T>(writerSchema));
     mAvroFileWriter.setCodec(compressionCodec);
     mAvroFileWriter.create(writerSchema, outputStream);
   }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java?rev=1350810&r1=1350809&r2=1350810&view=diff
 ==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java Fri Jun 15 21:29:12 2012
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileRead
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -88,7 +88,7 @@ public abstract class AvroRecordReaderBa
 
     // Wrap the seekable input stream in an Avro DataFileReader.
     mAvroFileReader = createAvroFileReader(seekableFileInput,
-        new SpecificDatumReader<T>(mReaderSchema));
+        new ReflectDatumReader<T>(mReaderSchema));
 
     // Initialize the start and end offsets into the file based on the boundaries of the
     // input split we're responsible for.  We will read the first block that begins

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java?rev=1350810&r1=1350809&r2=1350810&view=diff
 ==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java Fri Jun 15 21:29:12 2012
@@ -29,6 +29,8 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +59,18 @@ public class TestWordCount {
           + "\"fields\":[{\"name\":\"count\",\"type\":\"int\"},"
           + "{\"name\":\"name\",\"type\":\"string\"}]}");
 
+  public static class ReflectStats {
+    String name;
+    int count;
+  }
+
+  // permit data written as TextStats to be read as ReflectStats
+  private static Schema REFLECT_STATS_SCHEMA
+    = ReflectData.get().getSchema(ReflectStats.class);
+  static {
+    REFLECT_STATS_SCHEMA.addAlias(TextStats.SCHEMA$.getFullName());
+  }
+
   private static class LineCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
     private IntWritable mOne;
 
@@ -92,6 +106,26 @@ public class TestWordCount {
     }
   }
 
+  private static class ReflectCountMapper
+      extends Mapper<AvroKey<ReflectStats>, NullWritable, Text, IntWritable> {
+    private IntWritable mCount;
+    private Text mText;
+
+    @Override
+    protected void setup(Context context) {
+      mCount = new IntWritable(0);
+      mText = new Text("");
+    }
+
+    @Override
+    protected void map(AvroKey<ReflectStats> record, NullWritable ignore, Context context)
+        throws IOException, InterruptedException {
+      mCount.set(record.datum().count);
+      mText.set(record.datum().name);
+      context.write(mText, mCount);
+    }
+  }
+
   private static class GenericStatsReducer
       extends Reducer<Text, IntWritable, AvroKey<GenericData.Record>, NullWritable> {
     private AvroKey<GenericData.Record> mStats;
@@ -139,6 +173,29 @@ public class TestWordCount {
     }
   }
 
+  private static class ReflectStatsReducer
+      extends Reducer<Text, IntWritable, AvroKey<ReflectStats>, NullWritable> {
+    private AvroKey<ReflectStats> mStats;
+
+    @Override
+    protected void setup(Context context) {
+      mStats = new AvroKey<ReflectStats>(null);
+    }
+
+    @Override
+    protected void reduce(Text line, Iterable<IntWritable> counts, Context context)
+        throws IOException, InterruptedException {
+      ReflectStats record = new ReflectStats();
+      record.count = 0;
+      for (IntWritable count : counts) {
+        record.count += count.get();
+      }
+      record.name = line.toString();
+      mStats.datum(record);
+      context.write(mStats, NullWritable.get());
+    }
+  }
+
   private static class SortMapper
       extends Mapper<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>, NullWritable> {
     @Override
@@ -238,6 +295,46 @@ public class TestWordCount {
   }
 
   @Test
+  public void testAvroReflectOutput() throws Exception {
+    Job job = new Job();
+
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+            .toURI().toString()));
+    job.setInputFormatClass(TextInputFormat.class);
+
+    job.setMapperClass(LineCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    job.setReducerClass(ReflectStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, REFLECT_STATS_SCHEMA);
+
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-reflect");
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results from the MapReduce were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/part-*"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<ReflectStats> reader = new DataFileReader<ReflectStats>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new ReflectDatumReader<ReflectStats>());
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (ReflectStats record : reader) {
+      counts.put(record.name.toString(), record.count);
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+
+  @Test
   public void testAvroInput() throws Exception {
     Job job = new Job();
 
@@ -279,6 +376,46 @@ public class TestWordCount {
   }
 
   @Test
+  public void testReflectInput() throws Exception {
+    Job job = new Job();
+    FileInputFormat.setInputPaths(job, new Path(getClass()
+            .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+            .toURI().toString()));
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    AvroJob.setInputKeySchema(job, REFLECT_STATS_SCHEMA);
+
+    job.setMapperClass(ReflectCountMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    job.setReducerClass(ReflectStatsReducer.class);
+    AvroJob.setOutputKeySchema(job, REFLECT_STATS_SCHEMA);
+
+    job.setOutputFormatClass(AvroKeyOutputFormat.class);
+    Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-reflect-input");
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    Assert.assertTrue(job.waitForCompletion(true));
+
+    // Check that the results from the MapReduce were as expected.
+    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+    FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/part-*"));
+    Assert.assertEquals(1, outputFiles.length);
+    DataFileReader<ReflectStats> reader = new DataFileReader<ReflectStats>(
+        new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+        new ReflectDatumReader<ReflectStats>());
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    for (ReflectStats record : reader) {
+      counts.put(record.name.toString(), record.count);
+    }
+    reader.close();
+
+    Assert.assertEquals(3, counts.get("apple").intValue());
+    Assert.assertEquals(2, counts.get("banana").intValue());
+    Assert.assertEquals(1, counts.get("carrot").intValue());
+  }
+
+  @Test
   public void testAvroMapOutput() throws Exception {
     Job job = new Job();
 


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic