~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/user_locks/barrier.h

  • Committer: Brian Aker
  • Date: 2010-11-23 09:35:51 UTC
  • mfrom: (1933.2.13 catalogs)
  • Revision ID: brian@tangent.org-20101123093551-l5m7zhz3m5c4wmlk
Merge in changes for barriers (update for kill, etc).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2
2
 *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
3
 *
4
 
 *  Copyright (C) 2010 Brian Aker
5
 
 *
6
 
 *  This program is free software; you can redistribute it and/or modify
7
 
 *  it under the terms of the GNU General Public License as published by
8
 
 *  the Free Software Foundation; either version 2 of the License, or
9
 
 *  (at your option) any later version.
10
 
 *
11
 
 *  This program is distributed in the hope that it will be useful,
12
 
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
 *  GNU General Public License for more details.
15
 
 *
16
 
 *  You should have received a copy of the GNU General Public License
17
 
 *  along with this program; if not, write to the Free Software
18
 
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 
4
 * Copyright (c) 2010, Brian Aker
 
5
 * All rights reserved.
 
6
 *
 
7
 * Redistribution and use in source and binary forms, with or without
 
8
 * modification, are permitted provided that the following conditions are met:
 
9
 *     * Redistributions of source code must retain the above copyright
 
10
 *       notice, this list of conditions and the following disclaimer.
 
11
 *     * Redistributions in binary form must reproduce the above copyright
 
12
 *       notice, this list of conditions and the following disclaimer in the
 
13
 *       documentation and/or other materials provided with the distribution.
 
14
 *     * Neither the name of the <organization> nor the
 
15
 *       names of its contributors may be used to endorse or promote products
 
16
 *       derived from this software without specific prior written permission.
 
17
 *
 
18
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 
19
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 
20
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 
21
 * DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
 
22
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 
23
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
24
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 
25
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
26
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
27
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
28
 *
19
29
 */
20
30
 
21
31
#include <boost/thread/mutex.hpp>
23
33
#include <boost/shared_ptr.hpp>
24
34
#include <boost/foreach.hpp>
25
35
 
 
36
#include "observer.h"
 
37
 
26
38
#ifndef PLUGIN_USER_LOCKS_BARRIER_H
27
39
#define PLUGIN_USER_LOCKS_BARRIER_H
28
40
 
44
56
 
45
57
// Barrier starts in a blocking posistion
46
58
class Barrier {
47
 
  struct observer_st {
48
 
    typedef boost::shared_ptr<observer_st> shared_ptr;
49
 
    typedef std::list <shared_ptr> list;
50
 
 
51
 
    bool woken;
52
 
    int64_t waiting_for;
53
 
    int64_t generation;
54
 
    boost::mutex _mutex;
55
 
    boost::condition_variable_any cond;
56
 
 
57
 
    observer_st(int64_t wait_until_arg, int64_t generation_arg) :
58
 
      woken(false),
59
 
      waiting_for(wait_until_arg),
60
 
      generation(generation_arg)
61
 
    { 
62
 
      _mutex.lock();
63
 
    }
64
 
 
65
 
    void sleep()
66
 
    {
67
 
      while (not woken)
68
 
      {
69
 
        cond.wait(_mutex);
70
 
      }
71
 
      _mutex.unlock();
72
 
    }
73
 
 
74
 
    void wake()
75
 
    {
76
 
      {
77
 
        boost::mutex::scoped_lock mutex;
78
 
        woken= true;
79
 
      }
80
 
      cond.notify_all();
81
 
    }
82
 
 
83
 
 
84
 
    ~observer_st()
85
 
    {
86
 
    }
87
 
  };
88
 
 
89
59
public:
90
60
  typedef boost::shared_ptr<Barrier> shared_ptr;
91
61
 
139
109
    }
140
110
    checkObservers();
141
111
 
142
 
    while (my_generation == generation)
143
 
    {
144
 
      sleep_threshhold.wait(sleeper_mutex);
 
112
    // If we are interrupted we remove ourself from the list, and check on
 
113
    // the observers.
 
114
    try 
 
115
    {
 
116
      while (my_generation == generation)
 
117
      {
 
118
        sleep_threshhold.wait(sleeper_mutex);
 
119
      }
 
120
    }
 
121
    catch(boost::thread_interrupted const& error)
 
122
    {
 
123
      current_wait++;
 
124
      checkObservers();
145
125
    }
146
126
  }
147
127
 
148
128
  // A call to either signal or a release will cause wait_for() to continue
149
129
  void wait_until(int64_t wait_until_arg)
150
130
  {
151
 
    observer_st::shared_ptr observer;
 
131
    Observer::shared_ptr observer;
152
132
    {
153
133
      boost::mutex::scoped_lock scopedLock(sleeper_mutex);
154
134
 
155
135
      if (wait_until_arg <= count())
156
136
        return;
157
137
 
158
 
      observer.reset(new observer_st(wait_until_arg, generation));
 
138
      observer.reset(new Observer(wait_until_arg));
159
139
      observers.push_back(observer);
160
140
    }
161
 
    observer->sleep();
 
141
 
 
142
    try {
 
143
      observer->sleep();
 
144
    }
 
145
    catch(boost::thread_interrupted const& error)
 
146
    {
 
147
      boost::mutex::scoped_lock scopedLock(sleeper_mutex);
 
148
      // Someone has interrupted us, we now try to remove ourself from the
 
149
      // observer chain ourself
 
150
 
 
151
      observers.remove(observer);
 
152
      
 
153
      throw error;
 
154
    }
162
155
  }
163
156
 
164
157
  void wait(int64_t generation_arg)
221
214
    checkObservers();
222
215
  }
223
216
 
224
 
  struct isReady : public std::unary_function<observer_st::list::const_reference, bool>
 
217
  struct isReady : public std::unary_function<Observer::list::const_reference, bool>
225
218
  {
226
219
    const int64_t count;
227
220
 
231
224
 
232
225
    result_type operator() (argument_type observer)
233
226
    {
234
 
      if (observer->waiting_for <= count or count == 0)
 
227
      if (observer->getLimit() <= count or count == 0)
235
228
      {
236
229
        observer->wake();
237
230
        return true;
262
255
  int64_t current_wait;
263
256
  int64_t generation;
264
257
 
265
 
  observer_st::list observers;
 
258
  Observer::list observers;
266
259
 
267
260
  boost::mutex sleeper_mutex;
268
261
  boost::condition_variable_any sleep_threshhold;