[prev in list] [next in list] [prev in thread] [next in thread]
List: avro-commits
Subject: svn commit: r1350810 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/
From: cutting () apache ! org
Date: 2012-06-15 21:29:13
Message-ID: 20120615212913.B916123889BB () eris ! apache ! org
[Download RAW message or body]
Author: cutting
Date: Fri Jun 15 21:29:12 2012
New Revision: 1350810
URL: http://svn.apache.org/viewvc?rev=1350810&view=rev
Log:
AVRO-1108. Java: Add support for reflect API to newer mapreduce API.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jun 15 21:29:12 2012
@@ -10,6 +10,9 @@ Avro 1.7.1 (unreleased)
AVRO-1112. Java: Add support for Snappy codec to newer mapreduce API.
(Matt Mead via cutting)
+ AVRO-1108. Java: Add support for reflect API to newer mapreduce API.
+ (cutting)
+
IMPROVEMENTS
BUG FIXES
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroDeserializer.java Fri Jun 15 21:29:12 2012
@@ -26,7 +26,7 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.hadoop.io.serializer.Deserializer;
/**
@@ -66,7 +66,7 @@ public abstract class AvroDeserializer<T
protected AvroDeserializer(Schema writerSchema, Schema readerSchema) {
mWriterSchema = writerSchema;
mReaderSchema = null != readerSchema ? readerSchema : writerSchema;
- mAvroDatumReader = new SpecificDatumReader<D>(mWriterSchema, mReaderSchema);
+ mAvroDatumReader = new ReflectDatumReader<D>(mWriterSchema, mReaderSchema);
}
/**
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroKeyComparator.java Fri Jun 15 21:29:12 2012
@@ -22,7 +22,7 @@ import org.apache.avro.Schema;
import org.apache.avro.io.BinaryData;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.RawComparator;
@@ -57,6 +57,6 @@ public class AvroKeyComparator<T> extend
/** {@inheritDoc} */
@Override
public int compare(AvroKey<T> x, AvroKey<T> y) {
- return SpecificData.get().compare(x.datum(), y.datum(), mSchema);
+ return ReflectData.get().compare(x.datum(), y.datum(), mSchema);
}
}
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/AvroSerializer.java Fri Jun 15 21:29:12 2012
@@ -26,7 +26,7 @@ import org.apache.avro.io.EncoderFactory
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.io.serializer.Serializer;
/**
@@ -79,7 +79,7 @@ public class AvroSerializer<T> implement
throw new IllegalArgumentException("Writer schema may not be null");
}
mWriterSchema = writerSchema;
- mAvroDatumWriter = new SpecificDatumWriter<T>(writerSchema);
+ mAvroDatumWriter = new ReflectDatumWriter<T>(writerSchema);
}
/**
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyRecordWriter.java Fri Jun 15 21:29:12 2012
@@ -25,7 +25,7 @@ import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -50,7 +50,7 @@ public class AvroKeyRecordWriter<T> exte
public AvroKeyRecordWriter(Schema writerSchema, CodecFactory compressionCodec,
OutputStream outputStream) throws IOException {
// Create an Avro container file and a writer to it.
- mAvroFileWriter = new DataFileWriter<T>(new SpecificDatumWriter<T>(writerSchema));
+ mAvroFileWriter = new DataFileWriter<T>(new ReflectDatumWriter<T>(writerSchema));
mAvroFileWriter.setCodec(compressionCodec);
mAvroFileWriter.create(writerSchema, outputStream);
}
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroRecordReaderBase.java Fri Jun 15 21:29:12 2012
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileRead
import org.apache.avro.file.SeekableInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -88,7 +88,7 @@ public abstract class AvroRecordReaderBa
// Wrap the seekable input stream in an Avro DataFileReader.
mAvroFileReader = createAvroFileReader(seekableFileInput,
- new SpecificDatumReader<T>(mReaderSchema));
+ new ReflectDatumReader<T>(mReaderSchema));
// Initialize the start and end offsets into the file based on the boundaries of the
// input split we're responsible for. We will read the first block that begins
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java?rev=1350810&r1=1350809&r2=1350810&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestWordCount.java Fri Jun 15 21:29:12 2012
@@ -29,6 +29,8 @@ import org.apache.avro.generic.GenericDa
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +59,18 @@ public class TestWordCount {
+ "\"fields\":[{\"name\":\"count\",\"type\":\"int\"},"
+ "{\"name\":\"name\",\"type\":\"string\"}]}");
+ public static class ReflectStats {
+ String name;
+ int count;
+ }
+
+ // permit data written as SpecficStats to be read as ReflectStats
+ private static Schema REFLECT_STATS_SCHEMA
+ = ReflectData.get().getSchema(ReflectStats.class);
+ static {
+ REFLECT_STATS_SCHEMA.addAlias(TextStats.SCHEMA$.getFullName());
+ }
+
private static class LineCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable mOne;
@@ -92,6 +106,26 @@ public class TestWordCount {
}
}
+ private static class ReflectCountMapper
+ extends Mapper<AvroKey<ReflectStats>, NullWritable, Text, IntWritable> {
+ private IntWritable mCount;
+ private Text mText;
+
+ @Override
+ protected void setup(Context context) {
+ mCount = new IntWritable(0);
+ mText = new Text("");
+ }
+
+ @Override
+ protected void map(AvroKey<ReflectStats> record, NullWritable ignore, Context context)
+ throws IOException, InterruptedException {
+ mCount.set(record.datum().count);
+ mText.set(record.datum().name);
+ context.write(mText, mCount);
+ }
+ }
+
private static class GenericStatsReducer
extends Reducer<Text, IntWritable, AvroKey<GenericData.Record>, NullWritable> {
private AvroKey<GenericData.Record> mStats;
@@ -139,6 +173,29 @@ public class TestWordCount {
}
}
+ private static class ReflectStatsReducer
+ extends Reducer<Text, IntWritable, AvroKey<ReflectStats>, NullWritable> {
+ private AvroKey<ReflectStats> mStats;
+
+ @Override
+ protected void setup(Context context) {
+ mStats = new AvroKey<ReflectStats>(null);
+ }
+
+ @Override
+ protected void reduce(Text line, Iterable<IntWritable> counts, Context context)
+ throws IOException, InterruptedException {
+ ReflectStats record = new ReflectStats();
+ record.count = 0;
+ for (IntWritable count : counts) {
+ record.count += count.get();
+ }
+ record.name = line.toString();
+ mStats.datum(record);
+ context.write(mStats, NullWritable.get());
+ }
+ }
+
private static class SortMapper
extends Mapper<AvroKey<TextStats>, NullWritable, AvroKey<TextStats>, NullWritable> {
@Override
@@ -238,6 +295,46 @@ public class TestWordCount {
}
@Test
+ public void testAvroReflectOutput() throws Exception {
+ Job job = new Job();
+
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+ .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.txt")
+ .toURI().toString()));
+ job.setInputFormatClass(TextInputFormat.class);
+
+ job.setMapperClass(LineCountMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ job.setReducerClass(ReflectStatsReducer.class);
+ AvroJob.setOutputKeySchema(job, REFLECT_STATS_SCHEMA);
+
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-reflect");
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ // Check that the results from the MapReduce were as expected.
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/part-*"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<ReflectStats> reader = new DataFileReader<ReflectStats>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new ReflectDatumReader<ReflectStats>());
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (ReflectStats record : reader) {
+ counts.put(record.name.toString(), record.count);
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+
+ @Test
public void testAvroInput() throws Exception {
Job job = new Job();
@@ -279,6 +376,46 @@ public class TestWordCount {
}
@Test
+ public void testReflectInput() throws Exception {
+ Job job = new Job();
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+ .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+ .toURI().toString()));
+ job.setInputFormatClass(AvroKeyInputFormat.class);
+ AvroJob.setInputKeySchema(job, REFLECT_STATS_SCHEMA);
+
+ job.setMapperClass(ReflectCountMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ job.setReducerClass(ReflectStatsReducer.class);
+ AvroJob.setOutputKeySchema(job, REFLECT_STATS_SCHEMA);
+
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+ Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-reflect-input");
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ // Check that the results from the MapReduce were as expected.
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/part-*"));
+ Assert.assertEquals(1, outputFiles.length);
+ DataFileReader<ReflectStats> reader = new DataFileReader<ReflectStats>(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new ReflectDatumReader<ReflectStats>());
+ Map<String, Integer> counts = new HashMap<String, Integer>();
+ for (ReflectStats record : reader) {
+ counts.put(record.name.toString(), record.count);
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+
+ @Test
public void testAvroMapOutput() throws Exception {
Job job = new Job();
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic