summaryrefslogtreecommitdiffstats
path: root/src/test/test_workqueue.cc
blob: 5c2fc459da2ea7876a2d441192c1f1dc71c03c96 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#include "gtest/gtest.h"

#include "common/WorkQueue.h"

#include <iostream> // for std::cout

#include "common/ceph_argparse.h"

using namespace std;

TEST(WorkQueue, StartStop)
{
  ThreadPool tp(g_ceph_context, "foo", "tp_foo", 10, "");
  
  tp.start();
  tp.pause();
  tp.pause_new();
  tp.unpause();
  tp.unpause();
  tp.drain();
  tp.stop();
}

TEST(WorkQueue, Resize)
{
  ThreadPool tp(g_ceph_context, "bar", "tp_bar", 2, "filestore_op_threads");
  
  tp.start();

  sleep(1);
  ASSERT_EQ(2, tp.get_num_threads());

  g_conf().set_val("filestore op threads", "5");
  g_conf().apply_changes(&cout);
  sleep(1);
  ASSERT_EQ(5, tp.get_num_threads());

  g_conf().set_val("filestore op threads", "3");
  g_conf().apply_changes(&cout);
  sleep(1);
  ASSERT_EQ(3, tp.get_num_threads());

  g_conf().set_val("filestore op threads", "0");
  g_conf().apply_changes(&cout);
  sleep(1);
  ASSERT_EQ(0, tp.get_num_threads());

  g_conf().set_val("filestore op threads", "15");
  g_conf().apply_changes(&cout);
  sleep(1);
  ASSERT_EQ(15, tp.get_num_threads());

  g_conf().set_val("filestore op threads", "-1");
  g_conf().apply_changes(&cout);
  sleep(1);
  ASSERT_EQ(15, tp.get_num_threads());

  sleep(1);
  tp.stop();
}

class twq : public ThreadPool::WorkQueue<int> {
public:
    twq(time_t timeout, time_t suicide_timeout, ThreadPool *tp)
        : ThreadPool::WorkQueue<int>("test_wq", ceph::make_timespan(timeout), ceph::make_timespan(suicide_timeout), tp) {}

    bool _enqueue(int* item) override {
        return true;
    }
    void _dequeue(int* item) override {
        ceph_abort();
    }
    bool _empty() override {
        return true;
    }
    int *_dequeue() override {
        return nullptr;
    }
    void _process(int *osr, ThreadPool::TPHandle &handle) override {
    }
    void _process_finish(int *osr) override {
    }
    void _clear() override {
    }
};

TEST(WorkQueue, change_timeout){
    ThreadPool tp(g_ceph_context, "bar", "tp_bar", 2, "filestore_op_threads");
    tp.start();
    twq wq(2, 20, &tp);
    // check timeout and suicide
    ASSERT_EQ(ceph::make_timespan(2), wq.timeout_interval.load());
    ASSERT_EQ(ceph::make_timespan(20), wq.suicide_interval.load());

    // change the timeout and suicide and then check them
    wq.set_timeout(4);
    wq.set_suicide_timeout(40);
    ASSERT_EQ(ceph::make_timespan(4), wq.timeout_interval.load());
    ASSERT_EQ(ceph::make_timespan(40), wq.suicide_interval.load());
    tp.stop();
}