// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
//
// Provenance (from the repository web view this text was extracted from):
//   path: root/src/journal/JournalRecorder.h
//   blob: a16339faddf73d34a6265bc42f4436d825cd325f

#ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
#define CEPH_JOURNAL_JOURNAL_RECORDER_H

#include "include/int_types.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "journal/Future.h"
#include "journal/FutureImpl.h"
#include "journal/JournalMetadata.h"
#include "journal/ObjectRecorder.h"
#include <map>
#include <memory>
#include <string>
#include <vector>

class SafeTimer;

namespace journal {

class JournalRecorder {
public:
  JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
                  const JournalMetadataPtr &journal_metadata,
                  uint32_t flush_interval, uint64_t flush_bytes,
                  double flush_age);
  ~JournalRecorder();

  Future append(uint64_t tag_tid, const bufferlist &bl);
  void flush(Context *on_safe);

  ObjectRecorderPtr get_object(uint8_t splay_offset);

private:
  typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs;

  struct Listener : public JournalMetadataListener {
    JournalRecorder *journal_recorder;

    Listener(JournalRecorder *_journal_recorder)
      : journal_recorder(_journal_recorder) {}

    void handle_update(JournalMetadata *) override {
      journal_recorder->handle_update();
    }
  };

  struct ObjectHandler : public ObjectRecorder::Handler {
    JournalRecorder *journal_recorder;

    ObjectHandler(JournalRecorder *_journal_recorder)
      : journal_recorder(_journal_recorder) {
    }

    void closed(ObjectRecorder *object_recorder) override {
      journal_recorder->handle_closed(object_recorder);
    }
    void overflow(ObjectRecorder *object_recorder) override {
      journal_recorder->handle_overflow(object_recorder);
    }
  };

  struct C_AdvanceObjectSet : public Context {
    JournalRecorder *journal_recorder;

    C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
      : journal_recorder(_journal_recorder) {
    }
    void finish(int r) override {
      journal_recorder->handle_advance_object_set(r);
    }
  };

  librados::IoCtx m_ioctx;
  CephContext *m_cct;
  std::string m_object_oid_prefix;

  JournalMetadataPtr m_journal_metadata;

  uint32_t m_flush_interval;
  uint64_t m_flush_bytes;
  double m_flush_age;

  Listener m_listener;
  ObjectHandler m_object_handler;

  Mutex m_lock;

  uint32_t m_in_flight_advance_sets = 0;
  uint32_t m_in_flight_object_closes = 0;
  uint64_t m_current_set;
  ObjectRecorderPtrs m_object_ptrs;
  std::vector<std::shared_ptr<Mutex>> m_object_locks;

  FutureImplPtr m_prev_future;

  void open_object_set();
  bool close_object_set(uint64_t active_set);

  void advance_object_set();
  void handle_advance_object_set(int r);

  void close_and_advance_object_set(uint64_t object_set);

  ObjectRecorderPtr create_object_recorder(uint64_t object_number,
                                           std::shared_ptr<Mutex> lock);
  void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder);

  void handle_update();

  void handle_closed(ObjectRecorder *object_recorder);
  void handle_overflow(ObjectRecorder *object_recorder);

  void lock_object_recorders() {
    for (auto& lock : m_object_locks) {
      lock->Lock();
    }
  }

  void unlock_object_recorders() {
    for (auto& lock : m_object_locks) {
      lock->Unlock();
    }
  }
};

} // namespace journal

#endif // CEPH_JOURNAL_JOURNAL_RECORDER_H