1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <seastar/core/future.hh>
#include "include/ceph_assert.h"
#include "include/buffer.h"
#include "include/denc.h"
#include "crimson/os/seastore/async_cleaner.h"
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/segment_manager_group.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/osd/exceptions.h"
#include "segment_allocator.h"
#include "crimson/os/seastore/segment_seq_allocator.h"
#include "record_submitter.h"
namespace crimson::os::seastore::journal {
/**
 * SegmentedJournal
 *
 * Journal implementation that manages a stream of atomically written
 * records to a SegmentManager.
 */
class SegmentedJournal : public Journal {
public:
  SegmentedJournal(
    SegmentProvider &segment_provider,
    JournalTrimmer &trimmer);

  ~SegmentedJournal() = default;

  /// Returns the trimmer reference held by this journal.
  JournalTrimmer &get_trimmer() final { return trimmer; }

  /// Returns the statistics reported by the record submitter.
  writer_stats_t get_writer_stats() const final {
    return record_submitter.get_stats();
  }

  open_for_mkfs_ret open_for_mkfs() final;

  open_for_mount_ret open_for_mount() final;

  close_ertr::future<> close() final;

  submit_record_ertr::future<> submit_record(
    record_t &&record,
    OrderingHandle &handle,
    transaction_type_t t_src,
    on_submission_func_t &&on_submission) final;

  seastar::future<> flush(OrderingHandle &handle) final;

  replay_ret replay(delta_handler_t &&delta_handler) final;

  /// Stores the given pipeline pointer; the member is nullptr until set.
  void set_write_pipeline(WritePipeline *_write_pipeline) final {
    write_pipeline = _write_pipeline;
  }

  /// This backend always identifies itself as SEGMENTED.
  backend_type_t get_type() final { return backend_type_t::SEGMENTED; }

  /// Checksums are unconditionally required by the segmented journal.
  bool is_checksum_needed() final { return true; }

private:
  submit_record_ertr::future<> do_submit_record(
    record_t &&record,
    OrderingHandle &handle,
    on_submission_func_t &&on_submission);

  // NOTE: data members are constructed in declaration order; keep this
  // order unless the constructor's initializer list is checked as well.
  SegmentSeqAllocatorRef segment_seq_allocator;
  SegmentAllocator journal_segment_allocator;
  RecordSubmitter record_submitter;
  SegmentManagerGroup &sm_group;
  JournalTrimmer &trimmer;
  WritePipeline* write_pipeline = nullptr;

  /// ordered vector of (journal sequence, segment header) pairs to replay
  using replay_segments_t =
    std::vector<std::pair<journal_seq_t, segment_header_t>>;
  using prep_replay_segments_fut = replay_ertr::future<replay_segments_t>;

  /// return ordered vector of segments to replay
  prep_replay_segments_fut prep_replay_segments(
    std::vector<std::pair<segment_id_t, segment_header_t>> segments);

  /// error set of scan_last_segment(); identical to replay_ertr
  using scan_last_segment_ertr = replay_ertr;

  /// scan the last segment for tail deltas
  scan_last_segment_ertr::future<> scan_last_segment(
    const segment_id_t&,
    const segment_header_t&);

  /// counters passed to replay_segment(); every field starts at zero
  struct replay_stats_t {
    std::size_t num_record_groups = 0;
    std::size_t num_records = 0;
    std::size_t num_alloc_deltas = 0;
    std::size_t num_dirty_deltas = 0;
  };

  /**
   * replays records starting at start through end of segment
   *
   * @param start         [in] starting addr, seq
   * @param header        [in] segment header
   * @param delta_handler [in] processes deltas in order
   * @param stats         [out] replay stats
   */
  replay_ertr::future<> replay_segment(
    journal_seq_t start,
    segment_header_t header,
    delta_handler_t &delta_handler,
    replay_stats_t &stats);
};
}
|