~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/pbmslib.cc

  • Committer: Monty Taylor
  • Date: 2010-07-06 00:44:32 UTC
  • mfrom: (1643.1.13 build)
  • Revision ID: mordred@inaugust.com-20100706004432-843uftc92rc2497l
Merged in PBMS, translation updates, a few build fixes and a few bug fixes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 *  PrimeBase Media Stream (PBMS)
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2008-09-10   Barry Leslie
 
20
 *
 
21
 * H&G2JCtL
 
22
 */
 
23
#ifdef DRIZZLED
 
24
#include "config.h"
 
25
#include <drizzled/common.h>
 
26
#include <drizzled/session.h>
 
27
#include <drizzled/charset_info.h>
 
28
#endif
 
29
 
 
30
#include "cslib/CSConfig.h"
 
31
#include <inttypes.h>
 
32
 
 
33
#include <curl/curl.h>
 
34
#include <string.h>
 
35
 
 
36
#include "pbmslib.h"
 
37
#include "pbms.h"
 
38
#include "cslib/CSGlobal.h"
 
39
#include "cslib/CSThread.h"
 
40
#include "cslib/CSString.h"
 
41
#include "cslib/CSStrUtil.h"
 
42
//#include "cslib/CSSocket.h"
 
43
#include "cslib/CSHTTPStream.h"
 
44
#include "cslib/CSMd5.h"
 
45
#include "cslib/CSS3Protocol.h"
 
46
#include "metadata_ms.h"
 
47
 
 
48
#define CLEAR_SELF()    CSThread::setSelf(NULL)
 
49
#define MAX_STMT_SIZE   1024
 
50
 
 
51
static int global_errno;
 
52
static char global_err_message[MS_RESULT_MESSAGE_SIZE];
 
53
 
 
54
static unsigned long init_count = 0;
 
55
static CSThreadList     *pbms_thread_list = NULL;
 
56
static  CSThread *mslib_global_thread = NULL;
 
57
 
 
58
static void report_global_mse_error(CSThread *thd)
 
59
{
 
60
        global_errno = thd->myException.getErrorCode();
 
61
        cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message,  thd->myException.getMessage());
 
62
}
 
63
 
 
64
#define DFLT_CONNECTION_TIMEOUT 10      // Changing this required a documentation update.
 
65
 
 
66
#define THROW_CURL_IF(v) { if (v) CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);}
 
67
//======================================
 
68
static size_t receive_data(void *ptr, size_t size, size_t nmemb, void *stream);
 
69
static size_t receive_header(void *ptr, size_t size, size_t nmemb, void *stream);
 
70
static size_t send_callback(void *ptr, size_t size, size_t nmemb, void *stream);
 
71
//--------------------------------------------
 
72
class  PBMS_ConHandle:public CSThread {
 
73
        public:
 
74
        
 
75
        // Fields that must be freed when the object is destroyed.
 
76
        CSString                        *ms_host;
 
77
        CSString                        *ms_database;
 
78
        struct curl_slist       *ms_header_list;        // A curl list of headers to be sent with the next PUT.
 
79
        struct curl_slist       *ms_info_only;  // A curl list of headers to be sent with Get.
 
80
        struct curl_slist       *ms_ping_header;        // A curl list of headers to be sent with ping.
 
81
        CSStringBuffer          *ms_url_str;
 
82
        CURL                            *ms_curl;       
 
83
        CSStringBuffer          *ms_buffer;
 
84
        CSStringBuffer          *ms_errorReply;
 
85
        CSS3Protocol            *ms_cloud;
 
86
        uint64_t                        ms_range_start;
 
87
        uint64_t                        ms_range_end;
 
88
 
 
89
        unsigned int            ms_replyStatus;
 
90
        CSHTTPHeaders           ms_headers; 
 
91
        CSHTTPHeaders           ms_metadata_out;
 
92
        uint32_t                                ms_next_header;
 
93
        uint32_t                                ms_max_header;  
 
94
        unsigned int            ms_port;
 
95
        unsigned int            ms_transmition_timeout; // In the future this may have some effect but for now it is always be 0 (no timeout).
 
96
        unsigned int            ms_url_base_len;
 
97
        bool                            ms_throw_error; // Gets set if an exception occurs in a callback.
 
98
 
 
99
        int                                     ms_errno;
 
100
        char                            ms_err_message[MS_RESULT_MESSAGE_SIZE];
 
101
        
 
102
        
 
103
        char ms_curl_error[CURL_ERROR_SIZE];
 
104
        
 
105
        size_t ms_DataToBeTransfered;
 
106
        // Get data caller parameters:
 
107
        u_char                          *ms_getBuffer;
 
108
        size_t                          ms_getBufferSize;
 
109
        PBMS_WRITE_CALLBACK_FUNC        ms_writeCB;
 
110
        void                            *ms_getCBData;
 
111
        
 
112
        // Put data caller parameters:
 
113
        const u_char            *ms_putData;
 
114
        size_t                          ms_putDataLen;
 
115
        size_t                          ms_putDataOffset;
 
116
        PBMS_READ_CALLBACK_FUNC ms_readCB;
 
117
        void                            *ms_putCBData;
 
118
        
 
119
        PBMS_ConHandle():
 
120
                CSThread(pbms_thread_list),
 
121
                ms_host(NULL),
 
122
                ms_database(NULL),
 
123
                ms_header_list(NULL),
 
124
                ms_info_only(NULL),
 
125
                ms_ping_header(NULL),
 
126
                ms_url_str(NULL),
 
127
                ms_curl(NULL),  
 
128
                ms_buffer(NULL),
 
129
                ms_errorReply(NULL),
 
130
                ms_cloud(NULL),
 
131
                ms_range_start(0),
 
132
                ms_range_end(0),
 
133
                ms_replyStatus(0),
 
134
                ms_next_header(0),
 
135
                ms_max_header(0),       
 
136
                ms_port(0),
 
137
                ms_transmition_timeout(0),
 
138
                ms_url_base_len(0),
 
139
                ms_throw_error(false),
 
140
                ms_errno(0),
 
141
                ms_DataToBeTransfered(0),
 
142
                ms_getBuffer(NULL),
 
143
                ms_getBufferSize(0),
 
144
                ms_getCBData(NULL),
 
145
                ms_putData(NULL),
 
146
                ms_putDataLen(0),
 
147
                ms_putDataOffset(0),
 
148
                ms_putCBData(NULL)
 
149
        {
 
150
                }
 
151
                
 
152
        ~PBMS_ConHandle() {
 
153
                if (ms_curl) 
 
154
                        curl_easy_cleanup(ms_curl);
 
155
                        
 
156
                if (ms_cloud) 
 
157
                        ms_cloud->release();
 
158
                        
 
159
                if (ms_database) 
 
160
                        ms_database->release();
 
161
                        
 
162
                if (ms_host) 
 
163
                        ms_host->release();
 
164
                        
 
165
                if (ms_buffer) 
 
166
                        ms_buffer->release();
 
167
                        
 
168
                if (ms_errorReply) 
 
169
                        ms_errorReply->release();
 
170
                        
 
171
                if (ms_url_str) 
 
172
                        ms_url_str->release();
 
173
                        
 
174
                if (ms_header_list) 
 
175
                        curl_slist_free_all(ms_header_list);
 
176
                        
 
177
                if (ms_info_only) 
 
178
                        curl_slist_free_all(ms_info_only);
 
179
                        
 
180
                if(ms_ping_header)
 
181
                        curl_slist_free_all(ms_ping_header);
 
182
                        
 
183
                ms_headers.clearHeaders();
 
184
                ms_metadata_out.clearHeaders();
 
185
                        
 
186
        }
 
187
        void set_downLoadUserData(u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb = NULL, void *caller_data = NULL)     
 
188
        {
 
189
                ms_DataToBeTransfered = buffer_size;
 
190
                ms_getBuffer = buffer;
 
191
                ms_getBufferSize = buffer_size;
 
192
                ms_writeCB = cb;
 
193
                ms_getCBData = caller_data;             
 
194
        }
 
195
        
 
196
        void set_upLoadUserData(const u_char *buffer, size_t size, PBMS_READ_CALLBACK_FUNC cb = NULL, void *caller_data = NULL) 
 
197
        {
 
198
                ms_DataToBeTransfered = size;
 
199
                ms_putData = buffer;
 
200
                ms_putDataLen = size;
 
201
                ms_putDataOffset =0;
 
202
                ms_readCB = cb;
 
203
                ms_putCBData = caller_data;
 
204
        }
 
205
        
 
206
        
 
207
        void ms_initConnection(const char* host, unsigned int port, const char* database)
 
208
        {
 
209
                ms_curl = curl_easy_init();
 
210
                if (!ms_curl)
 
211
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_init() failed.");
 
212
                
 
213
                if (curl_easy_setopt(ms_curl, CURLOPT_ERRORBUFFER, ms_curl_error))
 
214
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_setopt(CURLOPT_ERRORBUFFER) failed.");
 
215
                
 
216
                // Uncomment this line to trace network action during request. Very Usefull!!
 
217
                //curl_easy_setopt(ms_curl, CURLOPT_VERBOSE, 1L);
 
218
                
 
219
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_TCP_NODELAY, 1L));
 
220
 
 
221
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOPROGRESS, 1L));
 
222
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEFUNCTION, receive_data));
 
223
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READFUNCTION, send_callback));  
 
224
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HEADERFUNCTION, receive_header));
 
225
                
 
226
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEDATA, this));
 
227
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READDATA, this));
 
228
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEHEADER, this));
 
229
 
 
230
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FOLLOWLOCATION, 1L)); // Follow redirects.
 
231
 
 
232
                ms_port = port;
 
233
                pbms_api_owner = true;
 
234
                ms_host = CSString::newString(host);
 
235
                ms_database = CSString::newString(database);
 
236
                
 
237
                ms_url_str = new CSStringBuffer(50);    
 
238
                ms_url_str->append("http://");
 
239
                ms_url_str->append(host);
 
240
                ms_url_str->append(":");
 
241
                ms_url_str->append(port);
 
242
                ms_url_str->append("/");
 
243
                ms_url_base_len =       ms_url_str->length();
 
244
                        
 
245
                ms_buffer = new CSStringBuffer(50);     
 
246
 
 
247
                ms_info_only = curl_slist_append(ms_info_only, MS_BLOB_INFO_REQUEST ": yes");
 
248
                if (!ms_info_only) 
 
249
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
250
                        
 
251
                ms_buffer->append(MS_PING_REQUEST": ");
 
252
                        ms_buffer->append(database);
 
253
                ms_ping_header = curl_slist_append(ms_ping_header, ms_buffer->getCString());
 
254
                if (!ms_ping_header) 
 
255
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
256
 
 
257
                ms_buffer->setLength(0);
 
258
 
 
259
        }
 
260
        
 
261
        void ms_ping();
 
262
        
 
263
        void ms_setSelf()
 
264
        {
 
265
                setSelf(this);
 
266
        }
 
267
 
 
268
        void ms_init_put_blob(curl_off_t size, const char *table, const char *alias, const char *checksum, bool use_cloud);
 
269
        
 
270
        void ms_init_get_blob(const char *ref, bool is_alias, bool info_only);
 
271
        
 
272
        void ms_get_info(const char *ref, bool is_alias);
 
273
        void ms_sendCloudBLOB(size_t size)      ;
 
274
        
 
275
        pbms_bool ms_downLoadData(const char *ref, u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb = NULL, void *caller_data = NULL);
 
276
 
 
277
        pbms_bool ms_upLoadData(const char *table, const char *alias, const char *checksum, char *ref, size_t size, const u_char *data, PBMS_READ_CALLBACK_FUNC cb = NULL, void *caller_data = NULL);
 
278
        
 
279
        uint32_t ms_init_fetch() {ms_next_header =0; return ms_max_header = ms_headers.numHeaders();}
 
280
        
 
281
        bool ms_next(const char **name, const char **value) 
 
282
        {
 
283
                if (ms_next_header >= ms_max_header)
 
284
                        return false;
 
285
                        
 
286
                CSHeader *header = ms_headers.getHeader(ms_next_header++);
 
287
                *name = header->getNameCString();
 
288
                *value = header->getValueCString();
 
289
                header->release();
 
290
                return true;
 
291
        }
 
292
        
 
293
        void dump_headers() 
 
294
        {
 
295
                uint32_t i = 0;
 
296
                CSHeader *header;
 
297
                printf("Headers:\n");
 
298
                printf("---------------------------------------\n");
 
299
                while  ( (header = ms_headers.getHeader(i++)) ) {
 
300
                        printf("%s : %s\n", header->getNameCString(), header->getValueCString());
 
301
                        header->release();                      
 
302
                }
 
303
                printf("---------------------------------------\n");
 
304
        }
 
305
                                        
 
306
        void ms_addS3HeadersHeaders(CSVector *s3Headers);
 
307
 
 
308
        CSString        *ms_get_metadata(const char *name) 
 
309
        { 
 
310
                return ms_headers.getHeaderValue(name);
 
311
        }
 
312
                                        
 
313
        CSString        *ms_get_alias() {return ms_get_metadata(MS_ALIAS_TAG);}
 
314
 
 
315
        CSString        *ms_get_checksum() {return ms_get_metadata(MS_CHECKSUM_TAG);}
 
316
 
 
317
        void report_error(CSThread *self)
 
318
        {
 
319
                ms_errno = self->myException.getErrorCode();
 
320
                cs_strcpy(MS_RESULT_MESSAGE_SIZE, ms_err_message,  self->myException.getMessage());
 
321
        }
 
322
 
 
323
        void report_error(int err, const char *msg)
 
324
        {
 
325
                ms_errno = err;
 
326
                cs_strcpy(MS_RESULT_MESSAGE_SIZE, ms_err_message,  msg);
 
327
        }
 
328
 
 
329
        void throw_http_reply_exception();
 
330
        
 
331
        void check_reply_status() 
 
332
        {
 
333
                switch (ms_replyStatus) {
 
334
                        case 200:
 
335
                        //case 301: // Moved Permanently
 
336
                        //case 307: // Temporary Redirect
 
337
                                break;
 
338
                        default:
 
339
                                throw_http_reply_exception();
 
340
                }
 
341
                
 
342
        }
 
343
        
 
344
        void ms_getCloudHeaders() 
 
