diff options
author | Haomai Wang <haomaiwang@gmail.com> | 2015-03-17 16:00:46 +0100 |
---|---|---|
committer | Haomai Wang <haomaiwang@gmail.com> | 2015-07-01 15:11:57 +0200 |
commit | 8f0919e366c789092c6c9cbd98665d9ef0d4f69d (patch) | |
tree | 2c65a2781736ff4e4e1081c30bda949dfd79a7dc /src/compressor | |
parent | Merge pull request #5094 from SUSE/wip-sharutils-dupe (diff) | |
download | ceph-8f0919e366c789092c6c9cbd98665d9ef0d4f69d.tar.xz ceph-8f0919e366c789092c6c9cbd98665d9ef0d4f69d.zip |
Compressor: Add compressor infrastructure for ceph
AsyncCompressor is a stucture which implement the nonblock compress method,
callers could compress this and then get the resulted data later. Of course,
callers could directly wait for the compressor completed.
Compression algorithm usually uses lots of helper data to speed compress,
so here AsyncCompressor uses dedicated threads to help cache-affinity. Second,
We want to make compression parallel with normal io logic. For example,
any component receive data may not need to care the actual data and need to
do some things with metadata. We could let metadata process parallel with data
compression.
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
Diffstat (limited to 'src/compressor')
-rw-r--r-- | src/compressor/AsyncCompressor.cc | 200 | ||||
-rw-r--r-- | src/compressor/AsyncCompressor.h | 126 | ||||
-rw-r--r-- | src/compressor/Compressor.cc | 25 | ||||
-rw-r--r-- | src/compressor/Compressor.h | 30 | ||||
-rw-r--r-- | src/compressor/Makefile.am | 11 | ||||
-rw-r--r-- | src/compressor/SnappyCompressor.h | 79 |
6 files changed, 471 insertions, 0 deletions
diff --git a/src/compressor/AsyncCompressor.cc b/src/compressor/AsyncCompressor.cc new file mode 100644 index 00000000000..d9b52b06ac5 --- /dev/null +++ b/src/compressor/AsyncCompressor.cc @@ -0,0 +1,200 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/dout.h" +#include "common/errno.h" +#include "AsyncCompressor.h" + +#define dout_subsys ceph_subsys_compressor +#undef dout_prefix +#define dout_prefix *_dout << "compressor " + +//void AsyncCompressor::_compress(bufferlist &in, bufferlist &out) +//{ +// uint64_t length = 0; +// size_t res_len; +// uint64_t left_pbrs = in.buffers().size(); +// compressor->max_compress_size(in.length(), &res_len); +// ldout(cct, 20) << __func__ << " data length=" << in.length() << " got max compressed size is " << res_len << dendl; +// bufferptr ptr(res_len); +// list<bufferptr>::const_iterator pb = in.buffers().begin(); +// while (left_pbrs--) { +// if (compressor->compress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len)) +// assert(0); +// ldout(cct, 20) << __func__ << " pb length=" << pb->length() << " compress size is " << res_len << dendl; +// out.append(ptr, length, length+res_len); +// length += res_len; +// pb++; +// } +// ldout(cct, 20) << __func__ << " total compressed length is " << length << dendl; +//} +// +//void AsyncCompressor::_decompress(bufferlist &in, bufferlist &out) +//{ +// int i = 0; +// uint64_t length = 0; +// size_t res_len; +// bufferptr ptr; +// vector<uint64_t> lens; +// list<bufferptr>::const_iterator pb = in.buffers().begin(); +// uint64_t left_pbrs = in.buffers().size(); +// while (left_pbrs--) { +// if (compressor->max_uncompress_size(pb->c_str(), pb->length(), &res_len)) +// assert(0); +// length += res_len; +// lens.push_back(res_len); +// pb++; +// } +// pb = in.buffers().begin(); +// left_pbrs = in.buffers().size(); +// ptr = bufferptr(length); +// length = 0; +// while (left_pbrs--) { +// res_len = lens[i++]; +// if (compressor->decompress(pb->c_str(), pb->length(), ptr.c_str()+length, &res_len)) +// assert(0); +// ldout(cct, 20) << __func__ << " pb compressed length=" << pb->length() << " actually got decompressed size is " << res_len << dendl; +// out.append(ptr, length, length+res_len); +// length += res_len; +// pb++; +// } +// ldout(cct, 20) << __func__ << " total decompressed length is " << length << dendl; +//} + +AsyncCompressor::AsyncCompressor(CephContext *c): + compressor(Compressor::create(c->_conf->async_compressor_type)), cct(c), + job_id(0), + compress_tp(g_ceph_context, "AsyncCompressor::compressor_tp", cct->_conf->async_compressor_threads, "async_compressor_threads"), + job_lock("AsyncCompressor::job_lock"), + compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) { + vector<string> corestrs; + get_str_vec(cct->_conf->async_compressor_affinity_cores, corestrs); + for (vector<string>::iterator it = corestrs.begin(); + it != corestrs.end(); ++it) { + string err; + int coreid = strict_strtol(it->c_str(), 10, &err); + if (err == "") + coreids.push_back(coreid); + else + lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->async_compressor_affinity_cores << dendl; + } +} + +void AsyncCompressor::init() +{ + ldout(cct, 10) << __func__ << dendl; + compress_tp.start(); +} + +void AsyncCompressor::terminate() +{ + ldout(cct, 10) << __func__ << dendl; + compress_tp.stop(); +} + +uint64_t AsyncCompressor::async_compress(bufferlist &data) +{ + uint64_t id = job_id.inc(); + pair<unordered_map<uint64_t, Job>::iterator, bool> it; + { + Mutex::Locker l(job_lock); + it = jobs.insert(make_pair(id, Job(id, true))); + it.first->second.data = data; + } + compress_wq.queue(&it.first->second); + ldout(cct, 10) << __func__ << " insert async compress job id=" << id << dendl; + return id; +} + +uint64_t AsyncCompressor::async_decompress(bufferlist &data) +{ + uint64_t id = job_id.inc(); + pair<unordered_map<uint64_t, Job>::iterator, bool> it; + { + Mutex::Locker l(job_lock); + it = jobs.insert(make_pair(id, Job(id, false))); + it.first->second.data = data; + } + compress_wq.queue(&it.first->second); + ldout(cct, 10) << __func__ << " insert async decompress job id=" << id << dendl; + return id; +} + +int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished) +{ + assert(finished); + Mutex::Locker l(job_lock); + unordered_map<uint64_t, Job>::iterator it = jobs.find(compress_id); + if (it == jobs.end() || !it->second.is_compress) { + ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl; + return -ENOENT; + } + + retry: + if (it->second.status.read() == DONE) { + ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl; + *finished = true; + data.swap(it->second.data); + jobs.erase(it); + } else if (blocking) { + if (it->second.status.cas(WAIT, DONE)) { + ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl; + compressor->compress(it->second.data, data); + *finished = true; + } else { + job_lock.Unlock(); + usleep(1000); + job_lock.Lock(); + goto retry; + } + } else { + ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished."<< dendl; + *finished = false; + } + return 0; +} + +int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished) +{ + assert(finished); + Mutex::Locker l(job_lock); + unordered_map<uint64_t, Job>::iterator it = jobs.find(decompress_id); + if (it == jobs.end() || it->second.is_compress) { + ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl; + return -ENOENT; + } + + retry: + if (it->second.status.read() == DONE) { + ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl; + *finished = true; + data.swap(it->second.data); + jobs.erase(it); + } else if (blocking) { + if (it->second.status.cas(WAIT, DONE)) { + ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl; + compressor->decompress(it->second.data, data); + *finished = true; + } else { + job_lock.Unlock(); + usleep(1000); + job_lock.Lock(); + goto retry; + } + } else { + ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't finished."<< dendl; + *finished = false; + } + return 0; +} diff --git a/src/compressor/AsyncCompressor.h b/src/compressor/AsyncCompressor.h new file mode 100644 index 00000000000..d34cbe6bf09 --- /dev/null +++ b/src/compressor/AsyncCompressor.h @@ -0,0 +1,126 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_ASYNCCOMPRESSOR_H +#define CEPH_ASYNCCOMPRESSOR_H + +#include <deque> + +#include "include/atomic.h" +#include "include/str_list.h" +#include "Compressor.h" +#include "common/WorkQueue.h" + +class AsyncCompressor; + +class AsyncCompressor { + private: + Compressor *compressor; + CephContext *cct; + atomic_t job_id; + vector<int> coreids; + ThreadPool compress_tp; + + enum { + WAIT, + WORKING, + DONE + } status; + struct Job { + uint64_t id; + atomic_t status; + bool is_compress; + bufferlist data; + Job(uint64_t i, bool compress): id(i), status(WAIT), is_compress(compress) {} + Job(const Job &j): id(j.id), status(j.status.read()), is_compress(j.is_compress), data(j.data) {} + }; + Mutex job_lock; + // only when job.status == DONE && with job_lock holding, we can insert/erase element in jobs + // only when job.status == WAIT && with pool_lock holding, you can change its status and modify element's info later + unordered_map<uint64_t, Job> jobs; + + struct CompressWQ : public ThreadPool::WorkQueue<Job> { + typedef AsyncCompressor::Job Job; + AsyncCompressor *async_compressor; + deque<Job*> job_queue; + + CompressWQ(AsyncCompressor *ac, time_t timeout, time_t suicide_timeout, ThreadPool *tp) + : ThreadPool::WorkQueue<Job>("AsyncCompressor::CompressWQ", timeout, suicide_timeout, tp), async_compressor(ac) {} + + bool _enqueue(Job *item) { + job_queue.push_back(item); + return true; + } + void _dequeue(Job *item) { + assert(0); + } + bool _empty() { + return job_queue.empty(); + } + Job* _dequeue() { + if (job_queue.empty()) + return NULL; + Job *item = NULL; + while (!job_queue.empty()) { + item = job_queue.front(); + job_queue.pop_front(); + if (item->status.cas(WAIT, WORKING)) { + break; + } else { + Mutex::Locker (async_compressor->job_lock); + assert(item->status.read() == DONE); + async_compressor->jobs.erase(item->id); + item = NULL; + } + } + return item; + } + void _process(Job *item, ThreadPool::TPHandle &handle) { + assert(item->status.read() == WORKING); + bufferlist out; + if (item->is_compress) + async_compressor->compressor->compress(item->data, out); + else + async_compressor->compressor->decompress(item->data, out); + item->data.swap(out); + } + void _process_finish(Job *item) { + assert(item->status.read() == WORKING); + item->status.set(DONE); + } + void _clear() {} + } compress_wq; + friend class CompressWQ; + void _compress(bufferlist &in, bufferlist &out); + void _decompress(bufferlist &in, bufferlist &out); + + public: + AsyncCompressor(CephContext *c); + virtual ~AsyncCompressor() {} + + int get_cpuid(int id) { + if (coreids.empty()) + return -1; + return coreids[id % coreids.size()]; + } + + void init(); + void terminate(); + uint64_t async_compress(bufferlist &data); + uint64_t async_decompress(bufferlist &data); + int get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished); + int get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished); +}; + +#endif diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc new file mode 100644 index 00000000000..0d11e748d41 --- /dev/null +++ b/src/compressor/Compressor.cc @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "Compressor.h" +#include "SnappyCompressor.h" + + +Compressor* Compressor::create(const string &type) +{ + if (type == "snappy") + return new SnappyCompressor(); + + assert(0); +} diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h new file mode 100644 index 00000000000..3eb71aa5c45 --- /dev/null +++ b/src/compressor/Compressor.h @@ -0,0 +1,30 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_COMPRESSOR_H +#define CEPH_COMPRESSOR_H + +#include "include/int_types.h" +#include "include/Context.h" + +class Compressor { + public: + virtual ~Compressor() {} + virtual int compress(bufferlist &in, bufferlist &out) = 0; + virtual int decompress(bufferlist &in, bufferlist &out) = 0; + + static Compressor *create(const string &type); +}; + +#endif diff --git a/src/compressor/Makefile.am b/src/compressor/Makefile.am new file mode 100644 index 00000000000..bd2a2d7d174 --- /dev/null +++ b/src/compressor/Makefile.am @@ -0,0 +1,11 @@ +libcompressor_la_SOURCES = \ + compressor/Compressor.cc \ + compressor/AsyncCompressor.cc +noinst_LTLIBRARIES += libcompressor.la + +libcompressor_la_LIBADD = $(LIBCOMMON) + +noinst_HEADERS += \ + compressor/Compressor.h \ + compressor/AsyncCompressor.h \ + compressor/SnappyCompressor.h diff --git a/src/compressor/SnappyCompressor.h b/src/compressor/SnappyCompressor.h new file mode 100644 index 00000000000..8dc3497c45f --- /dev/null +++ b/src/compressor/SnappyCompressor.h @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_SNAPPYCOMPRESSOR_H +#define CEPH_SNAPPYCOMPRESSOR_H + +#include <snappy.h> +#include <snappy-sinksource.h> +#include "include/buffer.h" +#include "Compressor.h" + +class BufferlistSource : public snappy::Source { + list<bufferptr>::const_iterator pb; + size_t pb_off; + size_t left; + + public: + BufferlistSource(bufferlist &data): pb(data.buffers().begin()), pb_off(0), left(data.length()) {} + virtual ~BufferlistSource() {} + virtual size_t Available() const { return left; } + virtual const char* Peek(size_t* len) { + if (left) { + *len = pb->length() - pb_off; + return pb->c_str() + pb_off; + } else { + *len = 0; + return NULL; + } + } + virtual void Skip(size_t n) { + if (n + pb_off == pb->length()) { + pb++; + pb_off = 0; + } else { + pb_off += n; + } + left -= n; + } +}; + +class SnappyCompressor : public Compressor { + public: + virtual ~SnappyCompressor() {} + virtual int compress(bufferlist &src, bufferlist &dst) { + BufferlistSource source(src); + bufferptr ptr(snappy::MaxCompressedLength(src.length())); + snappy::UncheckedByteArraySink sink(ptr.c_str()); + snappy::Compress(&source, &sink); + dst.append(ptr, 0, sink.CurrentDestination()-ptr.c_str()); + return 0; + } + virtual int decompress(bufferlist &src, bufferlist &dst) { + BufferlistSource source(src); + size_t res_len = 0; + // Trick, decompress only need first 32bits buffer + list<bufferptr>::const_iterator pb = src.buffers().begin(); + if (!snappy::GetUncompressedLength(src.get_contiguous(0, 8), 8, &res_len)) + return -1; + bufferptr ptr(res_len); + if (snappy::RawUncompress(&source, ptr.c_str())) { + dst.append(ptr); + return 0; + } + return -1; + } +}; + +#endif |