diff options
author | Sage Weil <sage@inktank.com> | 2013-09-04 01:00:28 +0200 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-09-04 01:00:28 +0200 |
commit | 2b2f296ed84169cd872d2ced3714f9a180994903 (patch) | |
tree | 8547a130e27169a79976685bdbc627379d6f5611 | |
parent | doc: Fix repo URL for Ceph cloning (dev/generatedocs) (diff) | |
parent | ceph_test_rados: test COPY_FROM (diff) | |
download | ceph-2b2f296ed84169cd872d2ced3714f9a180994903.tar.xz ceph-2b2f296ed84169cd872d2ced3714f9a180994903.zip |
Merge branch 'wip-copyfrom'
Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/common/config_opts.h | 1 | ||||
-rw-r--r-- | src/include/ceph_strings.cc | 1 | ||||
-rw-r--r-- | src/include/rados.h | 5 | ||||
-rw-r--r-- | src/include/rados/librados.hpp | 14 | ||||
-rw-r--r-- | src/librados/librados.cc | 8 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 219 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 48 | ||||
-rw-r--r-- | src/osd/osd_types.cc | 2 | ||||
-rw-r--r-- | src/osd/osd_types.h | 6 | ||||
-rw-r--r-- | src/osdc/Objecter.h | 8 | ||||
-rw-r--r-- | src/test/librados/misc.cc | 50 | ||||
-rw-r--r-- | src/test/osd/RadosModel.h | 94 | ||||
-rw-r--r-- | src/test/osd/TestRados.cc | 10 |
13 files changed, 460 insertions, 6 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 328f7f4b94d..2fa72d4ce0f 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -444,6 +444,7 @@ OPTION(osd_recovery_delay_start, OPT_FLOAT, 0) OPTION(osd_recovery_max_active, OPT_INT, 15) OPTION(osd_recovery_max_single_start, OPT_INT, 5) OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20) // max size of push chunk +OPTION(osd_copyfrom_max_chunk, OPT_U64, 8<<20) // max size of a COPYFROM chunk OPTION(osd_push_per_object_cost, OPT_U64, 1000) // push cost per object OPTION(osd_max_push_cost, OPT_U64, 8<<20) // max size of push message OPTION(osd_max_push_objects, OPT_U64, 10) // max objects in single push op diff --git a/src/include/ceph_strings.cc b/src/include/ceph_strings.cc index f14f29ce0e9..e86aae4fd50 100644 --- a/src/include/ceph_strings.cc +++ b/src/include/ceph_strings.cc @@ -49,6 +49,7 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_WATCH: return "watch"; case CEPH_OSD_OP_COPY_GET: return "copy-get"; + case CEPH_OSD_OP_COPY_FROM: return "copy-from"; case CEPH_OSD_OP_CLONERANGE: return "clonerange"; case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version"; diff --git a/src/include/rados.h b/src/include/rados.h index 27291a7440e..178c171c445 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -217,6 +217,7 @@ enum { CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24, CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25, + CEPH_OSD_OP_COPY_FROM = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 26, CEPH_OSD_OP_COPY_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 27, /** multi **/ @@ -410,6 +411,10 @@ struct ceph_osd_op { struct { __le64 max; /* max data in reply */ } __attribute__ ((packed)) copy_get; + struct { + __le64 snapid; + __le64 src_version; + } __attribute__ ((packed)) copy_from; }; __le32 payload_len; } __attribute__ ((packed)); diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index bc0bcc95ceb..5a750cbc0d1 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -265,6 +265,19 @@ namespace librados */ void omap_rm_keys(const std::set<std::string> &to_rm); + /** + * Copy an object + * + * Copies an object from another location. The operation is atomic in that + * the copy either succeeds in its entirety or fails (e.g., because the + * source object was modified while the copy was in progress). + * + * @param src source object name + * @param src_ioctx ioctx for the source object + * @param version current version of the source object + */ + void copy_from(const std::string& src, const IoCtx& src_ioctx, uint64_t src_version); + friend class IoCtx; }; @@ -674,6 +687,7 @@ namespace librados IoCtx(IoCtxImpl *io_ctx_impl_); friend class Rados; // Only Rados can use our private constructor to create IoCtxes. + friend class ObjectWriteOperation; // copy_from needs to see our IoCtxImpl IoCtxImpl *io_ctx_impl; }; diff --git a/src/librados/librados.cc b/src/librados/librados.cc index 12372d960b1..852228ed383 100644 --- a/src/librados/librados.cc +++ b/src/librados/librados.cc @@ -382,6 +382,14 @@ void librados::ObjectWriteOperation::omap_rm_keys( o->omap_rm_keys(to_rm); } +void librados::ObjectWriteOperation::copy_from(const std::string& src, + const IoCtx& src_ioctx, + uint64_t src_version) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->copy_from(object_t(src), src_ioctx.io_ctx_impl->snap_seq, src_ioctx.io_ctx_impl->oloc, src_version); +} + void librados::ObjectWriteOperation::tmap_put(const bufferlist &bl) { ::ObjectOperation *o = (::ObjectOperation *)impl; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 0027edda077..2c96180b13a 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1000,6 +1000,11 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) p->second->ondisk_read_unlock(); } + if (result == -EINPROGRESS) { + // come back later. + return; + } + if (result == -EAGAIN) { // clean up after the ctx delete ctx; @@ -3386,6 +3391,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) if (result < 0) break; cursor.attr_complete = true; + dout(20) << " got attrs" << dendl; } ::encode(out_attrs, osd_op.outdata); @@ -3395,15 +3401,17 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) bufferlist bl; if (left > 0 && !cursor.data_complete) { if (cursor.data_offset < oi.size) { - result = osd->store->read(coll, oi.soid, cursor.data_offset, out_max, bl); + result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl); if (result < 0) return result; assert(result <= left); left -= result; cursor.data_offset += result; } - if (cursor.data_offset == oi.size) + if (cursor.data_offset == oi.size) { cursor.data_complete = true; + dout(20) << " got data" << dendl; + } } ::encode(bl, osd_op.outdata); @@ -3423,15 +3431,73 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) cursor.omap_offset = iter->key(); } else { cursor.omap_complete = true; + dout(20) << " got omap" << dendl; } } ::encode(out_omap, osd_op.outdata); + dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl; ::encode(cursor, osd_op.outdata); result = 0; } break; + case CEPH_OSD_OP_COPY_FROM: + ++ctx->num_write; + { + object_t src_name; + object_locator_t src_oloc; + snapid_t src_snapid = (uint64_t)op.copy_from.snapid; + version_t src_version = op.copy_from.src_version; + try { + ::decode(src_name, bp); + ::decode(src_oloc, bp); + } + catch (buffer::error& e) { + result = -EINVAL; + goto fail; + } + pg_t raw_pg; + get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg); + hobject_t src(src_name, src_oloc.key, src_snapid, + raw_pg.ps(), raw_pg.pool(), + src_oloc.nspace); + if (!ctx->copy_op) { + // start + result = start_copy(ctx, src, src_oloc, src_version, &ctx->copy_op); + if (result < 0) + goto fail; + result = -EINPROGRESS; + } else { + // finish + CopyOpRef cop = ctx->copy_op; + + if (!obs.exists) { + ctx->delta_stats.num_objects++; + obs.exists = true; + } else { + t.remove(coll, soid); + } + t.write(coll, soid, 0, cop->data.length(), cop->data); + for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p) + t.setattr(coll, soid, string("_") + p->first, p->second); + t.omap_setkeys(coll, soid, cop->omap); + + interval_set<uint64_t> ch; + if (oi.size > 0) + ch.insert(0, oi.size); + ctx->modified_ranges.union_of(ch); + + if (cop->data.length() != oi.size) { + ctx->delta_stats.num_bytes -= oi.size; + oi.size = cop->data.length(); + ctx->delta_stats.num_bytes += oi.size; + } + ctx->delta_stats.num_wr++; + ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10); + } + } + break; default: dout(1) << "unrecognized osd op " << op.op @@ -4013,6 +4079,152 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) return result; } +// ======================================================================== +// copyfrom + +struct C_Copyfrom : public Context { + ReplicatedPGRef pg; + hobject_t oid; + epoch_t last_peering_reset; + tid_t tid; + C_Copyfrom(ReplicatedPG *p, hobject_t o, epoch_t lpr) + : pg(p), oid(o), last_peering_reset(lpr), + tid(0) + {} + void finish(int r) { + pg->lock(); + if (last_peering_reset == pg->get_last_peering_reset()) { + pg->process_copy_chunk(oid, tid, r); + } + pg->unlock(); + } +}; + +int ReplicatedPG::start_copy(OpContext *ctx, + hobject_t src, object_locator_t oloc, version_t version, + CopyOpRef *pcop) +{ + const hobject_t& dest = ctx->obs->oi.soid; + dout(10) << __func__ << " " << dest << " ctx " << ctx + << " from " << src << " " << oloc << " v" << version + << dendl; + + // cancel a previous in-progress copy? + if (copy_ops.count(dest)) { + // FIXME: if the src etc match, we could avoid restarting from the + // beginning. + CopyOpRef cop = copy_ops[dest]; + cancel_copy(cop); + } + + CopyOpRef cop(new CopyOp(ctx, src, oloc, version)); + copy_ops[dest] = cop; + ctx->copy_op = cop; + ++ctx->obc->copyfrom_readside; + + _copy_some(ctx, cop); + + return 0; +} + +void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop) +{ + dout(10) << __func__ << " " << ctx << " " << cop << dendl; + ObjectOperation op; + op.assert_version(cop->version); + op.copy_get(&cop->cursor, g_conf->osd_copyfrom_max_chunk, + &cop->size, &cop->mtime, &cop->attrs, + &cop->data, &cop->omap, + &cop->rval); + + C_Copyfrom *fin = new C_Copyfrom(this, ctx->obs->oi.soid, + get_last_peering_reset()); + osd->objecter_lock.Lock(); + tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op, + cop->src.snap, NULL, 0, + new C_OnFinisher(fin, + &osd->objecter_finisher), + NULL); + fin->tid = tid; + cop->objecter_tid = tid; + osd->objecter_lock.Unlock(); +} + +void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r) +{ + dout(10) << __func__ << " tid " << tid << " " << cpp_strerror(r) << dendl; + map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid); + if (p == copy_ops.end()) { + dout(10) << __func__ << " no copy_op found" << dendl; + return; + } + CopyOpRef cop = p->second; + if (tid != cop->objecter_tid) { + dout(10) << __func__ << " tid " << tid << " != cop " << cop + << " tid " << cop->objecter_tid << dendl; + return; + } + OpContext *ctx = cop->ctx; + cop->objecter_tid = 0; + if (r < 0) { + copy_ops.erase(ctx->obc->obs.oi.soid); + --ctx->obc->copyfrom_readside; + reply_ctx(ctx, r); + return; + } + assert(cop->rval >= 0); + + // FIXME: this is accumulating the entire object in memory. + + if (!cop->cursor.is_complete()) { + dout(10) << __func__ << " fetching more" << dendl; + _copy_some(ctx, cop); + return; + } + + dout(20) << __func__ << " complete; committing" << dendl; + execute_ctx(ctx); + + copy_ops.erase(ctx->obc->obs.oi.soid); + --ctx->obc->copyfrom_readside; + ctx->copy_op.reset(); +} + +void ReplicatedPG::cancel_copy(CopyOpRef cop) +{ + OpContext *ctx = cop->ctx; + dout(10) << __func__ << " " << ctx->obc->obs.oi.soid << " ctx " << ctx + << " from " << cop->src << " " << cop->oloc << " v" << cop->version + << dendl; + + // cancel objecter op, if we can + if (cop->objecter_tid) { + Mutex::Locker l(osd->objecter_lock); + osd->objecter->op_cancel(cop->objecter_tid); + } + + copy_ops.erase(ctx->obc->obs.oi.soid); + --ctx->obc->copyfrom_readside; + ctx->copy_op.reset(); + + delete ctx; +} + +void ReplicatedPG::requeue_cancel_copy_ops(bool requeue) +{ + dout(10) << __func__ << dendl; + for (map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin(); + p != copy_ops.end(); + copy_ops.erase(p++)) { + // requeue initiating copy *and* any subsequent waiters + CopyOpRef cop = p->second; + if (requeue) { + cop->waiting.push_front(cop->ctx->op); + requeue_ops(cop->waiting); + } + cancel_copy(cop); + } +} // ======================================================================== @@ -6736,6 +6948,7 @@ void ReplicatedPG::on_shutdown() deleting = true; unreg_next_scrub(); + requeue_cancel_copy_ops(false); apply_and_flush_repops(false); context_registry_on_change(); @@ -6786,6 +6999,8 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t) context_registry_on_change(); + requeue_cancel_copy_ops(is_primary()); + // requeue object waiters if (is_primary()) { requeue_ops(waiting_for_backfill_pos); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 5dc7d882a8b..254b5842ffc 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -83,7 +83,40 @@ public: class ReplicatedPG : public PG { friend class OSD; friend class Watch; -public: + +public: + + /* + * state associated with a copy operation + */ + struct OpContext; + + struct CopyOp { + OpContext *ctx; + hobject_t src; + object_locator_t oloc; + version_t version; + + tid_t objecter_tid; + + list<OpRequestRef> waiting; + + object_copy_cursor_t cursor; + uint64_t size; + utime_t mtime; + map<string,bufferlist> attrs; + bufferlist data; + map<string,bufferlist> omap; + int rval; + + CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v) + : ctx(c), src(s), oloc(l), version(v), + objecter_tid(0), + size(0), + rval(-1) + {} + }; + typedef boost::shared_ptr<CopyOp> CopyOpRef; /* * Capture all object state associated with an in-progress read or write. @@ -145,6 +178,8 @@ public: int num_read; ///< count read ops int num_write; ///< count update ops + CopyOpRef copy_op; + OpContext(const OpContext& other); const OpContext& operator=(const OpContext& other); @@ -749,6 +784,17 @@ protected: void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat); + // -- copyfrom -- + map<hobject_t, CopyOpRef> copy_ops; + + int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version, + CopyOpRef *pcop); + void process_copy_chunk(hobject_t oid, tid_t tid, int r); + void _copy_some(OpContext *ctx, CopyOpRef cop); + void cancel_copy(CopyOpRef cop); + void requeue_cancel_copy_ops(bool requeue=true); + + friend class C_Copyfrom; // -- scrub -- virtual void _scrub(ScrubMap& map); diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 1c1b457002c..3451d520ff2 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -3484,6 +3484,8 @@ ostream& operator<<(ostream& out, const OSDOp& op) break; case CEPH_OSD_OP_COPY_GET: out << " max " << op.op.copy_get.max; + case CEPH_OSD_OP_COPY_FROM: + out << " ver " << op.op.copy_from.src_version; break; default: out << " " << op.op.extent.offset << "~" << op.op.extent.length; diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 00e9409c98a..312eb81e3fd 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2130,6 +2130,9 @@ public: Cond cond; int unstable_writes, readers, writers_waiting, readers_waiting; + /// in-progress copyfrom ops for this object + int copyfrom_readside; + // set if writes for this object are blocked on another objects recovery ObjectContextRef blocked_by; // object blocking our writes set<ObjectContextRef> blocking; // objects whose writes we block @@ -2141,7 +2144,8 @@ public: : ssc(NULL), destructor_callback(0), lock("ReplicatedPG::ObjectContext::lock"), - unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {} + unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0), + copyfrom_readside(0) {} ~ObjectContext() { if (destructor_callback) diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 91f62551729..154ee410fde 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -753,6 +753,14 @@ struct ObjectOperation { OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK); osd_op.op.snap.snapid = snapid; } + + void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc, version_t src_version) { + OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM); + osd_op.op.copy_from.snapid = snapid; + osd_op.op.copy_from.src_version = src_version; + ::encode(src, osd_op.indata); + ::encode(src_oloc, osd_op.indata); + } }; diff --git a/src/test/librados/misc.cc b/src/test/librados/misc.cc index 6cb7cf5452a..af17847aeab 100644 --- a/src/test/librados/misc.cc +++ b/src/test/librados/misc.cc @@ -564,6 +564,56 @@ TEST(LibRadosMisc, BigAttrPP) { ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); } +TEST(LibRadosMisc, CopyPP) { + Rados cluster; + std::string pool_name = get_temp_pool_name(); + ASSERT_EQ("", create_one_pool_pp(pool_name, cluster)); + IoCtx ioctx; + ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx)); + + bufferlist bl, x; + bl.append("hi there"); + x.append("bar"); + + // small object + bufferlist blc = bl; + bufferlist xc = x; + ASSERT_EQ(0, ioctx.write_full("foo", blc)); + ASSERT_EQ(0, ioctx.setxattr("foo", "myattr", xc)); + + ObjectWriteOperation op; + op.copy_from("foo", ioctx, ioctx.get_last_version()); + ASSERT_EQ(0, ioctx.operate("foo.copy", &op)); + + bufferlist bl2, x2; + ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0)); + ASSERT_TRUE(bl.contents_equal(bl2)); + ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2)); + ASSERT_TRUE(x.contents_equal(x2)); + + // do a big object + bl.append(buffer::create(8000000)); + bl.zero(); + bl.append("tail"); + blc = bl; + xc = x; + ASSERT_EQ(0, ioctx.write_full("big", blc)); + ASSERT_EQ(0, ioctx.setxattr("big", "myattr", xc)); + + ObjectWriteOperation op2; + op.copy_from("big", ioctx, ioctx.get_last_version()); + ASSERT_EQ(0, ioctx.operate("big.copy", &op)); + + bl2.clear(); + ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0)); + ASSERT_TRUE(bl.contents_equal(bl2)); + ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2)); + ASSERT_TRUE(x.contents_equal(x2)); + + ioctx.close(); + ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); diff --git a/src/test/osd/RadosModel.h b/src/test/osd/RadosModel.h index c545d9a29b8..b9967d7af50 100644 --- a/src/test/osd/RadosModel.h +++ b/src/test/osd/RadosModel.h @@ -48,7 +48,8 @@ enum TestOpType { TEST_OP_SETATTR, TEST_OP_RMATTR, TEST_OP_TMAPPUT, - TEST_OP_WATCH + TEST_OP_WATCH, + TEST_OP_COPY_FROM }; class TestWatchContext : public librados::WatchCtx { @@ -396,6 +397,12 @@ public: pool_obj_cont[current_snap].insert(pair<string,ObjectDesc>(oid, new_obj)); } + void update_object_full(const string &oid, const ObjectDesc &contents) + { + pool_obj_cont.rbegin()->second.erase(oid); + pool_obj_cont.rbegin()->second.insert(pair<string,ObjectDesc>(oid, contents)); + } + void update_object_version(const string &oid, uint64_t version) { for (map<int, map<string,ObjectDesc> >::reverse_iterator i = @@ -1378,4 +1385,89 @@ public: } }; +class CopyFromOp : public TestOp { +public: + string oid, oid_src; + ObjectDesc src_value; + librados::ObjectWriteOperation op; + librados::AioCompletion *comp; + int snap; + bool done; + tid_t tid; + CopyFromOp(RadosTestContext *context, + const string &oid, + const string &oid_src, + TestOpStat *stat) + : TestOp(context, stat), oid(oid), oid_src(oid_src), + src_value(&context->cont_gen), + comp(NULL), done(false), tid(0) + {} + + void _begin() + { + ContDesc cont; + { + Mutex::Locker l(context->state_lock); + cont = ContDesc(context->seq_num, context->current_snap, + context->seq_num, ""); + context->oid_in_use.insert(oid); + context->oid_not_in_use.erase(oid); + context->oid_in_use.insert(oid_src); + context->oid_not_in_use.erase(oid_src); + } + + // choose source snap + if (0 && !(rand() % 4) && !context->snaps.empty()) { + snap = rand_choose(context->snaps)->first; + } else { + snap = -1; + } + context->find_object(oid_src, &src_value, snap); + + string src = context->prefix+oid_src; + op.copy_from(src.c_str(), context->io_ctx, src_value.version); + + pair<TestOp*, TestOp::CallbackInfo*> *cb_arg = + new pair<TestOp*, TestOp::CallbackInfo*>(this, + new TestOp::CallbackInfo(0)); + comp = context->rados.aio_create_completion((void*) cb_arg, &write_callback, + NULL); + tid = context->io_ctx.aio_operate(context->prefix+oid, comp, &op); + } + + void _finish(CallbackInfo *info) + { + Mutex::Locker l(context->state_lock); + done = true; + int r; + assert(comp->is_complete()); + cout << "finishing copy_from tid " << tid << " to " << context->prefix + oid << std::endl; + if ((r = comp->get_return_value())) { + if (!(r == -ENOENT && src_value.deleted())) { + cerr << "Error: oid " << oid << " copy_from " << oid_src << " returned error code " + << r << std::endl; + } + } else { + context->update_object_full(oid, src_value); + context->update_object_version(oid, comp->get_version()); + } + context->oid_in_use.erase(oid); + context->oid_not_in_use.insert(oid); + context->oid_in_use.erase(oid_src); + context->oid_not_in_use.insert(oid_src); + context->kick(); + } + + bool finished() + { + return done; + } + + string getType() + { + return "TmapPutOp"; + } +}; + + #endif diff --git a/src/test/osd/TestRados.cc b/src/test/osd/TestRados.cc index 6ac661c0629..1deee23aa2c 100644 --- a/src/test/osd/TestRados.cc +++ b/src/test/osd/TestRados.cc @@ -84,7 +84,7 @@ private: TestOp *gen_op(RadosTestContext &context, TestOpType type) { - string oid; + string oid, oid2; cout << "oids not in use " << context.oid_not_in_use.size() << std::endl; assert(context.oid_not_in_use.size()); switch (type) { @@ -152,6 +152,13 @@ private: << " current snap is " << context.current_snap << std::endl; return new WatchOp(&context, oid, m_stats); + case TEST_OP_COPY_FROM: + oid = *(rand_choose(context.oid_not_in_use)); + oid2 = *(rand_choose(context.oid_not_in_use)); + cout << "copy_from " << oid << " from " << oid2 + << " current snap is " << context.current_snap << std::endl; + return new CopyFromOp(&context, oid, oid2, m_stats); + default: cerr << "Invalid op type " << type << std::endl; assert(0); @@ -192,6 +199,7 @@ int main(int argc, char **argv) { TEST_OP_RMATTR, "rmattr" }, { TEST_OP_TMAPPUT, "tmapput" }, { TEST_OP_WATCH, "watch" }, + { TEST_OP_COPY_FROM, "copy_from" }, { TEST_OP_READ /* grr */, NULL }, }; |