summaryrefslogtreecommitdiffstats
path: root/src/journal/ObjectRecorder.h
blob: fd4e88ffafca3a35a69292ebea9a43574b462d4b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
#define CEPH_JOURNAL_OBJECT_RECORDER_H

#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "journal/FutureImpl.h"
#include <list>
#include <map>
#include <set>
#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include "include/assert.h"

// Forward declaration: this header only refers to SafeTimer by reference,
// so the full definition is not needed here.
class SafeTimer;

namespace journal {

class ObjectRecorder;
typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;

typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
typedef std::list<AppendBuffer> AppendBuffers;

class ObjectRecorder : public RefCountedObject, boost::noncopyable {
public:
  struct Handler {
    virtual ~Handler() {
    }
    virtual void closed(ObjectRecorder *object_recorder) = 0;
    virtual void overflow(ObjectRecorder *object_recorder) = 0;
  };

  ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                 uint64_t object_number, std::shared_ptr<Mutex> lock,
                 ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock,
                 Handler *handler, uint8_t order, uint32_t flush_interval,
                 uint64_t flush_bytes, double flush_age);
  ~ObjectRecorder() override;

  inline uint64_t get_object_number() const {
    return m_object_number;
  }
  inline const std::string &get_oid() const {
    return m_oid;
  }

  bool append_unlock(AppendBuffers &&append_buffers);
  void flush(Context *on_safe);
  void flush(const FutureImplPtr &future);

  void claim_append_buffers(AppendBuffers *append_buffers);

  bool is_closed() const {
    ceph_assert(m_lock->is_locked());
    return (m_object_closed && m_in_flight_appends.empty());
  }
  bool close();

  inline CephContext *cct() const {
    return m_cct;
  }

  inline size_t get_pending_appends() const {
    Mutex::Locker locker(*m_lock);
    return m_append_buffers.size();
  }

private:
  typedef std::set<uint64_t> InFlightTids;
  typedef std::map<uint64_t, AppendBuffers> InFlightAppends;

  struct FlushHandler : public FutureImpl::FlushHandler {
    ObjectRecorder *object_recorder;
    FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
    void get() override {
      object_recorder->get();
    }
    void put() override {
      object_recorder->put();
    }
    void flush(const FutureImplPtr &future) override {
      Mutex::Locker locker(*(object_recorder->m_lock));
      object_recorder->flush(future);
    }
  };
  struct C_AppendFlush : public Context {
    ObjectRecorder *object_recorder;
    uint64_t tid;
    C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
        : object_recorder(o), tid(_tid) {
      object_recorder->get();
    }
    void finish(int r) override {
      object_recorder->handle_append_flushed(tid, r);
      object_recorder->put();
    }
  };

  librados::IoCtx m_ioctx;
  std::string m_oid;
  uint64_t m_object_number;
  CephContext *m_cct;

  ContextWQ *m_op_work_queue;

  SafeTimer &m_timer;
  Mutex &m_timer_lock;

  Handler *m_handler;

  uint8_t m_order;
  uint64_t m_soft_max_size;

  uint32_t m_flush_interval;
  uint64_t m_flush_bytes;
  double m_flush_age;

  FlushHandler m_flush_handler;

  Context *m_append_task = nullptr;

  mutable std::shared_ptr<Mutex> m_lock;
  AppendBuffers m_append_buffers;
  uint64_t m_append_tid;
  uint32_t m_pending_bytes;

  InFlightTids m_in_flight_tids;
  InFlightAppends m_in_flight_appends;
  uint64_t m_size;
  bool m_overflowed;
  bool m_object_closed;

  bufferlist m_prefetch_bl;

  bool m_in_flight_flushes;
  Cond m_in_flight_flushes_cond;

  AppendBuffers m_pending_buffers;
  uint64_t m_aio_sent_size = 0;
  bool m_aio_scheduled;

  void handle_append_task();
  void cancel_append_task();
  void schedule_append_task();

  bool append(const AppendBuffer &append_buffer, bool *schedule_append);
  bool flush_appends(bool force);
  void handle_append_flushed(uint64_t tid, int r);
  void append_overflowed();
  void send_appends(AppendBuffers *append_buffers);
  void send_appends_aio();

  void notify_handler_unlock();
};

} // namespace journal

#endif // CEPH_JOURNAL_OBJECT_RECORDER_H