345
        {
 
346
                CSString *value = NULL;
 
347
                
 
348
                enter_();
 
349
                
 
350
                // Get the S3 server
 
351
                value = ms_headers.getHeaderValue(MS_CLOUD_SERVER);
 
352
                if (!value) {
 
353
                        if (ms_cloud)
 
354
                                ms_cloud->release();
 
355
                        ms_cloud = NULL;
 
356
                        exit_();
 
357
                }
 
358
                
 
359
                push_(value);
 
360
                if (!ms_cloud)
 
361
                        new_(ms_cloud, CSS3Protocol());
 
362
                        
 
363
                // Remove the cloud headers so they are not visable to the caller.
 
364
                ms_headers.removeHeader(MS_CLOUD_SERVER); 
 
365
                
 
366
                ms_cloud->s3_setServer(value->getCString());
 
367
                release_(value);                
 
368
                        
 
369
                // Get the S3 public key
 
370
                value = ms_headers.getHeaderValue(MS_CLOUD_KEY);
 
371
                if (!value)
 
372
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 public key in reply.");
 
373
                
 
374
                push_(value);
 
375
                ms_headers.removeHeader(MS_CLOUD_KEY);
 
376
                ms_cloud->s3_setPublicKey(value->getCString());
 
377
                release_(value);                
 
378
 
 
379
                exit_();                                
 
380
        }
 
381
 
 
382
};
 
383
 
 
384
// CURL callback functions:
 
385
////////////////////////////
 
386
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
 
387
{
 
388
        PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
 
389
        size_t data_len = objs * obj_size;
 
390
        char *ptr = (char*)vptr;
 
391
 
 
392
        CLOBBER_PROTECT(data_len);
 
393
 
 
394
        if (con->ms_replyStatus >= 400) { // Collect the error reply.
 
395
                enter_();
 
396
                try_(a) {
 
397
                        if (!con->ms_errorReply)
 
398
                                con->ms_errorReply = new CSStringBuffer(50);            
 
399
                        con->ms_errorReply->append(ptr, data_len);
 
400
                }
 
401
                catch_(a);
 
402
                con->ms_throw_error = true;
 
403
                data_len = -1;
 
404
                
 
405
                cont_(a);
 
406
                return_(data_len);
 
407
        }
 
408
        
 
409
        if (data_len > con->ms_DataToBeTransfered) { // This should never happen.
 
410
                CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob data overflow.");
 
411
                con->ms_throw_error = true;
 
412
                return -1;
 
413
        }
 
414
        
 
415
        con->ms_DataToBeTransfered -= data_len;
 
416
        if (con->ms_writeCB) 
 
417
                return (con->ms_writeCB)(con->ms_getCBData, ptr, data_len, false); 
 
418
                                
 
419
        memcpy(con->ms_getBuffer, ptr, data_len);
 
420
        con->ms_getBuffer += data_len;
 
421
        
 
422
        return data_len;
 
423
}
 
424
 
 
425
#define IS_REDIRECT(s) ((s >= 300) && (s < 400))
 
426
//----------------------
 
427
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
 
428
{
 
429
        PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
 
430
        size_t size = objs * obj_size;
 
431
        char *end, *ptr = (char*) header, *value = NULL;
 
432
        const char *name = NULL;
 
433
        uint32_t name_len =0, value_len =0;
 
434
        
 
435
        CLOBBER_PROTECT(size);
 
436
        CLOBBER_PROTECT(ptr);
 
437
        CLOBBER_PROTECT(value);
 
438
        CLOBBER_PROTECT(name_len);
 
439
        CLOBBER_PROTECT(value_len);
 
440
        CLOBBER_PROTECT(name);
 
441
 
 
442
        end = ptr + size;
 
443
        if (*(end -2) == '\r' && *(end -1) == '\n')
 
444
                end -=2;
 
445
                
 
446
        while ((end != ptr) && (*ptr == ' ')) ptr++;
 
447
        if (end == ptr)
 
448
                return size;
 
449
        
 
450
        // Get the reply status.        
 
451
        if (((!con->ms_replyStatus) || (con->ms_replyStatus == 100) || IS_REDIRECT(con->ms_replyStatus) ) && !strncasecmp(ptr, "HTTP", 4)) {
 
452
                char status[4];
 
453
                while ((end != ptr) && (*ptr != ' ')) ptr++; // skip HTTP stuff
 
454
                while ((end != ptr) && (*ptr == ' ')) ptr++; // find the start of eh status code.
 
455
                if (end == ptr)
 
456
                        return size;
 
457
                        
 
458
                if (end < (ptr +3)) // expecting a 3 digit status code.
 
459
                        return size;
 
460
                        
 
461
                memcpy(status, ptr, 3);
 
462
                status[3] = 0;
 
463
                
 
464
                con->ms_replyStatus = atoi(status);
 
465
        }
 
466
        
 
467
        name = ptr;
 
468
        while ((end != ptr) && (*ptr != ':')) ptr++;
 
469
        if (end == ptr)
 
470
                return size;
 
471
        name_len = ptr - name;
 
472
        
 
473
        ptr++; 
 
474
        while ((end != ptr) && (*ptr == ' ')) ptr++;
 
475
        if (end == ptr)
 
476
                return size;
 
477
        
 
478
        value = ptr;
 
479
        value_len = end - value;
 
480
        
 
481
        while (name[name_len-1] == ' ') name_len--;
 
482
        while (value[value_len-1] == ' ') value_len--;
 
483
        
 
484
        if (!strncasecmp(name, "Content-Length", 14)) {
 
485
                size_t length;
 
486
                char len[32];
 
487
                memcpy(len, value, (value_len > 31)?31:value_len);
 
488
                len[(value_len > 31)?31:value_len] = 0;
 
489
                
 
490
                length = atoll(len);
 
491
                // If there is no callback then the data size is limited
 
492
                // to the GetBuffer size.
 
493
                if (con->ms_writeCB || (length < con->ms_getBufferSize))
 
494
                        con->ms_DataToBeTransfered = length;
 
495
                        
 
496
        }
 
497
        
 
498
 
 
499
        enter_();
 
500
        try_(a) {
 
501
                if (!strncasecmp(name, "ETag", 4)) { // S3 checksum
 
502
                        name = MS_CHECKSUM_TAG;
 
503
                        name_len = strlen(MS_CHECKSUM_TAG);
 
504
                        // Strip any quotes
 
505
                        if (*value == '"') {
 
506
                                value++;value_len--;
 
507
                        }
 
508
                        if (value[value_len-1] == '"') {
 
509
                                value_len--;
 
510
                        }
 
511
                        con->ms_headers.removeHeader(MS_CHECKSUM_TAG);
 
512
                }
 
513
                con->ms_headers.addHeader(name, name_len, value, value_len);
 
514
        }
 
515
        
 
516
        catch_(a);
 
517
        con->ms_throw_error = true;
 
518
        return_(-1);
 
519
                
 
520
        cont_(a);
 
521
        return_(size);
 
522
}
 
523
 
 
524
//----------------------
 
525
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
 
526
{
 
527
        PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
 
528
        char *buffer = (char*) ptr;
 
529
        size_t data_sent = 0, buffer_size = objs * obj_size;
 
530
 
 
531
        if (con->ms_putDataLen == 0)
 
532
                return 0;
 
533
                
 
534
        if (con->ms_readCB) 
 
535
                data_sent = (con->ms_readCB)(con->ms_putCBData, buffer , buffer_size, false);           
 
536
        else {
 
537
                data_sent = (buffer_size < con->ms_putDataLen)? buffer_size: con->ms_putDataLen;
 
538
                memcpy(buffer,con->ms_putData, data_sent);
 
539
                con->ms_putData += data_sent;
 
540
        }
 
541
        con->ms_putDataLen -= data_sent;
 
542
        
 
543
        return data_sent;
 
544
}
 
