summaryrefslogtreecommitdiffstats
path: root/src/compressor
diff options
context:
space:
mode:
authorHaomai Wang <haomaiwang@gmail.com>2015-03-20 05:14:02 +0100
committerHaomai Wang <haomaiwang@gmail.com>2015-07-01 15:12:08 +0200
commit8e48ba16c7e96f5164910267c2162052bbfed702 (patch)
treeabf411152c4af0334c46ff98089cb0f27ce53d4f /src/compressor
parentCompressor: Add compressor infrastructure for ceph (diff)
downloadceph-8e48ba16c7e96f5164910267c2162052bbfed702.tar.xz
ceph-8e48ba16c7e96f5164910267c2162052bbfed702.zip
Compressor: add decompress failed codes
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
Diffstat (limited to 'src/compressor')
-rw-r--r--src/compressor/AsyncCompressor.cc28
-rw-r--r--src/compressor/AsyncCompressor.h21
-rw-r--r--src/compressor/SnappyCompressor.h1
3 files changed, 36 insertions, 14 deletions
diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc
index d9b52b06ac5..e2ca2fe48bb 100644
--- a/src/compressor/AsyncCompressor.cc
+++ b/src/compressor/AsyncCompressor.cc
@@ -140,17 +140,27 @@ int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, b
ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl;
return -ENOENT;
}
+ int status;
retry:
- if (it->second.status.read() == DONE) {
+ status = it->second.status.read();
+ if (status == DONE) {
ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl;
*finished = true;
data.swap(it->second.data);
jobs.erase(it);
+ } else if (status == ERROR) {
+ ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl;
+ jobs.erase(it);
+ return -EIO;
} else if (blocking) {
if (it->second.status.cas(WAIT, DONE)) {
ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl;
- compressor->compress(it->second.data, data);
+ if (compressor->compress(it->second.data, data)) {
+ ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl;
+ it->second.status.set(ERROR);
+ return -EIO;
+ }
*finished = true;
} else {
job_lock.Unlock();
@@ -174,17 +184,27 @@ int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &dat
ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl;
return -ENOENT;
}
+ int status;
retry:
- if (it->second.status.read() == DONE) {
+ status = it->second.status.read();
+ if (status == DONE) {
ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl;
*finished = true;
data.swap(it->second.data);
jobs.erase(it);
+ } else if (status == ERROR) {
+ ldout(cct, 20) << __func__ << " compressed data failed, job id=" << decompress_id << dendl;
+ jobs.erase(it);
+ return -EIO;
} else if (blocking) {
if (it->second.status.cas(WAIT, DONE)) {
ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl;
- compressor->decompress(it->second.data, data);
+ if (compressor->decompress(it->second.data, data)) {
+ ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl;
+ it->second.status.set(ERROR);
+ return -EIO;
+ }
*finished = true;
} else {
job_lock.Unlock();
diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h
index d34cbe6bf09..659f506be57 100644
--- a/src/compressor/AsyncCompressor.h
+++ b/src/compressor/AsyncCompressor.h
@@ -35,7 +35,8 @@ class AsyncCompressor {
enum {
WAIT,
WORKING,
- DONE
+ DONE,
+ ERROR
} status;
struct Job {
uint64_t id;
@@ -79,7 +80,6 @@ class AsyncCompressor {
break;
} else {
Mutex::Locker (async_compressor->job_lock);
- assert(item->status.read() == DONE);
async_compressor->jobs.erase(item->id);
item = NULL;
}
@@ -89,16 +89,19 @@ class AsyncCompressor {
void _process(Job *item, ThreadPool::TPHandle &handle) {
assert(item->status.read() == WORKING);
bufferlist out;
+ int r;
if (item->is_compress)
- async_compressor->compressor->compress(item->data, out);
+ r = async_compressor->compressor->compress(item->data, out);
else
- async_compressor->compressor->decompress(item->data, out);
- item->data.swap(out);
- }
- void _process_finish(Job *item) {
- assert(item->status.read() == WORKING);
- item->status.set(DONE);
+ r = async_compressor->compressor->decompress(item->data, out);
+ if (!r) {
+ item->data.swap(out);
+ assert(item->status.cas(WORKING, DONE));
+ } else {
+ item->status.set(ERROR);
+ }
}
+ void _process_finish(Job *item) {}
void _clear() {}
} compress_wq;
friend class CompressWQ;
diff --git a/src/compressor/SnappyCompressor.h b/src/compressor/SnappyCompressor.h
index 8dc3497c45f..f5d86d5a242 100644
--- a/src/compressor/SnappyCompressor.h
+++ b/src/compressor/SnappyCompressor.h
@@ -64,7 +64,6 @@ class SnappyCompressor : public Compressor {
BufferlistSource source(src);
size_t res_len = 0;
// Trick, decompress only need first 32bits buffer
- list<bufferptr>::const_iterator pb = src.buffers().begin();
if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len))
return -1;
bufferptr ptr(res_len);