~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/user_locks/barrier.h

  • Committer: Monty Taylor
  • Date: 2011-02-13 17:26:39 UTC
  • mfrom: (2157.2.2 give-in-to-pkg-config)
  • mto: This revision was merged to the branch mainline in revision 2166.
  • Revision ID: mordred@inaugust.com-20110213172639-nhy7i72sfhoq13ms
Merged in pkg-config fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
 
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
3
 *
 
4
 * Copyright (C) 2010 Brian Aker
 
5
 * All rights reserved.
 
6
 *
 
7
 * Redistribution and use in source and binary forms, with or without
 
8
 * modification, are permitted provided that the following conditions are met:
 
9
 *     * Redistributions of source code must retain the above copyright
 
10
 *       notice, this list of conditions and the following disclaimer.
 
11
 *     * Redistributions in binary form must reproduce the above copyright
 
12
 *       notice, this list of conditions and the following disclaimer in the
 
13
 *       documentation and/or other materials provided with the distribution.
 
14
 *     * Neither the name of the <organization> nor the
 
15
 *       names of its contributors may be used to endorse or promote products
 
16
 *       derived from this software without specific prior written permission.
 
17
 *
 
18
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 
19
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 
20
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 
21
 * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
 
22
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 
23
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
24
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 
25
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
26
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
27
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
28
 *
 
29
 */
 
30
 
 
31
#include <boost/thread/mutex.hpp>
 
32
#include <boost/thread/condition_variable.hpp>
 
33
#include <boost/shared_ptr.hpp>
 
34
#include <boost/foreach.hpp>
 
35
 
 
36
#include "observer.h"
 
37
 
 
38
#ifndef PLUGIN_USER_LOCKS_BARRIER_H
 
39
#define PLUGIN_USER_LOCKS_BARRIER_H
 
40
 
 
41
/*
 
42
  Barrier was designed with the following concepts.
 
43
 
 
44
  1) A barrier can be set with an initial limit which can be used such that if the limit is met, it releases all waiters.
 
45
  2) A barrier can be released at any time, even if the limit is not met by an outside caller.
 
46
  3) An observer can register itself to the barrier, it will wait until some predicate action releases it.
 
47
  4) Observers are always released by limit, or in the case where the barrier is released or destroyed.
 
48
  5) Observers should be held by copy, not by reference in order to allow for correct deletion.
 
49
 
 
50
  @todo while we do pass an owner type to a barrier currently, we make no usage of it, and instead we currently protect
 
51
  poor use, namely the owner of a barrier calling wait() via the layer above. It may be a good idea to change this.
 
52
*/
 
53
 
 
54
namespace user_locks {
 
55
namespace barriers {
 
56
 
 
57
// Barrier starts in a blocking posistion
 
58
class Barrier {
 
59
public:
 
60
  typedef boost::shared_ptr<Barrier> shared_ptr;
 
61
 
 
62
  Barrier(drizzled::session_id_t owner_arg) :
 
63
    owner(owner_arg),
 
64
    limit(0),
 
65
    current_wait(0),
 
66
    generation(0)
 
67
  { }
 
68
 
 
69
  Barrier(drizzled::session_id_t owner_arg, int64_t limit_arg) :
 
70
    owner(owner_arg),
 
71
    limit(limit_arg),
 
72
    current_wait(limit),
 
73
    generation(0)
 
74
  {
 
75
  }
 
76
 
 
77
  ~Barrier()
 
78
  {
 
79
    wakeAll();
 
80
  }
 
81
 
 
82
  // Signal all of the observers to start
 
83
  void signal()
 
84
  {
 
85
    boost::mutex::scoped_lock scopedBarrier(sleeper_mutex);
 
86
    wakeAll();
 
87
  }
 
88
 
 
89
  drizzled::session_id_t getOwner() const
 
90
  {
 
91
    return owner;
 
92
  }
 
93
 
 
94
  void wait()
 
95
  {
 
96
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
 
97
    int64_t my_generation= generation;
 
98
 
 
99
    --current_wait;
 
100
    if (limit)
 
101
    {
 
102
      if (not current_wait)
 
103
      {
 
104
        wakeAll();
 
105
 
 
106
        return;
 
107
      }
 
108
 
 
109
    }
 
110
    checkObservers();
 
111
 
 
112
    // If we are interrupted we remove ourself from the list, and check on
 
113
    // the observers.
 
114
    try 
 
115
    {
 
116
      while (my_generation == generation)
 
117
      {
 
118
        sleep_threshhold.wait(sleeper_mutex);
 
119
      }
 
120
    }
 
121
    catch(boost::thread_interrupted const& error)
 
122
    {
 
123
      current_wait++;
 
124
      checkObservers();
 
125
    }
 
126
  }
 
127
 
 
128
  // A call to either signal or a release will cause wait_for() to continue
 
129
  void wait_until(int64_t wait_until_arg)
 
130
  {
 
131
    Observer::shared_ptr observer;
 
132
    {
 
133
      boost::mutex::scoped_lock scopedLock(sleeper_mutex);
 
134
 
 
135
      if (wait_until_arg <= count())
 
136
        return;
 
137
 
 
138
      observer.reset(new Observer(wait_until_arg));
 
139
      observers.push_back(observer);
 
140
    }
 
141
 
 
142
    try {
 
143
      observer->sleep();
 
144
    }
 
145
    catch(boost::thread_interrupted const& error)
 
146
    {
 
147
      boost::mutex::scoped_lock scopedLock(sleeper_mutex);
 
148
      // Someone has interrupted us, we now try to remove ourself from the
 
149
      // observer chain ourself
 
150
 
 
151
      observers.remove(observer);
 
152
      
 
153
      throw error;
 
154
    }
 
155
  }
 
156
 
 
157
  void wait(int64_t generation_arg)
 
158
  {
 
159
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
 
160
    int64_t my_generation= generation;
 
161
 
 
162
    // If the generation is newer  then we just return immediatly
 
163
    if (my_generation > generation_arg)
 
164
      return;
 
165
 
 
166
    --current_wait;
 
167
 
 
168
    if (limit)
 
169
    {
 
170
      if (not current_wait)
 
171
      {
 
172
        wakeAll();
 
173
        return;
 
174
      }
 
175
 
 
176
    }
 
177
 
 
178
    while (my_generation == generation)
 
179
    {
 
180
      sleep_threshhold.wait(sleeper_mutex);
 
181
    }
 
182
  }
 
183
 
 
184
  int64_t getGeneration()
 
185
  {
 
186
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
 
187
    return generation;
 
188
  }
 
189
 
 
190
  int64_t sizeObservers()
 
191
  {
 
192
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
 
193
    return static_cast<int64_t>(observers.size());
 
194
  }
 
195
 
 
196
  int64_t sizeWaiters()
 
197
  {
 
198
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
 
199
    return count();
 
200
  }
 
201
 
 
202
  int64_t getLimit() const
 
203
  {
 
204
    return limit;
 
205
  }
 
206
 
 
207
private:
 
208
  void wakeAll()
 
209
  {
 
210
    generation++;
 
211
    current_wait= limit;
 
212
    sleep_threshhold.notify_all();
 
213
 
 
214
    checkObservers();
 
215
  }
 
216
 
 
217
  struct isReady : public std::unary_function<Observer::list::const_reference, bool>
 
218
  {
 
219
    const int64_t count;
 
220
 
 
221
    isReady(int64_t arg) :
 
222
      count(arg)
 
223
    { }
 
224
 
 
225
    result_type operator() (argument_type observer)
 
226
    {
 
227
      if (observer->getLimit() <= count or count == 0)
 
228
      {
 
229
        observer->wake();
 
230
        return true;
 
231
      }
 
232
 
 
233
      return false;
 
234
    }
 
235
  };
 
236
 
 
237
  void checkObservers()
 
238
  {
 
239
    observers.remove_if(isReady(count()));
 
240
  }
 
241
 
 
242
  int64_t count() const
 
243
  {
 
244
    if (limit)
 
245
    {
 
246
      return limit - current_wait;
 
247
    }
 
248
    return std::abs(static_cast<long int>(current_wait));
 
249
  }
 
250
 
 
251
 
 
252
  drizzled::session_id_t owner;
 
253
 
 
254
  const int64_t limit;
 
255
  int64_t current_wait;
 
256
  int64_t generation;
 
257
 
 
258
  Observer::list observers;
 
259
 
 
260
  boost::mutex sleeper_mutex;
 
261
  boost::condition_variable_any sleep_threshhold;
 
262
 
 
263
};
 
264
 
 
265
} // namespace barriers
 
266
} // namespace user_locks
 
267
 
 
268
#endif /* PLUGIN_USER_LOCKS_BARRIER_H */