545
 
 
546
#define CONTENT_TYPE "Content-Type"
 
547
//------------------------------------------------
 
548
void PBMS_ConHandle::ms_init_put_blob(curl_off_t size, const char *table, const char *alias, const char *checksum, bool use_cloud)
 
549
{
 
550
        char buffer[MS_META_VALUE_SIZE + MS_META_NAME_SIZE +2];
 
551
        int buffer_size = MS_META_VALUE_SIZE + MS_META_NAME_SIZE +2;
 
552
        bool have_content_type = false;
 
553
        
 
554
        (void)alias;
 
555
        
 
556
        ms_url_str->setLength(ms_url_base_len);
 
557
 
 
558
        ms_url_str->append(ms_database->getCString());
 
559
        if (table) {
 
560
                ms_url_str->append("/");
 
561
                ms_url_str->append(table);
 
562
        }
 
563
        
 
564
        // Remove any old headers
 
565
        if (ms_header_list) {
 
566
                curl_slist_free_all(ms_header_list);
 
567
                ms_header_list = NULL;
 
568
        }
 
569
        
 
570
        // Add metadata headers.
 
571
        uint32_t i = 0;
 
572
        CSHeader *header;
 
573
        while  ( (header = ms_metadata_out.getHeader(i++)) ) {
 
574
                cs_strcpy(buffer_size, buffer, header->getNameCString());
 
575
                if (!strcasecmp(  buffer, CONTENT_TYPE))
 
576
                        have_content_type = true;
 
577
                cs_strcat(buffer_size, buffer, ':');
 
578
                cs_strcat(buffer_size, buffer, header->getValueCString());
 
579
                header->release();                      
 
580
                ms_header_list = curl_slist_append(ms_header_list, buffer);
 
581
                if (!ms_header_list) 
 
582
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
583
        }
 
584
        
 
585
        if (!have_content_type) {
 
586
                // Prevent CURLOPT_POST from adding a content type. 
 
587
                ms_header_list = curl_slist_append(ms_header_list, CONTENT_TYPE ":");
 
588
                if (!ms_header_list) 
 
589
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
590
        }
 
591
                
 
592
        if (checksum) { 
 
593
                cs_strcpy(buffer_size, buffer, MS_CHECKSUM_TAG ":");
 
594
                cs_strcat(buffer_size, buffer, checksum);
 
595
                
 
596
                ms_header_list = curl_slist_append(ms_header_list, buffer);
 
597
                if (!ms_header_list) 
 
598
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
599
        }
 
600
                
 
601
        
 
602
#ifdef HAVE_ALIAS_SUPPORT
 
603
        if (alias) {
 
604
                if (strlen(alias) > MS_META_VALUE_SIZE) 
 
605
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "BLOB alias name too long.");
 
606
 
 
607
                cs_strcpy(buffer_size, buffer, MS_ALIAS_TAG ":");
 
608
                cs_strcat(buffer_size, buffer, alias);
 
609
                
 
610
                ms_header_list = curl_slist_append(ms_header_list, buffer);
 
611
                if (!ms_header_list) 
 
612
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
613
        }
 
614
#endif
 
615
        
 
616
        if (use_cloud) {
 
617
                cs_strcpy(buffer_size, buffer, MS_BLOB_SIZE ":");
 
618
                snprintf(buffer + strlen(buffer), buffer_size - strlen(buffer), "%"PRIu64"", size);
 
619
                ms_header_list = curl_slist_append(ms_header_list, buffer);
 
620
                if (!ms_header_list) 
 
621
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
622
                        
 
623
                size = 0; // The BLOB data is not being sent to the PBMS server
 
624
        }
 
625
        
 
626
        // Using CURLOPT_UPLOAD is VERY slow!
 
627
        //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, size));
 
628
        //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
 
629
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POSTFIELDSIZE_LARGE, size));
 
630
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 1L));
 
631
 
 
632
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
 
633
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString() ));       
 
634
                
 
635
        ms_replyStatus = 0;
 
636
        ms_headers.clearHeaders();
 
637
        if (ms_errorReply)
 
638
                ms_errorReply->setLength(0);
 
639
}
 
640
        
 
641
 
 
642
//------------------------------------------------
 
643
void PBMS_ConHandle::ms_init_get_blob(const char *ref, bool is_alias, bool info_only)
 
644
{
 
645
        (void)is_alias;
 
646
        
 
647
        ms_url_str->setLength(ms_url_base_len);
 
648
        
 
649
#ifdef HAVE_ALIAS_SUPPORT
 
650
        MSBlobURLRec blob;
 
651
        if (is_alias || !PBMSBlobURLTools::couldBeURL(ref, &blob)) {
 
652
                ms_url_str->append(ms_database->getCString());
 
653
                ms_url_str->append("/");
 
654
        }
 
655
#endif
 
656
        ms_url_str->append((char *) ref);
 
657
        
 
658
        //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 0L));
 
659
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 0L));
 
660
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, (info_only)?ms_info_only: NULL));
 
661
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString()));
 
662
        
 
663
        // NOTE: range 0-0 is valid, it returns the first byte.
 
664
        if (ms_range_start <= ms_range_end) {
 
665
                char range[80];
 
666
                snprintf(range, 80, "%"PRIu64"-%"PRIu64"", ms_range_start, ms_range_end);
 
667
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_RANGE, range));
 
668
                ms_range_start = 1;
 
669
                ms_range_end = 0;
 
670
        } else {
 
671
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_RANGE, NULL));
 
672
        }
 
673
        
 
674
        ms_replyStatus = 0;
 
675
        ms_headers.clearHeaders();
 
676
        if (ms_errorReply)
 
677
                ms_errorReply->setLength(0);
 
678
}
 
679
        
 
680
//------------------------------------------------
 
681
void PBMS_ConHandle::ms_get_info(const char *ref, bool is_alias)
 
682
{
 
683
        enter_();
 
684
        ms_init_get_blob(ref, is_alias, true);
 
685
        
 
686
        if (curl_easy_perform(ms_curl) && !ms_throw_error)
 
687
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
688
        
 
689
        if (ms_throw_error)
 
690
                throw_();
 
691
                
 
692
        check_reply_status();
 
693
        
 
694
        exit_();
 
695
}
 
696
 
 
697
//------------------------------------------------
 
698
void PBMS_ConHandle::ms_ping()
 
699
{
 
700
        enter_();
 
701
        ms_url_str->setLength(ms_url_base_len);
 
702
        
 
703
//      THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 0L));
 
704
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 0L));
 
705
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_ping_header));
 
706
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString()));
 
707
        
 
708
        ms_replyStatus = 0;
 
709
        ms_headers.clearHeaders();
 
710
        if (ms_errorReply)
 
711
                ms_errorReply->setLength(0);
 
712
 
 
713
        if (curl_easy_perform(ms_curl) && !ms_throw_error)
 
714
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
715
        
 
716
        if (ms_throw_error)
 
717
                throw_();
 
718
                
 
719
        check_reply_status();
 
720
        
 
721
        // Check to see if the BLOBs in the database are stored in a cloud: 
 
722
        CSString *cloud_server;
 
723
        cloud_server = ms_headers.getHeaderValue(MS_CLOUD_SERVER);
 
724
        if (!cloud_server) {
 
725
                if (ms_cloud)
 
726
                        ms_cloud->release();
 
727
                ms_cloud = NULL;
 
728
        } else {
 
729
                cloud_server->release();
 
730
                if (!ms_cloud)
 
731
                        new_(ms_cloud, CSS3Protocol());
 
732
        }
 
