~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/network_ms.cc

  • Committer: Mark Atwood
  • Date: 2011-12-20 02:32:53 UTC
  • mfrom: (2469.1.1 drizzle-build)
  • Revision ID: me@mark.atwood.name-20111220023253-bvu0kr14kwsdvz7g
mergeĀ lp:~brianaker/drizzle/deprecate-pbms

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
2
 
 *
3
 
 * PrimeBase Media Stream for MySQL
4
 
 *
5
 
 * This program is free software; you can redistribute it and/or modify
6
 
 * it under the terms of the GNU General Public License as published by
7
 
 * the Free Software Foundation; either version 2 of the License, or
8
 
 * (at your option) any later version.
9
 
 *
10
 
 * This program is distributed in the hope that it will be useful,
11
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
 * GNU General Public License for more details.
14
 
 *
15
 
 * You should have received a copy of the GNU General Public License
16
 
 * along with this program; if not, write to the Free Software
17
 
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
 
 *
19
 
 * Original author: Paul McCullagh
20
 
 * Continued development: Barry Leslie
21
 
 *
22
 
 * 2007-05-25
23
 
 *
24
 
 * H&G2JCtL
25
 
 *
26
 
 * Network interface.
27
 
 *
28
 
 */
29
 
#include "cslib/CSConfig.h"
30
 
 
31
 
#include "defs_ms.h"
32
 
 
33
 
#include "cslib/CSGlobal.h"
34
 
#include "cslib/CSLog.h"
35
 
 
36
 
#include "network_ms.h"
37
 
#include "connection_handler_ms.h"
38
 
 
39
 
MSSystemThread          *MSNetwork::gSystemThread;
40
 
time_t                          MSNetwork::gCurrentTime;
41
 
time_t                          MSNetwork::gLastService;
42
 
CSThreadList            *MSNetwork::gHandlerList;
43
 
CSSync                          MSNetwork::gListenerLock;
44
 
CSSocket                        *MSNetwork::gListenerSocket;
45
 
MSConnectionHandler     *MSNetwork::gListenerThread;
46
 
uint32_t                                MSNetwork::gWaitingToListen;
47
 
int                                     MSNetwork::handlerCount;
48
 
 
49
 
/*
50
 
 * -------------------------------------------------------------------------
51
 
 * SYSTEM THREAD
52
 
 */
53
 
 
54
 
bool MSSystemThread::doWork()
55
 
{
56
 
        bool    killed = true;
57
 
 
58
 
        enter_();
59
 
        MSNetwork::gCurrentTime = time(NULL);
60
 
        if ((MSNetwork::gCurrentTime - MSNetwork::gLastService) >= (MS_IDLE_THREAD_TIMEOUT/2)) {
61
 
                MSNetwork::gLastService = MSNetwork::gCurrentTime;
62
 
                while (!myMustQuit && killed) {
63
 
                        killed = MSNetwork::killListener();
64
 
                        MSNetwork::gCurrentTime = time(NULL);
65
 
                }
66
 
        }
67
 
        return_(true);
68
 
}
69
 
 
70
 
/*
71
 
 * -------------------------------------------------------------------------
72
 
 * NETWORK FUNCTIONS
73
 
 */
74
 
 
75
 
void MSNetwork::startUp(int port)
76
 
{
77
 
        enter_();
78
 
        gCurrentTime = time(NULL);
79
 
        gLastService = gCurrentTime;
80
 
        gListenerSocket = NULL;
81
 
        handlerCount = 0;
82
 
        
83
 
        CSL.lock();
84
 
        CSL.log(self, CSLog::Protocol, "Media Stream Daemon ");
85
 
        if (port) {
86
 
        CSL.log(self, CSLog::Protocol, " listening on port ");
87
 
        CSL.log(self, CSLog::Protocol, port);
88
 
        } else
89
 
                CSL.log(self, CSLog::Protocol, " not published ");
90
 
        CSL.log(self, CSLog::Protocol, "\n");
91
 
        CSL.unlock();
92
 
 
93
 
        new_(gHandlerList, CSThreadList());
94
 
        if (port) {
95
 
        gListenerSocket = CSSocket::newSocket();
96
 
        gListenerSocket->publish(NULL, port);
97
 
        } else 
98
 
                gListenerSocket = NULL;
99
 
 
100
 
        new_(gSystemThread, MSSystemThread(1000 /* 1 sec */, NULL));
101
 
        gSystemThread->start();
102
 
        exit_();
103
 
}
104
 
 
105
 
