~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/user_locks/barrier.h

move ATAN() and ATAN2() function into math_functions plugin. ATAN2() is just another instance of ATAN()

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
 
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
 
 *
4
 
 * Copyright (C) 2010 Brian Aker
5
 
 * All rights reserved.
6
 
 *
7
 
 * Redistribution and use in source and binary forms, with or without
8
 
 * modification, are permitted provided that the following conditions are met:
9
 
 *     * Redistributions of source code must retain the above copyright
10
 
 *       notice, this list of conditions and the following disclaimer.
11
 
 *     * Redistributions in binary form must reproduce the above copyright
12
 
 *       notice, this list of conditions and the following disclaimer in the
13
 
 *       documentation and/or other materials provided with the distribution.
14
 
 *     * Neither the name of the <organization> nor the
15
 
 *       names of its contributors may be used to endorse or promote products
16
 
 *       derived from this software without specific prior written permission.
17
 
 *
18
 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19
 
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
 
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21
 
 * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
22
 
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23
 
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24
 
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25
 
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26
 
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27
 
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
 
 *
29
 
 */
30
 
 
31
 
#include <boost/thread/mutex.hpp>
32
 
#include <boost/thread/condition_variable.hpp>
33
 
#include <boost/shared_ptr.hpp>
34
 
#include <boost/foreach.hpp>
35
 
 
36
 
#include "observer.h"
37
 
 
38
 
#ifndef PLUGIN_USER_LOCKS_BARRIER_H
39
 
#define PLUGIN_USER_LOCKS_BARRIER_H
40
 
 
41
 
/*
42
 
  Barrier was designed with the following concepts.
43
 
 
44
 
  1) A barrier can be set with an initial limit which can be used such that if the limit is met, it releases all waiters.
45
 
  2) A barrier can be released at any time, even if the limit is not met by an outside caller.
46
 
  3) An observer can register itself to the barrier, it will wait until some predicate action releases it.
47
 
  4) Observers are always released by limit, or in the case where the barrier is released or destroyed.
48
 
  5) Observers should be held by copy, not by reference in order to allow for correct deletion.
49
 
 
50
 
  @todo while we do pass an owner type to a barrier currently, we make no usage of it, and instead we currently protect
51
 
  poor use, namely the owner of a barrier calling wait() via the layer above. It may be a good idea to change this.
52
 
*/
53
 
 
54
 
namespace user_locks {
55
 
namespace barriers {
56
 
 
57
 
// Barrier starts in a blocking posistion
58
 
class Barrier {
59
 
public:
60
 
  typedef boost::shared_ptr<Barrier> shared_ptr;
61
 
 
62
 
  Barrier(drizzled::session_id_t owner_arg) :
63
 
    owner(owner_arg),
64
 
    limit(0),
65
 
    current_wait(0),
66
 
    generation(0)
67
 
  { }
68
 
 
69
 
  Barrier(drizzled::session_id_t owner_arg, int64_t limit_arg) :
70
 
    owner(owner_arg),
71
 
    limit(limit_arg),
72
 
    current_wait(limit),
73
 
    generation(0)
74
 
  {
75
 
  }
76
 
 
77
 
  ~Barrier()
78
 
  {
79
 
    wakeAll();
80
 
  }
81
 
 
82
 
  // Signal all of the observers to start
83
 
  void signal()
84
 
  {
85
 
    boost::mutex::scoped_lock scopedBarrier(sleeper_mutex);
86
 
    wakeAll();
87
 
  }
88
 
 
89
 
  drizzled::session_id_t getOwner() const
90
 
  {
91
 
    return owner;
92
 
  }
93
 
 
94
 
  void wait()
95
 
  {
96
 
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
97
 
    int64_t my_generation= generation;
98
 
 
99
 
    --current_wait;
100
 
    if (limit)
101
 
    {
102
 
      if (not current_wait)
103
 
      {
104
 
        wakeAll();
105
 
 
106
 
        return;
107
 
      }
108
 
 
109
 
    }
110
 
    checkObservers();
111
 
 
112
 
    // If we are interrupted we remove ourself from the list, and check on
113
 
    // the observers.
114
 
    try 
115
 
    {
116
 
      while (my_generation == generation)
117
 
      {
118
 
        sleep_threshhold.wait(sleeper_mutex);
119
 
      }
120
 
    }
121
 
    catch(boost::thread_interrupted const& error)
122
 
    {
123
 
      current_wait++;
124
 
      checkObservers();
125
 
    }
126
 
  }
127
 
 
128
 
