diff options
author | Greg Farnum <gregf@hq.newdream.net> | 2009-07-22 20:25:04 +0200 |
---|---|---|
committer | Greg Farnum <gregf@hq.newdream.net> | 2009-07-22 20:37:01 +0200 |
commit | f205273edb92dfab384c1159aac2f37fd87893f2 (patch) | |
tree | 44cfc88fbe3cb002faaa46ff18ec10aeb810b56a | |
parent | Hadoop: CephInputStream retabbing and add seekToNewSource stub. (diff) | |
download | ceph-f205273edb92dfab384c1159aac2f37fd87893f2.tar.xz ceph-f205273edb92dfab384c1159aac2f37fd87893f2.zip |
Hadoop: CephOutputStream retabbed, and it's an OutputStream now.
-rw-r--r-- | src/client/hadoop/ceph/CephFileSystem.java | 7 | ||||
-rw-r--r-- | src/client/hadoop/ceph/CephOutputStream.java | 146 |
2 files changed, 77 insertions, 76 deletions
diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java index c14f1aa5368..e1079bdf89f 100644 --- a/src/client/hadoop/ceph/CephFileSystem.java +++ b/src/client/hadoop/ceph/CephFileSystem.java @@ -2,12 +2,14 @@ package org.apache.hadoop.fs.ceph; import java.io.IOException; +import java.io.OutputStream; import java.net.URI; import java.util.Set; import java.util.EnumSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -153,7 +155,8 @@ public class CephFileSystem extends FileSystem { throw new IOException("append: Open for append failed on path \"" + abs_path.toString() + "\""); } - return new CephOutputStream(getConf(), clientPointer, fd); + CephOutputStream cephOStream = new CephOutputStream(getConf(), clientPointer, fd); + return new FSDataOutputStream(cephOStream); } public String getName() { @@ -403,7 +406,7 @@ public class CephFileSystem extends FileSystem { } // Step 4: create the stream - FSOutputStream cephOStream = new CephOutputStream(getConf(), clientPointer, fh); + OutputStream cephOStream = new CephOutputStream(getConf(), clientPointer, fh); //System.out.println("createRaw: opened absolute path \"" + absfilepath.toString() // + "\" for writing with fh " + fh); diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java index 9fac5a4e3e2..254c83830b4 100644 --- a/src/client/hadoop/ceph/CephOutputStream.java +++ b/src/client/hadoop/ceph/CephOutputStream.java @@ -1,3 +1,4 @@ +// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- package org.apache.hadoop.fs.ceph; import java.io.File; @@ -11,11 +12,10 @@ import java.util.List; import java.util.Random; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Progressable; -class CephOutputStream extends FSDataOutputStream { +class CephOutputStream extends OutputStream { static { System.loadLibrary("hadoopcephfs"); @@ -26,7 +26,7 @@ class CephOutputStream extends FSDataOutputStream { private long fileLength; - //private FileSystemStore store; + //private FileSystemStore store; private Path path; @@ -48,9 +48,9 @@ class CephOutputStream extends FSDataOutputStream { private byte[] outBuf; - //private List<Block> blocks = new ArrayList<Block>(); + //private List<Block> blocks = new ArrayList<Block>(); - //private Block nextBlock; + //private Block nextBlock; @@ -62,7 +62,7 @@ class CephOutputStream extends FSDataOutputStream { } private int ceph_close() { return ceph_close(clientPointer, fileHandle); } private int ceph_write(byte[] buffer, int buffer_offset, int length) - { return ceph_write(clientPointer, fileHandle, buffer, buffer_offset, length); } + { return ceph_write(clientPointer, fileHandle, buffer, buffer_offset, length); } private native long ceph_seek_from_start(long client, int fh, long pos); @@ -71,33 +71,33 @@ class CephOutputStream extends FSDataOutputStream { private native int ceph_write(long client, int fh, byte[] buffer, int buffer_offset, int length); - /* public CephOutputStream(Configuration conf, FileSystemStore store, + /* public CephOutputStream(Configuration conf, FileSystemStore store, Path path, long blockSize, Progressable progress) throws IOException { - // basic pseudocode: - // call ceph_open_for_write to open the file - // store the file handle - // store the client pointer - // look up and store the block size while we're at it - // the following code's old. kill it + // basic pseudocode: + // call ceph_open_for_write to open the file + // store the file handle + // store the client pointer + // look up and store the block size while we're at it + // the following code's old. kill it - this.store = store; - this.path = path; - this.blockSize = blockSize; - this.backupFile = newBackupFile(); - this.backupStream = new FileOutputStream(backupFile); - this.bufferSize = conf.getInt("io.file.buffer.size", 4096); - this.outBuf = new byte[bufferSize]; + this.store = store; + this.path = path; + this.blockSize = blockSize; + this.backupFile = newBackupFile(); + this.backupStream = new FileOutputStream(backupFile); + this.bufferSize = conf.getInt("io.file.buffer.size", 4096); + this.outBuf = new byte[bufferSize]; - }*/ + }*/ - // The file handle + // The file handle public CephOutputStream(Configuration conf, long clientp, int fh) { - clientPointer = clientp; - fileHandle = fh; - //fileLength = flength; - closed = false; + clientPointer = clientp; + fileHandle = fh; + //fileLength = flength; + closed = false; } // possibly useful for the local copy, write later thing? @@ -108,92 +108,90 @@ class CephOutputStream extends FSDataOutputStream { return result; } - - @Override - public long getPos() throws IOException { + public long getPos() throws IOException { // change to get the position from Ceph client - return ceph_getpos(); + return ceph_getpos(); } - // writes a byte + // writes a byte @Override - public synchronized void write(int b) throws IOException { + public synchronized void write(int b) throws IOException { //System.out.println("CephOutputStream.write: writing a single byte to fd " + fileHandle); - if (closed) { - throw new IOException("CephOutputStream.write: cannot write " + - "a byte to fd " + fileHandle + ": stream closed"); - } - // Stick the byte in a buffer and write it - byte buf[] = new byte[1]; - buf[0] = (byte) b; - int result = ceph_write(buf, 0, 1); - if (1 != result) + if (closed) { + throw new IOException("CephOutputStream.write: cannot write " + + "a byte to fd " + fileHandle + ": stream closed"); + } + // Stick the byte in a buffer and write it + byte buf[] = new byte[1]; + buf[0] = (byte) b; + int result = ceph_write(buf, 0, 1); + if (1 != result) System.out.println("CephOutputStream.write: failed writing a single byte to fd " + fileHandle + ": Ceph write() result = " + result); - return; - } + return; + } @Override - public synchronized void write(byte buf[], int off, int len) throws IOException { + public synchronized void write(byte buf[], int off, int len) throws IOException { //System.out.println("CephOutputStream.write: writing " + len + // " bytes to fd " + fileHandle); // make sure stream is open if (closed) { - throw new IOException("CephOutputStream.write: cannot write " + len + - "bytes to fd " + fileHandle + ": stream closed"); + throw new IOException("CephOutputStream.write: cannot write " + len + + "bytes to fd " + fileHandle + ": stream closed"); } // sanity check if (null == buf) { - throw new NullPointerException("CephOutputStream.write: cannot write " + len + - "bytes to fd " + fileHandle + ": write buffer is null"); + throw new NullPointerException("CephOutputStream.write: cannot write " + len + + "bytes to fd " + fileHandle + ": write buffer is null"); } - // check for proper index bounds - if((off < 0) || (len < 0) || (off + len > buf.length)) { + // check for proper index bounds + if((off < 0) || (len < 0) || (off + len > buf.length)) { throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: " + "write length is " + len + ", buffer offset is " + off +", and buffer size is " + buf.length); - } + } - // write! - int result = ceph_write(buf, off, len); - if (result < 0) { + // write! + int result = ceph_write(buf, off, len); + if (result < 0) { throw new IOException("CephOutputStream.write: Write of " + len + - "bytes to fd " + fileHandle + " failed"); - } - if (result != len) { + "bytes to fd " + fileHandle + " failed"); + } + if (result != len) { throw new IOException("CephOutputStream.write: Write of " + len + - "bytes to fd " + fileHandle + "was incomplete: only " + "bytes to fd " + fileHandle + "was incomplete: only " + result + " of " + len + " bytes were written."); + } + return; } - return; - } @Override - public synchronized void flush() throws IOException { - if (closed) { - throw new IOException("Stream closed"); + public synchronized void flush() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + return; } - return; - } @Override - public synchronized void close() throws IOException { - if (closed) { - throw new IOException("Stream closed"); - } + public synchronized void close() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } - int result = ceph_close(); - if (result != 0) { + int result = ceph_close(); + if (result != 0) { throw new IOException("Close failed!"); - } + } - closed = true; + closed = true; - } + } } |