diff options
author | Haomai Wang <haomaiwang@gmail.com> | 2015-03-20 05:14:02 +0100 |
---|---|---|
committer | Haomai Wang <haomaiwang@gmail.com> | 2015-07-01 15:12:08 +0200 |
commit | 8e48ba16c7e96f5164910267c2162052bbfed702 (patch) | |
tree | abf411152c4af0334c46ff98089cb0f27ce53d4f /src/compressor | |
parent | Compressor: Add compressor infrastructure for ceph (diff) | |
download | ceph-8e48ba16c7e96f5164910267c2162052bbfed702.tar.xz ceph-8e48ba16c7e96f5164910267c2162052bbfed702.zip |
Compressor: add decompress failed codes
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
Diffstat (limited to 'src/compressor')
-rw-r--r-- | src/compressor/AsyncCompressor.cc | 28 | ||||
-rw-r--r-- | src/compressor/AsyncCompressor.h | 21 | ||||
-rw-r--r-- | src/compressor/SnappyCompressor.h | 1 |
3 files changed, 36 insertions, 14 deletions
diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc index d9b52b06ac5..e2ca2fe48bb 100644 --- a/src/compressor/AsyncCompressor.cc +++ b/src/compressor/AsyncCompressor.cc @@ -140,17 +140,27 @@ int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, b ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl; return -ENOENT; } + int status; retry: - if (it->second.status.read() == DONE) { + status = it->second.status.read(); + if (status == DONE) { ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl; *finished = true; data.swap(it->second.data); jobs.erase(it); + } else if (status == ERROR) { + ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl; + jobs.erase(it); + return -EIO; } else if (blocking) { if (it->second.status.cas(WAIT, DONE)) { ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl; - compressor->compress(it->second.data, data); + if (compressor->compress(it->second.data, data)) { + ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl; + it->second.status.set(ERROR); + return -EIO; + } *finished = true; } else { job_lock.Unlock(); @@ -174,17 +184,27 @@ int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &dat ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl; return -ENOENT; } + int status; retry: - if (it->second.status.read() == DONE) { + status = it->second.status.read(); + if (status == DONE) { ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl; *finished = true; data.swap(it->second.data); jobs.erase(it); + } else if (status == ERROR) { + ldout(cct, 20) << __func__ << " compressed data failed, job id=" << decompress_id << dendl; + jobs.erase(it); + return -EIO; } else if (blocking) { if (it->second.status.cas(WAIT, DONE)) { ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl; - compressor->decompress(it->second.data, data); + if (compressor->decompress(it->second.data, data)) { + ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl; + it->second.status.set(ERROR); + return -EIO; + } *finished = true; } else { job_lock.Unlock(); diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h index d34cbe6bf09..659f506be57 100644 --- a/src/compressor/AsyncCompressor.h +++ b/src/compressor/AsyncCompressor.h @@ -35,7 +35,8 @@ class AsyncCompressor { enum { WAIT, WORKING, - DONE + DONE, + ERROR } status; struct Job { uint64_t id; @@ -79,7 +80,6 @@ class AsyncCompressor { break; } else { Mutex::Locker (async_compressor->job_lock); - assert(item->status.read() == DONE); async_compressor->jobs.erase(item->id); item = NULL; } @@ -89,16 +89,19 @@ class AsyncCompressor { void _process(Job *item, ThreadPool::TPHandle &handle) { assert(item->status.read() == WORKING); bufferlist out; + int r; if (item->is_compress) - async_compressor->compressor->compress(item->data, out); + r = async_compressor->compressor->compress(item->data, out); else - async_compressor->compressor->decompress(item->data, out); - item->data.swap(out); - } - void _process_finish(Job *item) { - assert(item->status.read() == WORKING); - item->status.set(DONE); + r = async_compressor->compressor->decompress(item->data, out); + if (!r) { + item->data.swap(out); + assert(item->status.cas(WORKING, DONE)); + } else { + item->status.set(ERROR); + } } + void _process_finish(Job *item) {} void _clear() {} } compress_wq; friend class CompressWQ; diff --git a/src/compressor/SnappyCompressor.h b/src/compressor/SnappyCompressor.h index 8dc3497c45f..f5d86d5a242 100644 --- a/src/compressor/SnappyCompressor.h +++ b/src/compressor/SnappyCompressor.h @@ -64,7 +64,6 @@ class SnappyCompressor : public Compressor { BufferlistSource source(src); size_t res_len = 0; // Trick, decompress only need first 32bits buffer - list<bufferptr>::const_iterator pb = src.buffers().begin(); if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len)) return -1; bufferptr ptr(res_len); |