733
                
 
734
        exit_();
 
735
}
 
736
 
 
737
//------------------------------------------------
 
738
void PBMS_ConHandle::throw_http_reply_exception()
 
739
{
 
740
        CSString *reply = NULL, *error_text = NULL;
 
741
        
 
742
        enter_();
 
743
        
 
744
        size_t size = 0;
 
745
         //dump_headers();
 
746
         
 
747
        if (ms_errorReply)
 
748
                size = ms_errorReply->length();
 
749
        
 
750
        if (!size) {
 
751
                error_text = CSString::newString("Missing HTTP reply: possible Media Stream engine connection failure.");
 
752
        } else {
 
753
                uint32_t start, end;
 
754
        
 
755
                reply = CSString::newString(ms_errorReply);
 
756
                push_(reply);
 
757
                ms_errorReply = NULL;
 
758
                
 
759
                start = reply->locate(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG, 1);
 
760
                start += strlen(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG);
 
761
                end = reply->locate(EXCEPTION_REPLY_MESSAGE_SUFFIX_TAG, 1); 
 
762
                if (start < end) {
 
763
                        error_text = reply->substr(start, end - start);
 
764
                        push_(error_text);
 
765
                } else {
 
766
                        error_text = reply;
 
767
                }
 
768
        }
 
769
        
 
770
        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, error_text->getCString());
 
771
        
 
772
        // We never get here.
 
773
        exit_();
 
774
}
 
775
 
 
776
//------------------------------------------------
 
777
pbms_bool PBMS_ConHandle::ms_downLoadData(const char *ref, u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
 
778
{
 
779
        pbms_bool ok = true;
 
780
        ms_setSelf();   
 
781
 
 
782
        enter_();
 
783
        
 
784
        try_(a) {       
 
785
                set_downLoadUserData(buffer, buffer_size, cb, caller_data);
 
786
                ms_init_get_blob(ref, false, false);
 
787
                if (curl_easy_perform(ms_curl) && !ms_throw_error)
 
788
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
789
                
 
790
                if (ms_throw_error)
 
791
                        throw_();
 
792
                
 
793
                check_reply_status();
 
794
                        
 
795
        }
 
796
        
 
797
        catch_(a);
 
798
        report_error(self);
 
799
        ok = false;
 
800
        
 
801
        cont_(a);
 
802
        
 
803
        return_(ok);
 
804
}
 
805
 
 
806
//------------------------------------------------
 
807
void PBMS_ConHandle::ms_sendCloudBLOB(size_t size)
 
808
{
 
809
        CSInputStream *input;
 
810
        CSString *bucket, *object_key, *content_type, *signature, *signature_time;
 
811
        CSVector *s3Headers;
 
812
        
 
813
        enter_();
 
814
        
 
815
        // Get the S3 bucket
 
816
        bucket = ms_headers.getHeaderValue(MS_CLOUD_BUCKET);
 
817
        if (!bucket)
 
818
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 bucket header in reply.");
 
819
        push_(bucket);
 
820
        ms_headers.removeHeader(MS_CLOUD_BUCKET);
 
821
 
 
822
        // Get the S3 object key
 
823
        object_key = ms_headers.getHeaderValue(MS_CLOUD_OBJECT_KEY);
 
824
        if (!object_key)
 
825
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 object key header in reply.");
 
826
        push_(object_key);
 
827
        ms_headers.removeHeader(MS_CLOUD_OBJECT_KEY);
 
828
 
 
829
        // Get the S3 blob signature
 
830
        signature = ms_headers.getHeaderValue(MS_BLOB_SIGNATURE);
 
831
        if (!signature)
 
832
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 blob signature in reply.");
 
833
        
 
834
        push_(signature);
 
835
        ms_headers.removeHeader(MS_BLOB_SIGNATURE);
 
836
 
 
837
        // Get the S3 blob signature date
 
838
        signature_time = ms_headers.getHeaderValue(MS_BLOB_DATE);
 
839
        if (!signature_time)
 
840
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 blob signature date in reply.");
 
841
        push_(signature_time);
 
842
        ms_headers.removeHeader(MS_BLOB_DATE);
 
843
 
 
844
        content_type = ms_headers.getHeaderValue(CONTENT_TYPE);
 
845
        if (content_type)
 
846
                push_(content_type);
 
847
 
 
848
        if (ms_readCB)
 
849
                input = CSCallbackInputStream::newStream(ms_readCB, ms_putCBData);
 
850
        else
 
851
                input = CSMemoryInputStream::newStream(ms_putData, ms_putDataLen);
 
852
        
 
853
        // Send the BLOB to the cloud storage.
 
854
        s3Headers = ms_cloud->s3_send(input, bucket->getCString(), object_key->getCString(), size, (content_type)?content_type->getCString():NULL, NULL, signature->getCString(), atol(signature_time->getCString()));
 
855
 
 
856
        ms_addS3HeadersHeaders(s3Headers);
 
857
        
 
858
        if (content_type)
 
859
                release_(content_type);
 
860
                
 
861
        release_(signature_time);
 
862
        release_(signature);
 
863
        release_(object_key);
 
864
        release_(bucket);
 
865
        
 
866
        exit_();
 
867
}
 
868
 
 
869
//------------------------------------------------
 
870
void PBMS_ConHandle::ms_addS3HeadersHeaders(CSVector *s3Headers)
 
871
{
 
872
        CSHTTPHeaders headers;
 
873
        enter_();
 
874
        
 
875
        try_(a) {
 
876
                headers.setHeaders(s3Headers);
 
877
                
 
878
                for (uint32_t i = 0; i < headers.numHeaders(); i++) {
 
879
                        CSHeader *h = headers.getHeader(i);
 
880
                        const char *name = h->getNameCString();
 
881
                        
 
882
                        if (strcasecmp(name, "ETag") == 0){
 
883
                                const char *value = h->getValueCString();
 
884
                                uint32_t value_len = strlen(value);
 
885
                                
 
886
                                // Strip any quotes
 
887
                                if (*value == '"') {
 
888
                                        value++;value_len--;
 
889
                                }
 
890
                                if (value[value_len-1] == '"') {
 
891
                                        value_len--;
 
892
                                }
 
893
                                
 
894
                                ms_headers.removeHeader(MS_CHECKSUM_TAG);
 
895
 
 
896
                                ms_headers.addHeader(MS_CHECKSUM_TAG, CSString::newString(value, value_len));
 
897
                                h->release();
 
898
                        } else {
 
899
                                ms_headers.removeHeader(name);
 
900
                                ms_headers.addHeader(h);
 
901
                        }
 
902
                }
 
903
        }
 
904
        
 
905
        catch_(a);      
 
906
        headers.clearHeaders();
 
907
        throw_();
 
908
        
 
909
        cont_(a);       
 
910
        headers.clearHeaders();
 
911
        
 
912
        exit_();
 
913
}
 
914
//------------------------------------------------
 
915
pbms_bool PBMS_ConHandle::ms_upLoadData(const char *table, const char *alias, const char *checksum, char *ref, size_t size, const u_char *data, PBMS_READ_CALLBACK_FUNC cb, void *caller_data)
 
916
{
 
917
        pbms_bool ok = true, use_cloud = (ms_cloud != NULL);
 
918
        
 
919
        ms_setSelf();   
 
920
 
 
921
        enter_();
 
922
        
 
923
        try_(a) {
 
924
 
 
925
resend:
 
926
                ms_init_put_blob(size, table, alias, checksum, use_cloud);
 
927
                set_upLoadUserData(data, size, cb, caller_data);
 
928
                set_downLoadUserData((u_char*) ref, PBMS_BLOB_URL_SIZE -1); // Set things up to receive the BLOB ref back.
 
929
 
 
930
                if (curl_easy_perform(ms_curl) && !ms_throw_error)
 
931
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
932
                
 
933
                if (ms_throw_error)
 
934
                        throw_();
 
935
                        
 
936
                check_reply_status();
 
937
        
 
938
                *ms_getBuffer =0; // null terminate the blob reference string.
 
939
                ms_getCloudHeaders();
 
940
                if (ms_cloud && use_cloud) 
 
941
                        ms_sendCloudBLOB(size);
 
942
                else if (use_cloud && !ms_cloud) {
 
943
                        // We were expecting to send the BLOB to the cloud but 
 
944
                        // the server did not respond with cloud data. The retry will
 
945
                        // send the data to the PBMS server.
 
946
                        use_cloud = false;
 
947
                        goto resend;
 
948
                }
 
949
                
 
950
        }
 
951
        
 
952
        catch_(a);
 
953
        report_error(self);
 
954
        ok = false;
 
955
        cont_(a);
 
956
        
 
957
        return_(ok);
 
958
}
 
959
 
 
960
//------------------------------------------------
 
961
static void report_global_error(int err, const char *message, int line)
 
962
{
 
963
        char line_no[32];
 
964
        
 
965
        if (global_errno)
 
966
                return;
 
967
        global_errno = err;
 
968
        cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message,  message);
 
969
        snprintf(line_no, 32, ": line %d", line);
 
970
        cs_strcat(MS_RESULT_MESSAGE_SIZE, global_err_message,  line_no);
 
971
}
 
