summaryrefslogtreecommitdiffstats
path: root/src/msg/Messenger.h
blob: 1ced728805f54fb7557538cb2f8259f0a1069dd2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software 
 * Foundation.  See file COPYING.
 * 
 */



#ifndef CEPH_MESSENGER_H
#define CEPH_MESSENGER_H

#include <map>
using namespace std;

#include "Message.h"
#include "Dispatcher.h"
#include "common/Mutex.h"
#include "common/Cond.h"
#include "include/Context.h"
#include "include/types.h"


class MDS;
class Timer;


// Abstract base class for a messaging endpoint.
//
// A Messenger carries an entity name, a default send priority, and an
// ordered list of Dispatchers to which incoming messages and connection
// events are handed.  Concrete subclasses supply the transport by
// implementing the pure virtual methods (get_myaddr, send_message,
// shutdown, ...).
//
// Lifetime is managed by an intrusive reference count (nref), which
// starts at 1.  The destructor is protected, so outside code releases a
// Messenger through put() or destroy() rather than with delete.
class Messenger {
private:
  // Dispatchers in delivery order; the front of the list is offered each
  // message/event first.  The pointers are stored as given; nothing in
  // this class deletes them.
  list<Dispatcher*> dispatchers;

protected:
  entity_name_t _my_name;
  int default_send_priority;

  atomic_t nref;   // intrusive reference count, see get()/put()

 public:
  // Construct with name w, the default message priority, and a single
  // reference held on behalf of the creator.
  Messenger(entity_name_t w) : _my_name(w),
			       default_send_priority(CEPH_MSG_PRIO_DEFAULT),
			       nref(1) { }

  // Take an additional reference.
  void get() {
    nref.inc();
  }
  // Drop a reference; the object deletes itself once nref.dec() yields 0.
  // NOTE(review): this relies on atomic_t::dec() returning the value after
  // the decrement -- confirm against the atomic_t implementation.
  void put() {
    if (nref.dec() == 0)
      delete this;
  }
  // Release the Messenger.  The default implementation just drops one
  // reference; subclasses may override.
  virtual void destroy() {
    put();
  }

  // accessors
  entity_name_t get_myname() const { return _my_name; }
  virtual entity_addr_t get_myaddr() = 0;
  // Not const because it calls get_myaddr(), which is a non-const virtual.
  entity_inst_t get_myinst() { return entity_inst_t(get_myname(), get_myaddr()); }
  
  void set_myname(const entity_name_t m) { _my_name = m; }

  void set_default_send_priority(int p) { default_send_priority = p; }
  int get_default_send_priority() const { return default_send_priority; }
  
  // hrmpf.
  // Length of the dispatch queue; this base implementation always
  // reports 0.
  virtual int get_dispatch_queue_len() { return 0; }

  // setup

  // Register d ahead of every dispatcher already present.  ready() is
  // invoked if d is the first dispatcher to be registered.
  void add_dispatcher_head(Dispatcher *d) { 
    bool first = dispatchers.empty();
    dispatchers.push_front(d);
    if (first)
      ready();
  }
  // Register d after every dispatcher already present.  ready() is
  // invoked if d is the first dispatcher to be registered.
  void add_dispatcher_tail(Dispatcher *d) { 
    bool first = dispatchers.empty();
    dispatchers.push_back(d);
    if (first)
      ready();
  }

  // Hook called when the first dispatcher is added; no-op by default.
  virtual void ready() { }
  // True once at least one dispatcher has been registered.
  bool is_ready() const { return !dispatchers.empty(); }

  // dispatch incoming messages

  // Offer m to each dispatcher in list order and stop at the first one
  // whose ms_dispatch() returns true.  If no dispatcher accepts the
  // message, it is logged and the assertion below fires.
  void ms_deliver_dispatch(Message *m) {
    for (list<Dispatcher*>::iterator p = dispatchers.begin();
	 p != dispatchers.end();
	 ++p)
      if ((*p)->ms_dispatch(m))
	return;
    generic_dout(0) << "unhandled message " << m << " " << *m
		    << " from " << m->get_source_inst()
		    << dendl;
    assert(0);
  }
  // Call ms_handle_connect(con) on every dispatcher.
  void ms_deliver_handle_connect(Connection *con) {
    for (list<Dispatcher*>::iterator p = dispatchers.begin();
	 p != dispatchers.end();
	 ++p)
      (*p)->ms_handle_connect(con);
  }
  // Call ms_handle_reset(con) on dispatchers in list order, stopping at
  // the first one that returns true.
  void ms_deliver_handle_reset(Connection *con) {
    for (list<Dispatcher*>::iterator p = dispatchers.begin();
	 p != dispatchers.end();
	 ++p)
      if ((*p)->ms_handle_reset(con))
	return;
  }
  // Call ms_handle_remote_reset(con) on every dispatcher.
  void ms_deliver_handle_remote_reset(Connection *con) {
    for (list<Dispatcher*>::iterator p = dispatchers.begin();
	 p != dispatchers.end();
	 ++p)
      (*p)->ms_handle_remote_reset(con);
  }

  // Ask dispatchers in list order for an authorizer for peer_type.
  // Returns whatever the first dispatcher whose ms_get_authorizer()
  // returns true stored through the out-pointer, or NULL if none did.
  AuthAuthorizer *ms_deliver_get_authorizer(int peer_type, bool force_new) {
    AuthAuthorizer *a = NULL;
    for (list<Dispatcher*>::iterator p = dispatchers.begin();
	 p != dispatchers.end();
	 ++p)
      if ((*p)->ms_get_authorizer(peer_type, &a, force_new))
	return a;
    return NULL;
  }
  // Pass the authorizer to dispatchers in list order.  Returns true as
  // soon as one dispatcher's ms_verify_authorizer() returns true (that
  // call is the one that may have set isvalid / authorizer_reply);
  // returns false if none did.
  bool ms_deliver_verify_authorizer(Connection *con, int peer_type,
				    int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
				    bool& isvalid) {
    for (list<Dispatcher*>::iterator p = dispatchers.begin();
	 p != dispatchers.end();
	 ++p)
      if ((*p)->ms_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid))
	return true;
    return false;
  }

  // shutdown
  virtual int shutdown() = 0;
  virtual void suicide() = 0;

  // send message
  virtual void prepare_dest(const entity_inst_t& inst) {}
  virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
  virtual int send_message(Message *m, Connection *con) = 0;
  // By default a "lazy" send to an entity_inst_t is an ordinary send.
  virtual int lazy_send_message(Message *m, const entity_inst_t& dest) {
    return send_message(m, dest);
  }
  virtual int lazy_send_message(Message *m, Connection *con) = 0;
  virtual int send_keepalive(const entity_inst_t& dest) = 0;

  virtual void mark_down(const entity_addr_t& a) = 0;

protected:
  //destruction should be handled via destroy()
  // Protected so that callers cannot delete a Messenger directly; by the
  // time the destructor runs the reference count must have reached 0.
  virtual ~Messenger() {
    assert(nref.read() == 0);
  }
};





#endif