void MSNetwork::shutDown()
106
 
{
107
 
        enter_();
108
 
 
109
 
        if (gSystemThread) {
110
 
                gSystemThread->stop();
111
 
                gSystemThread->release();
112
 
                gSystemThread = NULL;
113
 
        }
114
 
 
115
 
        /* This will set all threads to quiting: */
116
 
        if (gHandlerList)
117
 
                gHandlerList->quitAllThreads();
118
 
 
119
 
        /* Close the socket: */
120
 
        if (gListenerThread)
121
 
                gListenerThread->shuttingDown = true; // Block error messages as a result of the listener being killed
122
 
        
123
 
        lock_(&gListenerLock);
124
 
        if (gListenerSocket) {
125
 
                try_(a) {
126
 
                        gListenerSocket->release();
127
 
                }
128
 
                catch_(a) {
129
 
                        self->logException();
130
 
                }
131
 
                cont_(a);
132
 
        }
133
 
        gListenerSocket = NULL;
134
 
        unlock_(&gListenerLock);
135
 
 
136
 
        if (gHandlerList) {
137
 
                try_(b) {
138
 
                        /* This will stop any threads remaining: */
139
 
                        gHandlerList->release();
140
 
                }
141
 
                catch_(b) {
142
 
                        self->logException();
143
 
                }
144
 
                cont_(b);
145
 
        }
146
 
 
147
 
        CSL.log(self, CSLog::Protocol, "PrimeBase Media Stream Daemon no longer published\n");
148
 
        exit_();
149
 
}
150
 
 
151
 
void MSNetwork::startConnectionHandler()
152
 
{
153
 
        char                            buffer[120];
154
 
        MSConnectionHandler     *thread;
155
 
 
156
 
        enter_();
157
 
        handlerCount++;
158
 
        snprintf(buffer, 120, "NetworkHandler%d", handlerCount);
159
 
        lock_(gHandlerList);
160
 
        thread = MSConnectionHandler::newHandler(MSNetwork::gHandlerList);
161
 
        unlock_(gHandlerList);
162
 
        push_(thread);
163
 
        thread->threadName = CSString::newString(buffer);
164
 
        thread->start();
165
 
        release_(thread);
166
 
        exit_();
167
 
}
168
 
 
169
 
/*
170
 
 * Return NULL of a connection cannot be openned, and the
171
 
 * thread must quit.
172
 
 */
173
 
class OpenConnectioCleanUp : public CSRefObject {
174
 
        bool do_cleanup;
175
 
 
176
 
        public:
177
 
        
178
 
        OpenConnectioCleanUp(): CSRefObject(),
179
 
                do_cleanup(false){}
180
 
                
181
 
        ~OpenConnectioCleanUp() 
182
 
        {
183
 
                if (do_cleanup) {
184
 
                        MSNetwork::unlockListenerSocket();
185
 
                }
186
 
        }
187
 
        
188
 
        void setCleanUp()
189
 
        {
190
 
                do_cleanup = true;
191
 
        }
192
 
        
193
 
        void cancelCleanUp()
194
 
        {
195
 
                do_cleanup = false;
196
 
        }
197
 
        
198
 
};
199
 
 
200
 
/*
201
 
 * Return NULL if a connection cannot be openned, and the
202
 
 * thread must quit.
203
 
 */
204
 
CSSocket *MSNetwork::openConnection(MSConnectionHandler *handler)
205
 