972
 
 
973
//------------------------------------------------
 
974
static void clear_global_error()
 
975
{
 
976
        global_errno = 0;
 
977
        *global_err_message = 0;
 
978
}
 
979
 
 
980
//------------------------------------------------
 
981
pbms_bool pbms_library_init()
 
982
{
 
983
        clear_global_error();
 
984
        init_count++;
 
985
        
 
986
        if (init_count == 1 ) {
 
987
                CLEAR_SELF();
 
988
 
 
989
                CURLcode curl_code = curl_global_init(CURL_GLOBAL_ALL);
 
990
                if (curl_code) {
 
991
                        report_global_error(curl_code, curl_easy_strerror(curl_code) , __LINE__);
 
992
                        init_count  = 0;
 
993
                        return 0;
 
994
                }
 
995
                
 
996
                if (! CSThread::startUp()) {
 
997
                        report_global_error(ENOMEM, "CSThread::startUp() failed.", __LINE__);
 
998
                        init_count  = 0;
 
999
                        return 0;
 
1000
                }
 
1001
                
 
1002
                cs_init_memory();
 
1003
                
 
1004
                mslib_global_thread = new CSThread( NULL);
 
1005
                CSThread::setSelf(mslib_global_thread);
 
1006
                
 
1007
                CSThread *self = mslib_global_thread;
 
1008
                if (!mslib_global_thread) {
 
1009
                        report_global_error(ENOMEM, "new CSThread( NULL) failed.", __LINE__);
 
1010
                        init_count  = 0;
 
1011
                         CSThread::shutDown();
 
1012
                        return 0;
 
1013
                }
 
1014
        
 
1015
                try_(a) {
 
1016
                        pbms_thread_list = new CSThreadList();
 
1017
                }
 
1018
                catch_(a);
 
1019
                report_global_mse_error(self);
 
1020
                init_count  = 0;
 
1021
                mslib_global_thread->release();
 
1022
                mslib_global_thread = NULL;
 
1023
                cs_exit_memory();
 
1024
                CSThread::shutDown();
 
1025
                        
 
1026
                cont_(a);
 
1027
        }
 
1028
        
 
1029
        return(init_count > 0);
 
1030
}
 
1031
 
 
1032
//------------------------------------------------
 
1033
void pbms_library_end()
 
1034
{
 
1035
         
 
1036
        if (init_count == 1 ) {
 
1037
                init_count  = 0; 
 
1038
                        
 
1039
 
 
1040
                if (pbms_thread_list) {
 
1041
                        PBMS_ConHandle *con;
 
1042
                        while ((con = (PBMS_ConHandle *) pbms_thread_list->getFront())) {
 
1043
                                con->ms_setSelf();
 
1044
                                CSThread::detach(con);
 
1045
                        }
 
1046
                        CSThread::setSelf(mslib_global_thread);
 
1047
                        pbms_thread_list->release();
 
1048
                } else
 
1049
                        CSThread::setSelf(mslib_global_thread);
 
1050
 
 
1051
                
 
1052
                mslib_global_thread->release();
 
1053
                mslib_global_thread = NULL;
 
1054
                cs_exit_memory();
 
1055
                CSThread::shutDown();
 
1056
                
 
1057
                curl_global_cleanup();
 
1058
        }
 
1059
        
 
1060
        if (init_count > 0)
 
1061
                init_count--;
 
1062
}
 
1063
 
 
1064
//------------------------------------------------
 
1065
int pbms_errno(PBMS myhndl)
 
1066
{
 
1067
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1068
 
 
1069
        if (con) {      
 
1070
                return con->ms_errno;
 
1071
        }
 
1072
                
 
1073
        return global_errno;
 
1074
}
 
1075
 
 
1076
//------------------------------------------------
 
1077
const char *pbms_error(PBMS myhndl)
 
1078
{
 
1079
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1080
 
 
1081
        if (con) {      
 
1082
                return con->ms_err_message;
 
1083
        }
 
1084
                
 
1085
        return global_err_message;
 
1086
}
 
1087
 
 
1088
//------------------------------------------------
 
1089
PBMS pbms_connect(const char* host, unsigned int port, const char *database)
 
1090
{
 
1091
        PBMS_ConHandle *con = NULL;
 
1092
        CLEAR_SELF(); 
 
1093
        CLOBBER_PROTECT(con);
 
1094
        
 
1095
        clear_global_error();
 
1096
 
 
1097
        new_(con, PBMS_ConHandle());
 
1098
        
 
1099
        if ((!con) || !CSThread::attach(con)) {
 
1100
                report_global_error(ENOMEM, "new PBMS_Ref() failed.", __LINE__);
 
1101
                if (con) {
 
1102
                        con->release();
 
1103
                        con = NULL;
 
1104
                }
 
1105
        } else {
 
1106
                CSThread *self = con;
 
1107
                
 
1108
                try_(a) {
 
1109
                                con->ms_initConnection(host, port, database);
 
1110
                                con->ms_ping();
 
1111
                        }
 
1112
                catch_(a);
 
1113
                        report_global_mse_error(con);
 
1114
                        CSThread::detach(con);
 
1115
                        con = NULL;
 
1116
                        
 
1117
                cont_(a);
 
1118
        }
 
1119
        
 
1120
        return(con);
 
1121
}
 
1122
 
 
1123
//------------------------------------------------
 
1124
void pbms_close(PBMS myhndl)
 
1125
{
 
1126
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1127
        
 
1128
        //      This will kill the 'self' thread so do not try and do any exception handling.
 
1129
        con->ms_setSelf();
 
1130
        CSThread::detach(con); // This will also release the connection.
 
1131
}
 