  // A call to either signal or a release will cause wait_for() to continue
129
 
  void wait_until(int64_t wait_until_arg)
130
 
  {
131
 
    Observer::shared_ptr observer;
132
 
    {
133
 
      boost::mutex::scoped_lock scopedLock(sleeper_mutex);
134
 
 
135
 
      if (wait_until_arg <= count())
136
 
        return;
137
 
 
138
 
      observer.reset(new Observer(wait_until_arg));
139
 
      observers.push_back(observer);
140
 
    }
141
 
 
142
 
    try {
143
 
      observer->sleep();
144
 
    }
145
 
    catch(boost::thread_interrupted const& error)
146
 
    {
147
 
      boost::mutex::scoped_lock scopedLock(sleeper_mutex);
148
 
      // Someone has interrupted us, we now try to remove ourself from the
149
 
      // observer chain ourself
150
 
 
151
 
      observers.remove(observer);
152
 
      
153
 
      throw error;
154
 
    }
155
 
  }
156
 
 
157
 
  void wait(int64_t generation_arg)
158
 
  {
159
 
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
160
 
    int64_t my_generation= generation;
161
 
 
162
 
    // If the generation is newer  then we just return immediatly
163
 
    if (my_generation > generation_arg)
164
 
      return;
165
 
 
166
 
    --current_wait;
167
 
 
168
 
    if (limit)
169
 
    {
170
 
      if (not current_wait)
171
 
      {
172
 
        wakeAll();
173
 
        return;
174
 
      }
175
 
 
176
 
    }
177
 
 
178
 
    while (my_generation == generation)
179
 
    {
180
 
      sleep_threshhold.wait(sleeper_mutex);
181
 
    }
182
 
  }
183
 
 
184
 
  int64_t getGeneration()
185
 
  {
186
 
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
187
 
    return generation;
188
 
  }
189
 
 
190
 
  int64_t sizeObservers()
191
 
  {
192
 
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
193
 
    return static_cast<int64_t>(observers.size());
194
 
  }
195
 
 
196
 
  int64_t sizeWaiters()
197
 
  {
198
 
    boost::mutex::scoped_lock scopedLock(sleeper_mutex);
199
 
    return count();
200
 
  }
201
 
 
202
 
  int64_t getLimit() const
203
 
  {
204
 
    return limit;
205
 
  }
206
 
 
207
 
private:
208
 
  void wakeAll()
209
 
  {
210
 
    generation++;
211
 
    current_wait= limit;
212
 
    sleep_threshhold.notify_all();
213
 
 
214
 
    checkObservers();
215
 
  }
216
 
 
217
 
  struct isReady : public std::unary_function<Observer::list::const_reference, bool>
218
 
  {
219
 
    const int64_t count;
220
 
 
221
 
    isReady(int64_t arg) :
222
 
      count(arg)
223
 
    { }
224
 
 
225
 
    result_type operator() (argument_type observer)
226
 
    {
227
 
      if (observer->getLimit() <= count or count == 0)
228
 
      {
229
 
        observer->wake();
230
 
        return true;
231
 
      }
232
 
 
233
 
      return false;
234
 
    }
235
 
  };
236
 
 
237
 
  void checkObservers()
238
 
  {
239
 
    observers.remove_if(isReady(count()));
240
 
  }
241
 
 
242
 
  int64_t count() const
243
 
  {
244
 
    if (limit)
245
 
    {
246
 
      return limit - current_wait;
247
 
    }
248
 
    return std::abs(current_wait);
249
 
  }
250
 
 
251
 
 
252
 
  drizzled::session_id_t owner;
253
 
 
254
 
  const int64_t limit;
255
 
  int64_t current_wait;
256
 
  int64_t generation;
257
 
 
258
 
  Observer::list observers;
259
 
 
260
 
  boost::mutex sleeper_mutex;
261
 
  boost::condition_variable_any sleep_threshhold;
262
 
 
263
 
};
264
 
 
265
 
} // namespace barriers
266
 
} // namespace user_locks
267
 
 
268
 
#endif /* PLUGIN_USER_LOCKS_BARRIER_H */