diff options
author | Josh Durgin <josh.durgin@inktank.com> | 2012-05-15 19:58:59 +0200 |
---|---|---|
committer | Josh Durgin <josh.durgin@inktank.com> | 2012-05-17 01:31:35 +0200 |
commit | 59c6816542f23148164015fa9e0b697cfbcab14b (patch) | |
tree | 964371dc0b326ef870631dc2869b2047c4a32527 | |
parent | librados: avoid overflow in the return value of reads (diff) | |
download | ceph-59c6816542f23148164015fa9e0b697cfbcab14b.tar.xz ceph-59c6816542f23148164015fa9e0b697cfbcab14b.zip |
ObjectCacher: propagate read errors to the caller
Previously the return value of a read operation was ignored. Now a
read error sets the error field, and changes the BufferHead to a new
error state. Error state BufferHeads are treated as misses so they can
be retried when requested by a user of the ObjectCacher. When _readx
is called again internally, they're treated as hits so the error can
be returned to the user.
The error value is ignored if the BufferHead is not in the error
state.
Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
-rw-r--r-- | src/osdc/ObjectCacher.cc | 93 | ||||
-rw-r--r-- | src/osdc/ObjectCacher.h | 31 |
2 files changed, 88 insertions, 36 deletions
diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 5d0e9886bfb..8a1a7fcedb2 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -4,6 +4,7 @@ #include "msg/Messenger.h" #include "ObjectCacher.h" #include "WritebackHandler.h" +#include "common/errno.h" #include "common/perf_counters.h" @@ -27,7 +28,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, loff_t o right->last_write_tid = left->last_write_tid; right->set_state(left->get_state()); right->snapc = left->snapc; - + loff_t newleftlen = off - left->start(); right->set_start(off); right->set_length(left->length() - newleftlen); @@ -114,15 +115,16 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh) p->second->get_state() == bh->get_state()) { merge_left(p->second, bh); bh = p->second; - } else + } else { p++; + } } // to the right? assert(p->second == bh); p++; if (p != data.end() && p->second->start() == bh->end() && - p->second->get_state() == bh->get_state()) + p->second->get_state() == bh->get_state()) merge_left(bh, p->second); } @@ -160,7 +162,8 @@ bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) int ObjectCacher::Object::map_read(OSDRead *rd, map<loff_t, BufferHead*>& hits, map<loff_t, BufferHead*>& missing, - map<loff_t, BufferHead*>& rx) + map<loff_t, BufferHead*>& rx, + map<loff_t, BufferHead*>& errors) { for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin(); ex_it != rd->extents.end(); @@ -169,7 +172,8 @@ int ObjectCacher::Object::map_read(OSDRead *rd, if (ex_it->oid != oid.oid) continue; ldout(oc->cct, 10) << "map_read " << ex_it->oid - << " " << ex_it->offset << "~" << ex_it->length << dendl; + << " " << ex_it->offset << "~" << ex_it->length + << dendl; loff_t cur = ex_it->offset; loff_t left = ex_it->length; @@ -195,18 +199,21 @@ int ObjectCacher::Object::map_read(OSDRead *rd, if (p->first <= cur) { // have it (or part of it) BufferHead *e = p->second; - + if (e->is_clean() || e->is_dirty() || e->is_tx()) { hits[cur] = e; // readable! ldout(oc->cct, 20) << "map_read hit " << *e << dendl; - } - else if (e->is_rx()) { + } else if (e->is_rx()) { rx[cur] = e; // missing, not readable. ldout(oc->cct, 20) << "map_read rx " << *e << dendl; - } - else assert(0); + } else if (e->is_error()) { + errors[cur] = e; + ldout(oc->cct, 20) << "map_read error " << *e << dendl; + } else { + assert(0); + } loff_t lenfromcur = MIN(e->end() - cur, left); cur += lenfromcur; @@ -226,12 +233,12 @@ int ObjectCacher::Object::map_read(OSDRead *rd, left -= MIN(left, n->length()); ldout(oc->cct, 20) << "map_read gap " << *n << dendl; continue; // more? - } - else + } else { assert(0); + } } } - return(0); + return 0; } /* @@ -251,7 +258,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr) if (ex_it->oid != oid.oid) continue; ldout(oc->cct, 10) << "map_write oex " << ex_it->oid - << " " << ex_it->offset << "~" << ex_it->length << dendl; + << " " << ex_it->offset << "~" << ex_it->length << dendl; loff_t cur = ex_it->offset; loff_t left = ex_it->length; @@ -418,7 +425,8 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name, WritebackHandler& wb, max_dirty(max_dirty), target_dirty(target_dirty), max_size(max_size), flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg), flusher_stop(false), flusher_thread(this), - stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0), stat_dirty_waiting(0) + stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0), + stat_error(0), stat_dirty_waiting(0) { this->max_dirty_age.set_from_double(max_dirty_age); perf_start(); @@ -527,10 +535,6 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start, << " returned " << r << dendl; - if (r < 0) { - // TODO: fix bad read case - } - if (bl.length() < length) { bufferptr bp(length - bl.length()); bp.zero(); @@ -574,7 +578,14 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start, bh->bl.substr_of(bl, opos-bh->start(), bh->length()); - mark_clean(bh); + + if (r < 0 && r != -ENOENT) { + mark_error(bh); + bh->error = r; + } else { + mark_clean(bh); + } + ldout(cct, 10) << "bh_read_finish read " << *bh << dendl; opos = bh->end(); @@ -588,7 +599,7 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, loff_t start, p++) ls.splice(ls.end(), p->second); bh->waitfor_read.clear(); - finish_contexts(cct, ls); + finish_contexts(cct, ls, bh->error); // clean up? ob->try_merge_bh(bh); @@ -855,6 +866,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, { assert(lock.is_locked()); bool success = true; + int error = 0; list<BufferHead*> hit_ls; uint64_t bytes_in_cache = 0; uint64_t bytes_not_in_cache = 0; @@ -873,8 +885,17 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, Object *o = get_object(soid, oset, ex_it->oloc); // map extent into bufferheads - map<loff_t, BufferHead*> hits, missing, rx; - o->map_read(rd, hits, missing, rx); + map<loff_t, BufferHead*> hits, missing, rx, errors; + o->map_read(rd, hits, missing, rx, errors); + if (external_call) { + // retry reading error buffers + missing.insert(errors.begin(), errors.end()); + } else { + // some reads had errors, fail later so completions + // are cleaned up up properly + // TODO: make read path not call _readx for every completion + hits.insert(errors.begin(), errors.end()); + } if (!missing.empty() || !rx.empty()) { // read missing @@ -912,6 +933,8 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, bh_it != hits.end(); bh_it++) { ldout(cct, 10) << "readx hit bh " << *bh_it->second << dendl; + if (bh_it->second->is_error() && bh_it->second->error) + error = bh_it->second->error; hit_ls.push_back(bh_it->second); bytes_in_cache += bh_it->second->length(); } @@ -990,7 +1013,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, // ok, assemble into result buffer. uint64_t pos = 0; - if (rd->bl) { + if (rd->bl && !error) { rd->bl->clear(); for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin(); i != stripe_map.end(); @@ -1013,7 +1036,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, assert(pos <= (uint64_t) INT_MAX); - return pos; + return error ? error : pos; } @@ -1728,7 +1751,7 @@ void ObjectCacher::verify_stats() const { ldout(cct, 10) << "verify_stats" << dendl; - loff_t clean = 0, dirty = 0, rx = 0, tx = 0, missing = 0; + loff_t clean = 0, dirty = 0, rx = 0, tx = 0, missing = 0, error = 0; for (vector<hash_map<sobject_t, Object*> >::const_iterator i = objects.begin(); i != objects.end(); ++i) { @@ -1756,6 +1779,9 @@ void ObjectCacher::verify_stats() const case BufferHead::STATE_RX: rx += bh->length(); break; + case BufferHead::STATE_ERROR: + error += bh->length(); + break; default: assert(0); } @@ -1768,12 +1794,14 @@ void ObjectCacher::verify_stats() const << " tx " << tx << " dirty " << dirty << " missing " << missing + << " error " << error << dendl; assert(clean == stat_clean); assert(rx == stat_rx); assert(tx == stat_tx); assert(dirty == stat_dirty); assert(missing == stat_missing); + assert(error == stat_error); } void ObjectCacher::bh_stat_add(BufferHead *bh) @@ -1798,6 +1826,11 @@ void ObjectCacher::bh_stat_add(BufferHead *bh) case BufferHead::STATE_RX: stat_rx += bh->length(); break; + case BufferHead::STATE_ERROR: + stat_error += bh->length(); + break; + default: + assert(0 == "bh_stat_add: invalid bufferhead state"); } if (get_stat_dirty_waiting() > 0) stat_cond.Signal(); @@ -1825,6 +1858,11 @@ void ObjectCacher::bh_stat_sub(BufferHead *bh) case BufferHead::STATE_RX: stat_rx -= bh->length(); break; + case BufferHead::STATE_ERROR: + stat_error -= bh->length(); + break; + default: + assert(0 == "bh_stat_sub: invalid bufferhead state"); } } @@ -1841,6 +1879,9 @@ void ObjectCacher::bh_set_state(BufferHead *bh, int s) lru_rest.lru_insert_top(bh); dirty_bh.erase(bh); } + if (s != BufferHead::STATE_ERROR && bh->get_state() == BufferHead::STATE_ERROR) { + bh->error = 0; + } // set state bh_stat_sub(bh); diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index f959cf359d7..5fd3d3130b4 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -87,7 +87,8 @@ class ObjectCacher { static const int STATE_DIRTY = 2; static const int STATE_RX = 3; static const int STATE_TX = 4; - + static const int STATE_ERROR = 5; // a read error occurred + private: // my fields int state; @@ -102,16 +103,17 @@ class ObjectCacher { tid_t last_write_tid; // version of bh (if non-zero) utime_t last_write; SnapContext snapc; + int error; // holds return value for failed reads map< loff_t, list<Context*> > waitfor_read; - public: // cons BufferHead(Object *o) : state(STATE_MISSING), ref(0), ob(o), - last_write_tid(0) {} + last_write_tid(0), + error(0) {} // extent loff_t start() const { return ex.start; } @@ -134,6 +136,7 @@ class ObjectCacher { bool is_clean() { return state == STATE_CLEAN; } bool is_tx() { return state == STATE_TX; } bool is_rx() { return state == STATE_RX; } + bool is_error() { return state == STATE_ERROR; } // reference counting int get() { @@ -148,7 +151,6 @@ class ObjectCacher { return ref; } }; - // ******* Object ********* class Object { @@ -251,6 +253,7 @@ class ObjectCacher { assert(data.count(bh->start())); data.erase(bh->start()); } + bool is_empty() { return data.empty(); } // mid-level @@ -262,7 +265,8 @@ class ObjectCacher { int map_read(OSDRead *rd, map<loff_t, BufferHead*>& hits, map<loff_t, BufferHead*>& missing, - map<loff_t, BufferHead*>& rx); + map<loff_t, BufferHead*>& rx, + map<loff_t, BufferHead*>& errors); BufferHead *map_write(OSDWrite *wr); void truncate(loff_t s); @@ -341,6 +345,7 @@ class ObjectCacher { loff_t stat_rx; loff_t stat_tx; loff_t stat_missing; + loff_t stat_error; loff_t stat_dirty_waiting; // bytes that writers are waiting on to write void verify_stats() const; @@ -370,6 +375,7 @@ class ObjectCacher { void mark_clean(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_CLEAN); }; void mark_rx(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_RX); }; void mark_tx(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_TX); }; + void mark_error(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_ERROR); }; void mark_dirty(BufferHead *bh) { bh_set_state(bh, BufferHead::STATE_DIRTY); lru_dirty.lru_touch(bh); @@ -491,11 +497,15 @@ class ObjectCacher { Context *onfinish; public: C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c) : oc(_oc), rd(r), oset(os), onfinish(c) {} - void finish(int) { - int r = oc->_readx(rd, oset, onfinish, false); - if (r > 0 && onfinish) { - onfinish->finish(r); - delete onfinish; + void finish(int r) { + if (r < 0) { + if (onfinish) + onfinish->complete(r); + return; + } + int ret = oc->_readx(rd, oset, onfinish, false); + if (ret != 0 && onfinish) { + onfinish->complete(ret); } } }; @@ -591,6 +601,7 @@ inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh) if (bh.is_clean()) out << " clean"; if (bh.is_missing()) out << " missing"; if (bh.bl.length() > 0) out << " firstbyte=" << (int)bh.bl[0]; + if (bh.error) out << " error=" << bh.error; out << "]"; return out; } |