1132
 
 
1133
//------------------------------------------------
 
1134
pbms_bool pbms_is_blob_reference(PBMS myhndl, const char *ref)
 
1135
{
 
1136
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1137
        pbms_bool ok = false;
 
1138
        
 
1139
        con->ms_setSelf();      
 
1140
        enter_();
 
1141
        
 
1142
        try_(a) {
 
1143
                MSBlobURLRec blob;
 
1144
                if (PBMSBlobURLTools::couldBeURL(ref, &blob)) 
 
1145
                        ok = true;
 
1146
        }
 
1147
        
 
1148
        catch_(a);
 
1149
        cont_(a);
 
1150
        return_(ok);
 
1151
}
 
1152
 
 
1153
//------------------------------------------------
 
1154
pbms_bool pbms_get_blob_size(PBMS myhndl, const char *ref, size_t *size)
 
1155
{
 
1156
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1157
        bool ok = false;
 
1158
        
 
1159
        con->ms_setSelf();      
 
1160
        enter_();
 
1161
        
 
1162
        try_(a) {
 
1163
                MSBlobURLRec blob;
 
1164
                if (PBMSBlobURLTools::couldBeURL(ref, &blob)) {
 
1165
                        *size = blob.bu_blob_size;
 
1166
                        ok = true;
 
1167
                } 
 
1168
#ifdef HAVE_ALIAS_SUPPORT
 
1169
                else { // Assume it is a BLOB alias
 
1170
                        CSVector *saved_metadata;
 
1171
                        CSString *data = NULL;
 
1172
                        
 
1173
                        saved_metadata = con->ms_headers.takeHeaders();
 
1174
                        try_(b) {
 
1175
                                con->ms_get_info(ref, true);
 
1176
                                data = con->ms_get_metadata(MS_BLOB_SIZE);
 
1177
                                *size =         atol(data->getCString());       
 
1178
                                data->release();
 
1179
                                ok = true;
 
1180
                        }
 
1181
                        catch_(b);
 
1182
                        con->report_error(self);
 
1183
                        cont_(b);
 
1184
                        con->ms_headers.setHeaders(saved_metadata);
 
1185
                }
 
1186
#else
 
1187
                con->report_error(MS_ERR_INCORRECT_URL, "Invalid BLOB URL");
 
1188
#endif
 
1189
        }
 
1190
        
 
1191
        catch_(a);
 
1192
        cont_(a);
 
1193
        return_(ok);
 
1194
}
 
1195
/*
 
1196
 * pbms_add_metadata() and pbms_clear_metadata() deal with metadata for outgoing BLOBs only.
 
1197
 */
 
1198
//------------------------------------------------
 
1199
pbms_bool pbms_add_metadata(PBMS myhndl, const char *name, const char *value)
 
1200
{
 
1201
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1202
        bool ok = false;
 
1203
        
 
1204
        con->ms_setSelf();      
 
1205
        enter_();
 
1206
        
 
1207
        try_(a) {
 
1208
                if (strlen(name) > MS_META_NAME_SIZE)
 
1209
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Metadata name too long.");
 
1210
                if (strlen(value) > MS_META_VALUE_SIZE)
 
1211
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Metadata value too long.");
 
1212
                
 
1213
                con->ms_metadata_out.addHeader(name, value);
 
1214
 
 
1215
                ok = true;
 
1216
        }
 
1217
        
 
1218
        catch_(a) {
 
1219
                con->report_error(self);
 
1220
        }
 
1221
        
 
1222
        cont_(a);
 
1223
        
 
1224
        return_(ok);
 
1225
}
 
1226
 
 
1227
//------------------------------------------------
 
1228
void pbms_clear_metadata(PBMS myhndl, const char *name)
 
1229
{
 
1230
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1231
        
 
1232
        con->ms_setSelf();      
 
1233
        enter_();
 
1234
        
 
1235
        try_(a) {
 
1236
                if (name)
 
1237
                        con->ms_metadata_out.removeHeader(name);
 
1238
                else
 
1239
                        con->ms_metadata_out.clearHeaders();
 
1240
        }
 
1241
        
 
1242
        catch_(a) {
 
1243
                con->report_error(self);
 
1244
        }
 
1245
        
 
1246
        cont_(a);
 
1247
        
 
1248
}
 
1249
 
 
1250
/*
 
1251
 * pbms_reset_metadata(), pbms_next_metadata() and pbms_get_metadata_value() deal with metadata for the last
 
1252
 * BLOB received on the connection.
 
1253
 */
 
1254
//------------------------------------------------
 
1255
unsigned int pbms_reset_metadata(PBMS myhndl)
 
1256
{
 
1257
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1258
        unsigned int count = 0;
 
1259
        
 
1260
        con->ms_setSelf();      
 
1261
        enter_();
 
1262
        
 
1263
        try_(a) {
 
1264
                count = con->ms_init_fetch();
 
1265
        }
 
1266
        
 
1267
        catch_(a) {
 
1268
                con->report_error(self);
 
1269
        }
 
1270
        
 
1271
        cont_(a);
 
1272
        
 
1273
        return_(count);
 
1274
}
 
1275
 
 
1276
//------------------------------------------------
 
1277
pbms_bool pbms_next_metadata(PBMS myhndl, char *name, char *value, size_t *v_size)
 
1278
{
 
1279
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1280
        bool ok = false;
 
1281
        size_t null_size = MS_META_VALUE_SIZE;
 
1282
        
 
1283
        CLOBBER_PROTECT(v_size);
 
1284
 
 
1285
        con->ms_setSelf();      
 
1286
        enter_();
 
1287
        
 
1288
        if (!v_size)
 
1289
                v_size = &null_size;
 
1290
                
 
1291
        try_(a) {
 
1292
                const char *m_name, *m_value;
 
1293
                ok = con->ms_next(&m_name, &m_value);
 
1294
                if (ok) {
 
1295
                        cs_strcpy(MS_META_NAME_SIZE, name, m_name);
 
1296
                        cs_strcpy(*v_size, value, m_value);
 
1297
 
 
1298
                        if (*v_size <= strlen(m_value)) 
 
1299
                                *v_size = strlen(m_value) +1;
 
1300
 
 
1301
                }
 
1302
        }
 
1303
        
 
1304
        catch_(a) {
 
1305
                con->report_error(self);
 
1306
        }
 
1307
        
 
1308
        cont_(a);
 
1309
        
 
1310
        return_(ok);
 
1311
}
 
1312
 
 
1313
//------------------------------------------------
 
1314
pbms_bool pbms_get_metadata_value(PBMS myhndl, const char *name, char *buffer, size_t *size)
 
1315
{
 
1316
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1317
        bool ok = false;
 
1318
        
 
1319
        con->ms_setSelf();      
 
1320
        enter_();
 
1321
        
 
1322
        try_(a) {
 
1323
                CSString *data = NULL;
 
1324
                data = con->ms_get_metadata(name);
 
1325
                if (data) {
 
1326
                        ok = true;
 
1327
                        cs_strcpy(*size, buffer, data->getCString());
 
1328
                        if (data->length() >= *size) {
 
1329
                                *size = data->length() +1;                              
 
1330
                        }
 
1331
                        data->release();
 
1332
                }
 
1333
        }
 
1334
        
 
1335
        catch_(a) {
 
1336
                con->report_error(self);
 
1337
        }
 
1338
        
 
1339
        cont_(a);
 
1340
        
 
1341
        return_(ok);
 
1342
}
 
1343
 
 
1344
//------------------------------------------------
 
1345
pbms_bool pbms_get_md5_digest(PBMS myhndl, char *md5_digest)
 
1346
{
 
1347
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1348
        bool ok = false;
 
1349
        
 
1350
        con->ms_setSelf();      
 
1351
        enter_();
 
1352
        
 
1353
        try_(a) {               
 
1354
                CSString *data = NULL;
 
1355
                data = con->ms_get_checksum();
 
1356
                if (!data)
 
1357
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "No checksum found.");
 
1358
                push_(data);
 
1359
                if (cs_hex_to_bin(16, md5_digest, data->length(), data->getCString()) != 16) {
 
1360
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Invalid MD5 Digest.");
 
1361
                } 
 
1362
                release_(data);
 
1363
                ok = true;
 
1364
        }
 
1365
        
 
1366
        catch_(a) {
 
1367
                con->report_error(self);
 
1368
        }
 
1369
        
 
1370
        cont_(a);
 
1371
        
 
1372
        return_(ok);
 
1373
}
 
1374
 
 
1375
//------------------------------------------------
 
1376
//------------------------------------------------
 
1377
pbms_bool pbms_put_data(PBMS myhndl, const char *table, const char *checksum, char *ref, size_t size, const unsigned char *data)
 
1378
{
 
1379
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1380
        
 
1381
        return con->ms_upLoadData(table, NULL, checksum, ref, size, data);
 
1382
}
 
1383
//------------------------------------------------
 
1384
pbms_bool pbms_put_data_cb(PBMS myhndl, const char *table, const char *checksum, char *ref, size_t size, PBMS_READ_CALLBACK_FUNC cb, void *caller_data)
 
1385
{
 
1386
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1387
        
 
1388
        return con->ms_upLoadData(table, NULL, checksum, ref, size, NULL, cb, caller_data);
 
1389
}
 
1390
 
 
1391
//------------------------------------------------
 
1392
pbms_bool pbms_get_data(PBMS myhndl, const char *ref, unsigned char *buffer, size_t buffer_size)
 
1393
{
 
1394
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1395
        
 
1396
        return con->ms_downLoadData(ref, buffer, buffer_size);
 
1397
}
 
1398
 
 
1399
//------------------------------------------------
 
1400
pbms_bool pbms_get_data_range(PBMS myhndl, const char *ref, size_t start_offset, size_t end_offset, unsigned char *buffer, size_t buffer_size, size_t *data_size)
 
1401
{
 
1402
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1403
        
 
1404
        con->ms_range_start = start_offset;
 
1405
        con->ms_range_end = end_offset;
 
1406
        
 
1407
        if (con->ms_downLoadData(ref, buffer, buffer_size)) {
 
1408
                if (data_size)
 
1409
                        *data_size = con->ms_getBuffer - buffer;
 
1410
                return true;
 
1411
        }
 
1412
  
 
1413
        return false;
 
1414
}
 
1415
 
 
1416
//------------------------------------------------
 
1417
pbms_bool pbms_get_data_cb(PBMS myhndl, const char *ref, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
 
1418
{
 
1419
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1420
        
 
1421
        return con->ms_downLoadData(ref, NULL, 0, cb, caller_data);
 
1422
}
 
1423
 
 
1424
//------------------------------------------------
 
1425
pbms_bool pbms_get_data_range_cb(PBMS myhndl, const char *ref, size_t start_offset, size_t end_offset, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
 
1426
{
 
1427
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1428
        
 
1429
        con->ms_range_start = start_offset;
 
1430
        con->ms_range_end = end_offset;
 
1431
 
 
1432
        return con->ms_downLoadData(ref, NULL, 0, cb, caller_data);
 
1433
}
 
1434
 
 
1435
//------------------------------------------------
 
1436
pbms_bool pbms_get_info(PBMS myhndl, const char *ref)
 
1437
{
 
1438
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1439
        pbms_bool ok = true;
 
1440
        
 
1441
        con->ms_setSelf();      
 
1442
        enter_();
 
1443
        
 
1444
        try_(a) {       
 
1445
                con->ms_get_info(ref, false);                   
 
1446
        }
 
1447
        
 
1448
        catch_(a);
 
1449
        con->report_error(self);
 
1450
        ok = false;
 
1451
        
 
1452
        cont_(a);
 
1453
        
 
1454
        return_(ok);
 
1455
}
 
1456
 
 
1457
//------------------------------------------------
 
1458
pbms_bool pbms_set_option(PBMS myhndl, enum pbms_option option, const void *in_value)
 
1459
{
 
1460
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1461
        pbms_bool ok = true;
 
1462
        
 
1463
        con->ms_setSelf();      
 
1464
        enter_();
 
1465
        
 
1466
        try_(a) {
 
1467
 
 
1468
                switch (option) {
 
1469
                        case PBMS_OPTION_DATABASE: {
 
1470
                                CSString *database = CSString::newString((char *)in_value);
 
1471
                                con->ms_database->release();
 
1472
                                con->ms_database = database;
 
1473
                                break;
 
1474
                        }
 
1475
                                
 
1476
                        case PBMS_OPTION_TRANSMITION_TIMEOUT:
 
1477
                                con->ms_transmition_timeout = *((unsigned int*)in_value);
 
1478
                                break; 
 
1479
                        
 
1480
                        case PBMS_OPTION_HOST:
 
1481
                        case PBMS_OPTION_PORT:
 
1482
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Option is ReadOnly.");
 
1483
                                break;
 
1484
 
 
1485
                        default:
 
1486
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unknown Option.");
 
1487
                }
 
1488
        
 
1489
        }
 
1490
        
 
1491
        catch_(a);
 
1492
        con->report_error(self);
 
1493
        ok = false;
 
1494
        cont_(a);
 
1495
        return_(ok);
 
1496
}
 
1497
 
 
1498
//------------------------------------------------
 
1499
pbms_bool pbms_get_option(PBMS myhndl, enum pbms_option option, void *out_value)
 
1500
{
 
1501
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1502
        pbms_bool ok = true;
 
1503
        
 
1504
        con->ms_setSelf();      
 
1505
        enter_();
 
1506
        
 
1507
        try_(a) {
 
1508
 
 
1509
                switch (option) {
 
1510
                        case PBMS_OPTION_DATABASE:
 
1511
                                *((const char**)out_value) = con->ms_database->getCString();
 
1512
                                break;
 
1513
                        
 
1514
                        case PBMS_OPTION_TRANSMITION_TIMEOUT:
 
1515
                                *((unsigned int*)out_value) = con->ms_transmition_timeout;
 
1516
                                break; 
 
1517
                        
 
1518
                        case PBMS_OPTION_HOST:
 
1519
                                *((const char**)out_value) = con->ms_host->getCString();
 
1520
                                break;
 
1521
                                
 
1522
                        case PBMS_OPTION_PORT:
 
1523
                                *((unsigned int*)out_value) = con->ms_port;
 
1524
                                break;
 
1525
 
 
1526
                        default:
 
1527
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unknown Option.");
 
1528
                }
 
1529
        
 
1530
        }
 
1531
        
 
1532
        catch_(a);
 
1533
        con->report_error(self);
 
1534
        ok = false;
 
1535
        cont_(a);
 
1536
        return_(ok);
 
1537
}
 
1538
 
 
1539
 
 
1540