[prev in list] [next in list] [prev in thread] [next in thread]
List: avro-commits
Subject: svn commit: r1539765 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/file/
From: cutting () apache ! org
Date: 2013-11-07 19:09:09
Message-ID: 20131107190909.A41B6238896F () eris ! apache ! org
[Download RAW message or body]
Author: cutting
Date: Thu Nov 7 19:09:09 2013
New Revision: 1539765
URL: http://svn.apache.org/r1539765
Log:
AVRO-1388. Java: Add fsync support to DataFileWriter. Contributed by Hari \
Shreedharan.
Added:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java \
(with props) avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java \
(with props) Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1539765&r1=1539764&r2=1539765&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Nov 7 19:09:09 2013
@@ -6,6 +6,9 @@ Trunk (not yet released)
AVRO-975. C#: Add RPC support. (Mark Lamley via cutting)
+ AVRO-1388. Java: Add fsync support to DataFileWriter.
+ (Hari Shreedharan via cutting)
+
IMPROVEMENTS
AVRO-1355. Java: Reject schemas with duplicate field
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java?rev=1539765&r1=1539764&r2=1539765&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java \
(original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java \
Thu Nov 7 19:09:09 2013 @@ -21,7 +21,6 @@ import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.Flushable;
import java.io.IOException;
@@ -54,6 +53,8 @@ public class DataFileWriter<D> implement
private Schema schema;
private DatumWriter<D> dout;
+ private OutputStream underlyingStream;
+
private BufferedFileOutputStream out;
private BinaryEncoder vout;
@@ -125,7 +126,7 @@ public class DataFileWriter<D> implement
/** Open a new file for data matching a schema. */
public DataFileWriter<D> create(Schema schema, File file) throws IOException {
- return create(schema, new FileOutputStream(file));
+ return create(schema, new SyncableFileOutputStream(file));
}
/** Open a new file for data matching a schema. */
@@ -178,7 +179,7 @@ public class DataFileWriter<D> implement
/** Open a writer appending to an existing file. */
public DataFileWriter<D> appendTo(File file) throws IOException {
return appendTo(new SeekableFileInput(file),
- new FileOutputStream(file, true));
+ new SyncableFileOutputStream(file, true));
}
/** Open a writer appending to an existing file.
@@ -208,6 +209,7 @@ public class DataFileWriter<D> implement
}
private void init(OutputStream outs) throws IOException {
+ this.underlyingStream = outs;
this.out = new BufferedFileOutputStream(outs);
EncoderFactory efactory = new EncoderFactory();
this.vout = efactory.binaryEncoder(out, null);
@@ -409,6 +411,21 @@ public class DataFileWriter<D> implement
vout.flush();
}
+ /**
+ * If this writer was opened on a File or on an output stream that
+ * implements {@linkplain Syncable}, this method flushes all buffers for
+ * this writer and then syncs the stream to disk. In other cases, this
+ * method behaves exactly like {@linkplain #flush()}.
+ *
+ * @throws IOException if flushing or syncing the underlying stream fails
+ */
+ public void fSync() throws IOException {
+ flush();
+ if (underlyingStream instanceof Syncable) {
+ ((Syncable) underlyingStream).sync();
+ }
+ }
+
/** Flush and close the file. */
@Override
public void close() throws IOException {
Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java?rev=1539765&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java \
(added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java Thu \
Nov 7 19:09:09 2013 @@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.IOException;
+
+public interface Syncable {
+
+ /**
+ * Sync the file to disk. On supported platforms, this method behaves like
+ * POSIX <code>fsync</code> and syncs all underlying OS buffers for this
+ * file descriptor to disk. On these platforms, once this method returns
+ * normally, the data written to this instance is persisted on disk.
+ *
+ * @throws IOException if an error occurs while attempting to sync the
+ * data to disk.
+ */
+ void sync() throws IOException;
+}
Propchange: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java?rev=1539765&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java \
(added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java \
Thu Nov 7 19:09:09 2013 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * A {@linkplain FileOutputStream} that also implements {@linkplain Syncable}.
+ * An instance of this class can be used with {@linkplain DataFileWriter} to
+ * guarantee that Avro Container Files are persisted to disk on supported
+ * platforms using the
+ * {@linkplain org.apache.avro.file.DataFileWriter#fSync()} method.
+ *
+ * @see FileOutputStream
+ */
+public class SyncableFileOutputStream
+ extends FileOutputStream implements Syncable {
+
+ /**
+ * Creates an instance of {@linkplain SyncableFileOutputStream} that writes
+ * to the file with the given name.
+ *
+ * @param name - the full file name.
+ * @throws FileNotFoundException - if the file cannot be created or opened.
+ */
+ public SyncableFileOutputStream(String name) throws FileNotFoundException {
+ super(name);
+ }
+
+ /**
+ * Creates an instance of {@linkplain SyncableFileOutputStream} that writes
+ * to the file represented by the given {@linkplain File} instance.
+ *
+ * @param file - The file to use to create the output stream.
+ *
+ * @throws FileNotFoundException - if the file cannot be created or opened.
+ */
+ public SyncableFileOutputStream(File file)
+ throws FileNotFoundException {
+ super(file);
+ }
+
+ /**
+ * Creates an instance of {@linkplain SyncableFileOutputStream} with the
+ * given name, optionally appending to the file if it already exists.
+ *
+ * @param name - the full file name.
+ * @param append - true if the file is to be appended to
+ *
+ * @throws FileNotFoundException - if the file cannot be created or opened.
+ */
+ public SyncableFileOutputStream(String name, boolean append)
+ throws FileNotFoundException {
+ super(name, append);
+ }
+
+ /**
+ * Creates an instance of {@linkplain SyncableFileOutputStream}
+ * that writes to the file represented by the given {@linkplain File}
+ * instance, optionally appending to the file if it already exists.
+ *
+ * @param file - the file instance to use to create the stream.
+ * @param append - true if the file is to be appended to
+ *
+ * @throws FileNotFoundException - if the file cannot be created or opened.
+ */
+ public SyncableFileOutputStream(File file, boolean append)
+ throws FileNotFoundException {
+ super(file, append);
+ }
+
+ /**
+ * Creates an instance of {@linkplain SyncableFileOutputStream}.
+ * @param fdObj - the {@linkplain FileDescriptor} to write to.
+ */
+ public SyncableFileOutputStream(FileDescriptor fdObj) {
+ super(fdObj);
+ }
+
+ /**
+ * {@inheritDoc} Delegates to {@code getFD().sync()}.
+ */
+ @Override
+ public void sync() throws IOException {
+ getFD().sync();
+ }
+}
Propchange: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java?rev=1539765&r1=1539764&r2=1539765&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java \
(original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java Thu Nov \
7 19:09:09 2013 @@ -35,6 +35,7 @@ import org.apache.avro.file.DataFileRead
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.Syncable;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
@@ -93,6 +94,8 @@ public class TestDataFile {
testSyncDiscovery();
testGenericAppend();
testReadWithHeader();
+ testFSync(false);
+ testFSync(true);
}
public void testGenericWrite() throws IOException {
@@ -315,6 +318,37 @@ public class TestDataFile {
out.flushCount >= flushCounter);
}
+ private void testFSync(boolean useFile) throws IOException { // verifies fSync() reaches Syncable.sync()
+ DataFileWriter<Object> writer =
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+ writer.setFlushOnEveryBlock(false);
+ TestingByteArrayOutputStream out = new TestingByteArrayOutputStream(); // Syncable sink; counts sync() calls
+ if (useFile) {
+ File f = makeFile();
+ SeekableFileInput in = new SeekableFileInput(f);
+ writer.appendTo(in, out); // append mode; output still goes to out
+ } else {
+ writer.create(SCHEMA, out);
+ }
+ int currentCount = 0; // records appended so far
+ int syncCounter = 0; // fSync() calls issued by this test
+ try {
+ for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
+ currentCount++;
+ writer.append(datum);
+ if (currentCount % 10 == 0) { // fsync after every tenth record
+ writer.fSync();
+ syncCounter++;
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ System.out.println("Total number of syncs: " + out.syncCount);
+ Assert.assertEquals(syncCounter, out.syncCount); // exactly one sync() per fSync(); none extra from close()
+ }
+
+
static void readFile(File f, DatumReader<? extends Object> datumReader)
throws IOException {
FileReader<? extends Object> reader = DataFileReader.openReader(f, datumReader);
@@ -335,13 +369,20 @@ public class TestDataFile {
System.out.println("Time: "+(System.currentTimeMillis()-start));
}
- private class TestingByteArrayOutputStream extends ByteArrayOutputStream {
+ private class TestingByteArrayOutputStream extends ByteArrayOutputStream
+ implements Syncable {
private int flushCount = 0;
+ private int syncCount = 0;
@Override
public void flush() throws IOException {
super.flush();
flushCount++;
}
+
+ @Override
+ public void sync() throws IOException {
+ syncCount++;
+ }
}
}
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic