1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
* Copyright (C) 2010 Brian Aker
7
* Redistribution and use in source and binary forms, with or without
8
* modification, are permitted provided that the following conditions are met:
9
* * Redistributions of source code must retain the above copyright
10
* notice, this list of conditions and the following disclaimer.
11
* * Redistributions in binary form must reproduce the above copyright
12
* notice, this list of conditions and the following disclaimer in the
13
* documentation and/or other materials provided with the distribution.
14
* * Neither the name of the <organization> nor the
15
* names of its contributors may be used to endorse or promote products
16
* derived from this software without specific prior written permission.
18
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21
* DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
22
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
#include <boost/thread/mutex.hpp>
32
#include <boost/thread/condition_variable.hpp>
33
#include <boost/shared_ptr.hpp>
34
#include <boost/foreach.hpp>
38
#ifndef PLUGIN_USER_LOCKS_BARRIER_H
39
#define PLUGIN_USER_LOCKS_BARRIER_H
42
Barrier was designed with the following concepts.
44
1) A barrier can be set with an initial limit which can be used such that if the limit is met, it releases all waiters.
45
2) A barrier can be released at any time, even if the limit is not met by an outside caller.
46
3) An observer can register itself to the barrier, it will wait until some predicate action releases it.
47
4) Observers are always released by limit, or in the case where the barrier is released or destroyed.
48
5) Observers should be held by copy, not by reference in order to allow for correct deletion.
50
@todo while we do pass an owner type to a barrier currently, we make no usage of it, and instead we currently protect
51
poor use, namely the owner of a barrier calling wait() via the layer above. It may be a good idea to change this.
54
namespace user_locks {
57
// Barrier starts in a blocking posistion
60
typedef boost::shared_ptr<Barrier> shared_ptr;
62
Barrier(drizzled::session_id_t owner_arg) :
69
Barrier(drizzled::session_id_t owner_arg, int64_t limit_arg) :
82
// Signal all of the observers to start
85
boost::mutex::scoped_lock scopedBarrier(sleeper_mutex);
89
drizzled::session_id_t getOwner() const
96
boost::mutex::scoped_lock scopedLock(sleeper_mutex);
97
int64_t my_generation= generation;
102
if (not current_wait)
112
// If we are interrupted we remove ourself from the list, and check on
116
while (my_generation == generation)
118
sleep_threshhold.wait(sleeper_mutex);
121
catch(boost::thread_interrupted const& error)
128
// A call to either signal or a release will cause wait_for() to continue
129
void wait_until(int64_t wait_until_arg)
131
Observer::shared_ptr observer;
133
boost::mutex::scoped_lock scopedLock(sleeper_mutex);
135
if (wait_until_arg <= count())
138
observer.reset(new Observer(wait_until_arg));
139
observers.push_back(observer);
145
catch(boost::thread_interrupted const& error)
147
boost::mutex::scoped_lock scopedLock(sleeper_mutex);
148
// Someone has interrupted us, we now try to remove ourself from the
149
// observer chain ourself
151
observers.remove(observer);
157
void wait(int64_t generation_arg)
159
boost::mutex::scoped_lock scopedLock(sleeper_mutex);
160
int64_t my_generation= generation;
162
// If the generation is newer then we just return immediatly
163
if (my_generation > generation_arg)
170
if (not current_wait)
178
while (my_generation == generation)
180
sleep_threshhold.wait(sleeper_mutex);
184
int64_t getGeneration()
186
boost::mutex::scoped_lock scopedLock(sleeper_mutex);
190
int64_t sizeObservers()
192
boost::mutex::scoped_lock scopedLock(sleeper_mutex);
193
return static_cast<int64_t>(observers.size());
196
int64_t sizeWaiters()
198
boost::mutex::scoped_lock scopedLock(sleeper_mutex);
202
int64_t getLimit() const
212
sleep_threshhold.notify_all();
217
struct isReady : public std::unary_function<Observer::list::const_reference, bool>
221
isReady(int64_t arg) :
225
result_type operator() (argument_type observer)
227
if (observer->getLimit() <= count or count == 0)
237
void checkObservers()
239
observers.remove_if(isReady(count()));
242
int64_t count() const
246
return limit - current_wait;
248
return std::abs(current_wait);
252
drizzled::session_id_t owner;
255
int64_t current_wait;
258
Observer::list observers;
260
boost::mutex sleeper_mutex;
261
boost::condition_variable_any sleep_threshhold;
265
} // namespace barriers
266
} // namespace user_locks
268
#endif /* PLUGIN_USER_LOCKS_BARRIER_H */