{
206
 
        CSSocket *sock = NULL;
207
 
        OpenConnectioCleanUp *cleanup;
208
 
 
209
 
        enter_();
210
 
        
211
 
        if(!MSNetwork::gListenerSocket) {
212
 
                return_(NULL);
213
 
        }
214
 
        
215
 
        sock = CSSocket::newSocket();
216
 
        push_(sock);
217
 
 
218
 
        /* Wait for a connection: */
219
 
        if (!lockListenerSocket(handler)) {
220
 
                release_(sock);
221
 
                return_(NULL);
222
 
        }
223
 
 
224
 
        new_(cleanup, OpenConnectioCleanUp());
225
 
        push_(cleanup);
226
 
        
227
 
        cleanup->setCleanUp();
228
 
        sock->open(MSNetwork::gListenerSocket);
229
 
        cleanup->cancelCleanUp();
230
 
 
231
 
        handler->lastUse = gCurrentTime;
232
 
 
233
 
        unlockListenerSocket();
234
 
 
235
 
        release_(cleanup);
236
 
        pop_(sock);
237
 
        return_(sock);
238
 
}
239
 
 
240
 
void MSNetwork::startNetwork()
241
 
{
242
 
        enter_();
243
 
        startConnectionHandler();
244
 
        exit_();
245
 
}
246
 
 
247
 
bool MSNetwork::lockListenerSocket(MSConnectionHandler *handler)
248
 
{
249
 
        bool socket_locked = false;
250
 
 
251
 
        enter_();
252
 
        if (handler->myMustQuit)
253
 
                return false;
254
 
        lock_(&gListenerLock);
255
 
        if (gListenerSocket) {
256
 
                /* Wait for the listen socket to be freed: */
257
 
                if (gListenerThread) {
258
 
                        gWaitingToListen++;
259
 
                        handler->amWaitingToListen = true;
260
 
                        while (gListenerThread) {
261
 
                                if (handler->myMustQuit)
262
 
                                        break;
263
 
                                try_(a) {
264
 
                                        gListenerLock.wait(2000);
265
 
                                }
266
 
                                catch_(a) {
267
 
                                        /* Catch any error */;
268
 
                                }
269
 
                                cont_(a);
270
 
                        }
271
 
                        gWaitingToListen--;
272
 
                        handler->amWaitingToListen = false;
273
 
                }
274
 
                if (!handler->myMustQuit) {
275
 
                        gListenerThread = handler;
276
 
                        socket_locked = true;
277
 
                }
278
 
        }
279
 
        unlock_(&gListenerLock);
280
 
        return_(socket_locked);
281
 
}
282
 
 
283
 
void MSNetwork::unlockListenerSocket()
284
 
{
285
 
        enter_();
286
 
        lock_(&gListenerLock);
287
 
        gListenerThread = NULL;
288
 
        gListenerLock.wakeup();
289
 
        unlock_(&gListenerLock);
290
 
        exit_();
291
 
}
292
 
 
293
 
/* Kill a listener if possible!
294
 
 * Return true if a thread was killed.
295
 
 */
296
 
bool MSNetwork::killListener()
297
 
{
298
 
        MSConnectionHandler     *ptr = NULL;
299
 
 
300
 
        enter_();
301
 
        lock_(&gListenerLock);
302
 
        if (gListenerThread && gWaitingToListen > 0) {
303
 
                /* Kill one: */
304
 
                lock_(gHandlerList);
305
 
                ptr = (MSConnectionHandler *) gHandlerList->getBack();
306
 
                while (ptr) {
307
 
                        if (ptr->amWaitingToListen) {
308
 
                                if (gCurrentTime > ptr->lastUse && (gCurrentTime - ptr->lastUse) > MS_IDLE_THREAD_TIMEOUT) {
309
 
                                        ptr->myMustQuit = true;
310
 
                                        ptr->wakeup();
311
 
                                        break;
312
 
                                }
313
 
                        }
314
 
                        ptr = (MSConnectionHandler *) ptr->getNextLink();
315
 
                }
316
 
                unlock_(gHandlerList);
317
 
        }
318
 
        unlock_(&gListenerLock);
319
 
        if (ptr) {
320
 
                ptr->join();
321
 
                return_(true);
322
 
        }
323
 
        return_(false);
324
 
}
325
 
 
326