summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Farnum <gregf@hq.newdream.net>2009-07-22 20:25:04 +0200
committerGreg Farnum <gregf@hq.newdream.net>2009-07-22 20:37:01 +0200
commitf205273edb92dfab384c1159aac2f37fd87893f2 (patch)
tree44cfc88fbe3cb002faaa46ff18ec10aeb810b56a
parentHadoop: CephInputStream retabbing and add seekToNewSource stub. (diff)
downloadceph-f205273edb92dfab384c1159aac2f37fd87893f2.tar.xz
ceph-f205273edb92dfab384c1159aac2f37fd87893f2.zip
Hadoop: CephOutputStream retabbed, and it's an OutputStream now.
-rw-r--r--src/client/hadoop/ceph/CephFileSystem.java7
-rw-r--r--src/client/hadoop/ceph/CephOutputStream.java146
2 files changed, 77 insertions, 76 deletions
diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java
index c14f1aa5368..e1079bdf89f 100644
--- a/src/client/hadoop/ceph/CephFileSystem.java
+++ b/src/client/hadoop/ceph/CephFileSystem.java
@@ -2,12 +2,14 @@
package org.apache.hadoop.fs.ceph;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.URI;
import java.util.Set;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -153,7 +155,8 @@ public class CephFileSystem extends FileSystem {
throw new IOException("append: Open for append failed on path \"" +
abs_path.toString() + "\"");
}
- return new CephOutputStream(getConf(), clientPointer, fd);
+ CephOutputStream cephOStream = new CephOutputStream(getConf(), clientPointer, fd);
+ return new FSDataOutputStream(cephOStream);
}
public String getName() {
@@ -403,7 +406,7 @@ public class CephFileSystem extends FileSystem {
}
// Step 4: create the stream
- FSOutputStream cephOStream = new CephOutputStream(getConf(), clientPointer, fh);
+ OutputStream cephOStream = new CephOutputStream(getConf(), clientPointer, fh);
//System.out.println("createRaw: opened absolute path \"" + absfilepath.toString()
// + "\" for writing with fh " + fh);
diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java
index 9fac5a4e3e2..254c83830b4 100644
--- a/src/client/hadoop/ceph/CephOutputStream.java
+++ b/src/client/hadoop/ceph/CephOutputStream.java
@@ -1,3 +1,4 @@
+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
package org.apache.hadoop.fs.ceph;
import java.io.File;
@@ -11,11 +12,10 @@ import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Progressable;
-class CephOutputStream extends FSDataOutputStream {
+class CephOutputStream extends OutputStream {
static {
System.loadLibrary("hadoopcephfs");
@@ -26,7 +26,7 @@ class CephOutputStream extends FSDataOutputStream {
private long fileLength;
- //private FileSystemStore store;
+ //private FileSystemStore store;
private Path path;
@@ -48,9 +48,9 @@ class CephOutputStream extends FSDataOutputStream {
private byte[] outBuf;
- //private List<Block> blocks = new ArrayList<Block>();
+ //private List<Block> blocks = new ArrayList<Block>();
- //private Block nextBlock;
+ //private Block nextBlock;
@@ -62,7 +62,7 @@ class CephOutputStream extends FSDataOutputStream {
}
private int ceph_close() { return ceph_close(clientPointer, fileHandle); }
private int ceph_write(byte[] buffer, int buffer_offset, int length)
- { return ceph_write(clientPointer, fileHandle, buffer, buffer_offset, length); }
+ { return ceph_write(clientPointer, fileHandle, buffer, buffer_offset, length); }
private native long ceph_seek_from_start(long client, int fh, long pos);
@@ -71,33 +71,33 @@ class CephOutputStream extends FSDataOutputStream {
private native int ceph_write(long client, int fh, byte[] buffer, int buffer_offset, int length);
- /* public CephOutputStream(Configuration conf, FileSystemStore store,
+ /* public CephOutputStream(Configuration conf, FileSystemStore store,
Path path, long blockSize, Progressable progress) throws IOException {
- // basic pseudocode:
- // call ceph_open_for_write to open the file
- // store the file handle
- // store the client pointer
- // look up and store the block size while we're at it
- // the following code's old. kill it
+ // basic pseudocode:
+ // call ceph_open_for_write to open the file
+ // store the file handle
+ // store the client pointer
+ // look up and store the block size while we're at it
+ // the following code's old. kill it
- this.store = store;
- this.path = path;
- this.blockSize = blockSize;
- this.backupFile = newBackupFile();
- this.backupStream = new FileOutputStream(backupFile);
- this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
- this.outBuf = new byte[bufferSize];
+ this.store = store;
+ this.path = path;
+ this.blockSize = blockSize;
+ this.backupFile = newBackupFile();
+ this.backupStream = new FileOutputStream(backupFile);
+ this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
+ this.outBuf = new byte[bufferSize];
- }*/
+ }*/
- // The file handle
+ // The file handle
public CephOutputStream(Configuration conf, long clientp, int fh) {
- clientPointer = clientp;
- fileHandle = fh;
- //fileLength = flength;
- closed = false;
+ clientPointer = clientp;
+ fileHandle = fh;
+ //fileLength = flength;
+ closed = false;
}
// possibly useful for the local copy, write later thing?
@@ -108,92 +108,90 @@ class CephOutputStream extends FSDataOutputStream {
return result;
}
-
- @Override
- public long getPos() throws IOException {
+ public long getPos() throws IOException {
// change to get the position from Ceph client
- return ceph_getpos();
+ return ceph_getpos();
}
- // writes a byte
+ // writes a byte
@Override
- public synchronized void write(int b) throws IOException {
+ public synchronized void write(int b) throws IOException {
//System.out.println("CephOutputStream.write: writing a single byte to fd " + fileHandle);
- if (closed) {
- throw new IOException("CephOutputStream.write: cannot write " +
- "a byte to fd " + fileHandle + ": stream closed");
- }
- // Stick the byte in a buffer and write it
- byte buf[] = new byte[1];
- buf[0] = (byte) b;
- int result = ceph_write(buf, 0, 1);
- if (1 != result)
+ if (closed) {
+ throw new IOException("CephOutputStream.write: cannot write " +
+ "a byte to fd " + fileHandle + ": stream closed");
+ }
+ // Stick the byte in a buffer and write it
+ byte buf[] = new byte[1];
+ buf[0] = (byte) b;
+ int result = ceph_write(buf, 0, 1);
+ if (1 != result)
System.out.println("CephOutputStream.write: failed writing a single byte to fd "
+ fileHandle + ": Ceph write() result = " + result);
- return;
- }
+ return;
+ }
@Override
- public synchronized void write(byte buf[], int off, int len) throws IOException {
+ public synchronized void write(byte buf[], int off, int len) throws IOException {
//System.out.println("CephOutputStream.write: writing " + len +
// " bytes to fd " + fileHandle);
// make sure stream is open
if (closed) {
- throw new IOException("CephOutputStream.write: cannot write " + len +
- "bytes to fd " + fileHandle + ": stream closed");
+ throw new IOException("CephOutputStream.write: cannot write " + len +
+ "bytes to fd " + fileHandle + ": stream closed");
}
// sanity check
if (null == buf) {
- throw new NullPointerException("CephOutputStream.write: cannot write " + len +
- "bytes to fd " + fileHandle + ": write buffer is null");
+ throw new NullPointerException("CephOutputStream.write: cannot write " + len +
+ "bytes to fd " + fileHandle + ": write buffer is null");
}
- // check for proper index bounds
- if((off < 0) || (len < 0) || (off + len > buf.length)) {
+ // check for proper index bounds
+ if((off < 0) || (len < 0) || (off + len > buf.length)) {
throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: "
+ "write length is " + len + ", buffer offset is "
+ off +", and buffer size is " + buf.length);
- }
+ }
- // write!
- int result = ceph_write(buf, off, len);
- if (result < 0) {
+ // write!
+ int result = ceph_write(buf, off, len);
+ if (result < 0) {
throw new IOException("CephOutputStream.write: Write of " + len +
- "bytes to fd " + fileHandle + " failed");
- }
- if (result != len) {
+ "bytes to fd " + fileHandle + " failed");
+ }
+ if (result != len) {
throw new IOException("CephOutputStream.write: Write of " + len +
- "bytes to fd " + fileHandle + "was incomplete: only "
+ "bytes to fd " + fileHandle + "was incomplete: only "
+ result + " of " + len + " bytes were written.");
+ }
+ return;
}
- return;
- }
@Override
- public synchronized void flush() throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
+ public synchronized void flush() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ return;
}
- return;
- }
@Override
- public synchronized void close() throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
+ public synchronized void close() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
- int result = ceph_close();
- if (result != 0) {
+ int result = ceph_close();
+ if (result != 0) {
throw new IOException("Close failed!");
- }
+ }
- closed = true;
+ closed = true;
- }
+ }
}