~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/pbmslib.cc

Added the PBMS daemon plugin.

(Augen zu und durch!)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 *  PrimeBase Media Stream (PBMS)
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * 2008-09-10   Barry Leslie
 
20
 *
 
21
 * H&G2JCtL
 
22
 */
 
23
#include "CSConfig.h"
 
24
#include <inttypes.h>
 
25
 
 
26
#include <curl/curl.h>
 
27
#include <string.h>
 
28
 
 
29
#ifdef DRIZZLED
 
30
#include <libdrizzle/drizzle_client.h>
 
31
#define MYSQL drizzle_con_st
 
32
#define MYSQL_RES drizzle_result_st
 
33
#define MYSQL_ROW DRIZZLE_ROW
 
34
 
 
35
#define mysql_query                     drizzle_query
 
36
#define mysql_store_result      drizzle_store_result
 
37
#define mysql_errno                     drizzle_errno
 
38
#define mysql_error                     drizzle_error
 
39
#define mysql_num_rows          drizzle_num_rows
 
40
#define mysql_fetch_row         drizzle_fetch_row
 
41
#define mysql_free_result       drizzle_free_result
 
42
 
 
43
#else
 
44
#include "mysql.h"
 
45
#endif
 
46
 
 
47
#include "pbmslib.h"
 
48
#include "pbms.h"
 
49
#include "CSGlobal.h"
 
50
#include "CSThread.h"
 
51
#include "CSString.h"
 
52
#include "CSStrUtil.h"
 
53
//#include "CSSocket.h"
 
54
#include "CSHTTPStream.h"
 
55
#include "CSMd5.h"
 
56
#include "CSS3Protocol.h"
 
57
#include "Util_ms.h"
 
58
#include "metadata_ms.h"
 
59
 
 
60
#define CLEAR_SELF()    CSThread::setSelf(NULL)
 
61
#define MAX_STMT_SIZE   1024
 
62
 
 
63
static int global_errno;
 
64
static char global_err_message[MS_RESULT_MESSAGE_SIZE];
 
65
 
 
66
static unsigned long init_count = 0;
 
67
static CSThreadList     *pbms_thread_list = NULL;
 
68
static  CSThread *mslib_global_thread = NULL;
 
69
 
 
70
static void report_global_mse_error(CSThread *thd)
 
71
{
 
72
        global_errno = thd->myException.getErrorCode();
 
73
        cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message,  thd->myException.getMessage());
 
74
}
 
75
 
 
76
#define DFLT_CONNECTION_TIMEOUT 10      // Changing this required a documentation update.
 
77
 
 
78
#define THROW_CURL_IF(v) { if (v) CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);}
 
79
//======================================
 
80
static size_t receive_data(void *ptr, size_t size, size_t nmemb, void *stream);
 
81
static size_t receive_header(void *ptr, size_t size, size_t nmemb, void *stream);
 
82
static size_t send_callback(void *ptr, size_t size, size_t nmemb, void *stream);
 
83
//--------------------------------------------
 
84
class  PBMS_ConHandle:public CSThread {
 
85
        public:
 
86
        
 
87
        // Fields that must be freed when the object is destroyed.
 
88
        CSString                        *ms_host;
 
89
        CSString                        *ms_database;
 
90
        struct curl_slist       *ms_header_list;        // A curl list of headers to be sent with the next PUT.
 
91
        struct curl_slist       *ms_info_only;  // A curl list of headers to be sent with Get.
 
92
        struct curl_slist       *ms_ping_header;        // A curl list of headers to be sent with ping.
 
93
        CSStringBuffer          *ms_url_str;
 
94
        CURL                            *ms_curl;       
 
95
        CSStringBuffer          *ms_buffer;
 
96
        CSStringBuffer          *ms_errorReply;
 
97
        CSS3Protocol            *ms_cloud;
 
98
        uint64_t                        ms_range_start;
 
99
        uint64_t                        ms_range_end;
 
100
 
 
101
        PBMS_ConHandle():
 
102
                CSThread(pbms_thread_list),
 
103
                ms_host(NULL),
 
104
                ms_database(NULL),
 
105
                ms_port(0),
 
106
                ms_transmition_timeout(0),
 
107
                ms_curl(NULL),
 
108
                ms_header_list(NULL),
 
109
                ms_info_only(NULL),
 
110
                ms_ping_header(NULL),
 
111
                ms_buffer(NULL),
 
112
                ms_errorReply(NULL),
 
113
                ms_url_str(NULL),
 
114
                ms_cloud(NULL),
 
115
                ms_errno(0),
 
116
                ms_throw_error(false),
 
117
                ms_range_start(1),
 
118
                ms_range_end(0)
 
119
        {
 
120
                }
 
121
                
 
122
        ~PBMS_ConHandle() {
 
123
                if (ms_curl) 
 
124
                        curl_easy_cleanup(ms_curl);
 
125
                        
 
126
                if (ms_cloud) 
 
127
                        ms_cloud->release();
 
128
                        
 
129
                if (ms_database) 
 
130
                        ms_database->release();
 
131
                        
 
132
                if (ms_host) 
 
133
                        ms_host->release();
 
134
                        
 
135
                if (ms_buffer) 
 
136
                        ms_buffer->release();
 
137
                        
 
138
                if (ms_errorReply) 
 
139
                        ms_errorReply->release();
 
140
                        
 
141
                if (ms_url_str) 
 
142
                        ms_url_str->release();
 
143
                        
 
144
                if (ms_header_list) 
 
145
                        curl_slist_free_all(ms_header_list);
 
146
                        
 
147
                if (ms_info_only) 
 
148
                        curl_slist_free_all(ms_info_only);
 
149
                        
 
150
                if(ms_ping_header)
 
151
                        curl_slist_free_all(ms_ping_header);
 
152
                        
 
153
                ms_headers.clearHeaders();
 
154
                ms_metadata_out.clearHeaders();
 
155
                        
 
156
        }
 
157
        unsigned int            ms_replyStatus;
 
158
        CSHTTPHeaders           ms_headers; 
 
159
        CSHTTPHeaders           ms_metadata_out;
 
160
        u_int                           ms_next_header;
 
161
        u_int                           ms_max_header;  
 
162
        unsigned int            ms_port;
 
163
        unsigned int            ms_transmition_timeout; // In the future this may have some effect but for now it is always be 0 (no timeout).
 
164
        unsigned int            ms_url_base_len;
 
165
        bool                            ms_throw_error; // Gets set if an exception occurs in a callback.
 
166
 
 
167
        int                                     ms_errno;
 
168
        char                            ms_err_message[MS_RESULT_MESSAGE_SIZE];
 
169
        
 
170
        
 
171
        char ms_curl_error[CURL_ERROR_SIZE];
 
172
        
 
173
        size_t ms_DataToBeTransfered;
 
174
        // Get data caller parameters:
 
175
        u_char                          *ms_getBuffer;
 
176
        size_t                          ms_getBufferSize;
 
177
        PBMS_WRITE_CALLBACK_FUNC        ms_writeCB;
 
178
        void                            *ms_getCBData;
 
179
        
 
180
        void set_downLoadUserData(u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb = NULL, void *caller_data = NULL)     
 
181
        {
 
182
                ms_DataToBeTransfered = buffer_size;
 
183
                ms_getBuffer = buffer;
 
184
                ms_getBufferSize = buffer_size;
 
185
                ms_writeCB = cb;
 
186
                ms_getCBData = caller_data;             
 
187
        }
 
188
        
 
189
        // Put data caller parameters:
 
190
        const u_char            *ms_putData;
 
191
        size_t                          ms_putDataLen;
 
192
        size_t                          ms_putDataOffset;
 
193
        PBMS_READ_CALLBACK_FUNC ms_readCB;
 
194
        void                            *ms_putCBData;
 
195
        
 
196
        void set_upLoadUserData(const u_char *buffer, size_t size, PBMS_READ_CALLBACK_FUNC cb = NULL, void *caller_data = NULL) 
 
197
        {
 
198
                ms_DataToBeTransfered = size;
 
199
                ms_putData = buffer;
 
200
                ms_putDataLen = size;
 
201
                ms_putDataOffset =0;
 
202
                ms_readCB = cb;
 
203
                ms_putCBData = caller_data;
 
204
        }
 
205
        
 
206
        
 
207
        void ms_initConnection(const char* host, unsigned int port, const char* database)
 
208
        {
 
209
                ms_curl = curl_easy_init();
 
210
                if (!ms_curl)
 
211
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_init() failed.");
 
212
                
 
213
                if (curl_easy_setopt(ms_curl, CURLOPT_ERRORBUFFER, ms_curl_error))
 
214
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_setopt(CURLOPT_ERRORBUFFER) failed.");
 
215
                
 
216
                // Uncomment this line to trace network action during request. Very Usefull!!
 
217
                //curl_easy_setopt(ms_curl, CURLOPT_VERBOSE, 1L);
 
218
                
 
219
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_TCP_NODELAY, 1L));
 
220
 
 
221
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOPROGRESS, 1L));
 
222
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEFUNCTION, receive_data));
 
223
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READFUNCTION, send_callback));  
 
224
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HEADERFUNCTION, receive_header));
 
225
                
 
226
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEDATA, this));
 
227
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READDATA, this));
 
228
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEHEADER, this));
 
229
 
 
230
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FOLLOWLOCATION, 1L)); // Follow redirects.
 
231
 
 
232
                ms_port = port;
 
233
                pbms_api_owner = true;
 
234
                ms_host = CSString::newString(host);
 
235
                ms_database = CSString::newString(database);
 
236
                
 
237
                ms_url_str = new CSStringBuffer(50);    
 
238
                ms_url_str->append("http://");
 
239
                ms_url_str->append(host);
 
240
                ms_url_str->append(":");
 
241
                ms_url_str->append(port);
 
242
                ms_url_str->append("/");
 
243
                ms_url_base_len =       ms_url_str->length();
 
244
                        
 
245
                ms_buffer = new CSStringBuffer(50);     
 
246
 
 
247
                ms_info_only = curl_slist_append(ms_info_only, MS_BLOB_INFO_REQUEST ": yes");
 
248
                if (!ms_info_only) 
 
249
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
250
                        
 
251
                ms_buffer->append(MS_PING_REQUEST": ");
 
252
                        ms_buffer->append(database);
 
253
                ms_ping_header = curl_slist_append(ms_ping_header, ms_buffer->getCString());
 
254
                if (!ms_ping_header) 
 
255
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
256
 
 
257
                ms_buffer->setLength(0);
 
258
 
 
259
        }
 
260
        
 
261
        void ms_ping();
 
262
        
 
263
        void ms_setSelf()
 
264
        {
 
265
                setSelf(this);
 
266
        }
 
267
 
 
268
        void ms_init_put_blob(curl_off_t size, const char *table, const char *alias, const char *checksum, bool use_cloud);
 
269
        
 
270
        void ms_init_get_blob(const char *ref, bool is_alias, bool info_only);
 
271
        
 
272
        void ms_get_info(const char *ref, bool is_alias);
 
273
        void ms_sendCloudBLOB(size_t size)      ;
 
274
        
 
275
        pbms_bool ms_downLoadData(const char *ref, u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb = NULL, void *caller_data = NULL);
 
276
 
 
277
        pbms_bool ms_upLoadData(const char *table, const char *alias, const char *checksum, char *ref, size_t size, const u_char *data, PBMS_READ_CALLBACK_FUNC cb = NULL, void *caller_data = NULL);
 
278
        
 
279
        u_int ms_init_fetch() {ms_next_header =0; return ms_max_header = ms_headers.numHeaders();}
 
280
        
 
281
        bool ms_next(const char **name, const char **value) 
 
282
        {
 
283
                if (ms_next_header >= ms_max_header)
 
284
                        return false;
 
285
                        
 
286
                CSHeader *header = ms_headers.getHeader(ms_next_header++);
 
287
                *name = header->getNameCString();
 
288
                *value = header->getValueCString();
 
289
                header->release();
 
290
                return true;
 
291
        }
 
292
        
 
293
        void dump_headers() 
 
294
        {
 
295
                u_int i = 0;
 
296
                CSHeader *header;
 
297
                printf("Headers:\n");
 
298
                printf("---------------------------------------\n");
 
299
                while  ( (header = ms_headers.getHeader(i++)) ) {
 
300
                        printf("%s : %s\n", header->getNameCString(), header->getValueCString());
 
301
                        header->release();                      
 
302
                }
 
303
                printf("---------------------------------------\n");
 
304
        }
 
305
                                        
 
306
        void ms_addS3HeadersHeaders(CSVector *s3Headers);
 
307
 
 
308
        CSString        *ms_get_metadata(const char *name) 
 
309
        { 
 
310
                return ms_headers.getHeaderValue(name);
 
311
        }
 
312
                                        
 
313
        CSString        *ms_get_alias() {return ms_get_metadata(MS_ALIAS_TAG);}
 
314
 
 
315
        CSString        *ms_get_checksum() {return ms_get_metadata(MS_CHECKSUM_TAG);}
 
316
 
 
317
        void report_error(CSThread *self)
 
318
        {
 
319
                ms_errno = self->myException.getErrorCode();
 
320
                cs_strcpy(MS_RESULT_MESSAGE_SIZE, ms_err_message,  self->myException.getMessage());
 
321
        }
 
322
 
 
323
        void report_error(int err, const char *msg)
 
324
        {
 
325
                ms_errno = err;
 
326
                cs_strcpy(MS_RESULT_MESSAGE_SIZE, ms_err_message,  msg);
 
327
        }
 
328
 
 
329
        void throw_http_reply_exception();
 
330
        
 
331
        void check_reply_status() 
 
332
        {
 
333
                switch (ms_replyStatus) {
 
334
                        case 200:
 
335
                        //case 301: // Moved Permanently
 
336
                        //case 307: // Temporary Redirect
 
337
                                break;
 
338
                        default:
 
339
                                throw_http_reply_exception();
 
340
                }
 
341
                
 
342
        }
 
343
        
 
344
        void ms_getCloudHeaders() 
 
345
        {
 
346
                CSString *value = NULL;
 
347
                
 
348
                enter_();
 
349
                
 
350
                // Get the S3 server
 
351
                value = ms_headers.getHeaderValue(MS_CLOUD_SERVER);
 
352
                if (!value) {
 
353
                        if (ms_cloud)
 
354
                                ms_cloud->release();
 
355
                        ms_cloud = NULL;
 
356
                        exit_();
 
357
                }
 
358
                
 
359
                push_(value);
 
360
                if (!ms_cloud)
 
361
                        new_(ms_cloud, CSS3Protocol());
 
362
                        
 
363
                // Remove the cloud headers so they are not visable to the caller.
 
364
                ms_headers.removeHeader(MS_CLOUD_SERVER); 
 
365
                
 
366
                ms_cloud->s3_setServer(value->getCString());
 
367
                release_(value);                
 
368
                        
 
369
                // Get the S3 public key
 
370
                value = ms_headers.getHeaderValue(MS_CLOUD_KEY);
 
371
                if (!value)
 
372
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 public key in reply.");
 
373
                
 
374
                push_(value);
 
375
                ms_headers.removeHeader(MS_CLOUD_KEY);
 
376
                ms_cloud->s3_setPublicKey(value->getCString());
 
377
                release_(value);                
 
378
 
 
379
                exit_();                                
 
380
        }
 
381
 
 
382
};
 
383
 
 
384
// CURL callback functions:
 
385
////////////////////////////
 
386
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
 
387
{
 
388
        PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
 
389
        size_t data_len = objs * obj_size;
 
390
        char *ptr = (char*)vptr;
 
391
 
 
392
        if (con->ms_replyStatus >= 400) { // Collect the error reply.
 
393
                enter_();
 
394
                try_(a) {
 
395
                        if (!con->ms_errorReply)
 
396
                                con->ms_errorReply = new CSStringBuffer(50);            
 
397
                        con->ms_errorReply->append(ptr, data_len);
 
398
                }
 
399
                catch_(a);
 
400
                con->ms_throw_error = true;
 
401
                data_len = -1;
 
402
                
 
403
                cont_(a);
 
404
                return_(data_len);
 
405
        }
 
406
        
 
407
        if (data_len > con->ms_DataToBeTransfered) { // This should never happen.
 
408
                CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob data overflow.");
 
409
                con->ms_throw_error = true;
 
410
                return -1;
 
411
        }
 
412
        
 
413
        con->ms_DataToBeTransfered -= data_len;
 
414
        if (con->ms_writeCB) 
 
415
                return (con->ms_writeCB)(con->ms_getCBData, ptr, data_len, false); 
 
416
                                
 
417
        memcpy(con->ms_getBuffer, ptr, data_len);
 
418
        con->ms_getBuffer += data_len;
 
419
        
 
420
        return data_len;
 
421
}
 
422
 
 
423
#define IS_REDIRECT(s) ((s >= 300) && (s < 400))
 
424
//----------------------
 
425
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
 
426
{
 
427
        PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
 
428
        size_t size = objs * obj_size;
 
429
        char *end, *ptr = (char*) header, *value;
 
430
        const char *name;
 
431
        u_int name_len, value_len;
 
432
        
 
433
        end = ptr + size;
 
434
        if (*(end -2) == '\r' && *(end -1) == '\n')
 
435
                end -=2;
 
436
                
 
437
        while ((end != ptr) && (*ptr == ' ')) ptr++;
 
438
        if (end == ptr)
 
439
                return size;
 
440
        
 
441
        // Get the reply status.        
 
442
        if (((!con->ms_replyStatus) || (con->ms_replyStatus == 100) || IS_REDIRECT(con->ms_replyStatus) ) && !strncasecmp(ptr, "HTTP", 4)) {
 
443
                char status[4];
 
444
                while ((end != ptr) && (*ptr != ' ')) ptr++; // skip HTTP stuff
 
445
                while ((end != ptr) && (*ptr == ' ')) ptr++; // find the start of eh status code.
 
446
                if (end == ptr)
 
447
                        return size;
 
448
                        
 
449
                if (end < (ptr +3)) // expecting a 3 digit status code.
 
450
                        return size;
 
451
                        
 
452
                memcpy(status, ptr, 3);
 
453
                status[3] = 0;
 
454
                
 
455
                con->ms_replyStatus = atoi(status);
 
456
        }
 
457
        
 
458
        name = ptr;
 
459
        while ((end != ptr) && (*ptr != ':')) ptr++;
 
460
        if (end == ptr)
 
461
                return size;
 
462
        name_len = ptr - name;
 
463
        
 
464
        ptr++; 
 
465
        while ((end != ptr) && (*ptr == ' ')) ptr++;
 
466
        if (end == ptr)
 
467
                return size;
 
468
        
 
469
        value = ptr;
 
470
        value_len = end - value;
 
471
        
 
472
        while (name[name_len-1] == ' ') name_len--;
 
473
        while (value[value_len-1] == ' ') value_len--;
 
474
        
 
475
        if (!strncasecmp(name, "Content-Length", 14)) {
 
476
                size_t length;
 
477
                char len[32];
 
478
                memcpy(len, value, (value_len > 31)?31:value_len);
 
479
                len[(value_len > 31)?31:value_len] = 0;
 
480
                
 
481
                length = atoll(len);
 
482
                // If there is no callback then the data size is limited
 
483
                // to the GetBuffer size.
 
484
                if (con->ms_writeCB || (length < con->ms_getBufferSize))
 
485
                        con->ms_DataToBeTransfered = length;
 
486
                        
 
487
        }
 
488
        
 
489
 
 
490
        enter_();
 
491
        try_(a) {
 
492
                if (!strncasecmp(name, "ETag", 4)) { // S3 checksum
 
493
                        name = MS_CHECKSUM_TAG;
 
494
                        name_len = strlen(MS_CHECKSUM_TAG);
 
495
                        // Strip any quotes
 
496
                        if (*value == '"') {
 
497
                                value++;value_len--;
 
498
                        }
 
499
                        if (value[value_len-1] == '"') {
 
500
                                value_len--;
 
501
                        }
 
502
                        con->ms_headers.removeHeader(MS_CHECKSUM_TAG);
 
503
                }
 
504
                con->ms_headers.addHeader(name, name_len, value, value_len);
 
505
        }
 
506
        
 
507
        catch_(a);
 
508
        con->ms_throw_error = true;
 
509
        return_(-1);
 
510
                
 
511
        cont_(a);
 
512
        return_(size);
 
513
}
 
514
 
 
515
//----------------------
 
516
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
 
517
{
 
518
        PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
 
519
        char *buffer = (char*) ptr;
 
520
        size_t data_sent = 0, buffer_size = objs * obj_size;
 
521
 
 
522
        if (con->ms_putDataLen == 0)
 
523
                return 0;
 
524
                
 
525
        if (con->ms_readCB) 
 
526
                data_sent = (con->ms_readCB)(con->ms_putCBData, buffer , buffer_size, false);           
 
527
        else {
 
528
                data_sent = (buffer_size < con->ms_putDataLen)? buffer_size: con->ms_putDataLen;
 
529
                memcpy(buffer,con->ms_putData, data_sent);
 
530
                con->ms_putData += data_sent;
 
531
        }
 
532
        con->ms_putDataLen -= data_sent;
 
533
        
 
534
        return data_sent;
 
535
}
 
536
 
 
537
#define CONTENT_TYPE "Content-Type"
 
538
//------------------------------------------------
 
539
void PBMS_ConHandle::ms_init_put_blob(curl_off_t size, const char *table, const char *alias, const char *checksum, bool use_cloud)
 
540
{
 
541
        char buffer[MS_META_VALUE_SIZE + MS_META_NAME_SIZE +2];
 
542
        int buffer_size = MS_META_VALUE_SIZE + MS_META_NAME_SIZE +2;
 
543
        bool have_content_type = false;
 
544
        
 
545
        ms_url_str->setLength(ms_url_base_len);
 
546
 
 
547
        ms_url_str->append(ms_database->getCString());
 
548
        if (table) {
 
549
                ms_url_str->append("/");
 
550
                ms_url_str->append(table);
 
551
        }
 
552
        
 
553
        // Remove any old headers
 
554
        if (ms_header_list) {
 
555
                curl_slist_free_all(ms_header_list);
 
556
                ms_header_list = NULL;
 
557
        }
 
558
        
 
559
        // Add metadata headers.
 
560
        u_int i = 0;
 
561
        CSHeader *header;
 
562
        while  ( (header = ms_metadata_out.getHeader(i++)) ) {
 
563
                cs_strcpy(buffer_size, buffer, header->getNameCString());
 
564
                if (!strcasecmp(  buffer, CONTENT_TYPE))
 
565
                        have_content_type = true;
 
566
                cs_strcat(buffer_size, buffer, ':');
 
567
                cs_strcat(buffer_size, buffer, header->getValueCString());
 
568
                header->release();                      
 
569
                ms_header_list = curl_slist_append(ms_header_list, buffer);
 
570
                if (!ms_header_list) 
 
571
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
572
        }
 
573
        
 
574
        if (!have_content_type) {
 
575
                // Prevent CURLOPT_POST from adding a content type. 
 
576
                ms_header_list = curl_slist_append(ms_header_list, CONTENT_TYPE ":");
 
577
                if (!ms_header_list) 
 
578
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
579
        }
 
580
                
 
581
        if (checksum) { 
 
582
                cs_strcpy(buffer_size, buffer, MS_CHECKSUM_TAG ":");
 
583
                cs_strcat(buffer_size, buffer, checksum);
 
584
                
 
585
                ms_header_list = curl_slist_append(ms_header_list, buffer);
 
586
                if (!ms_header_list) 
 
587
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
588
        }
 
589
                
 
590
        
 
591
#ifdef HAVE_ALIAS_SUPPORT
 
592
        if (alias) {
 
593
                if (strlen(alias) > MS_META_VALUE_SIZE) 
 
594
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "BLOB alias name too long.");
 
595
 
 
596
                cs_strcpy(buffer_size, buffer, MS_ALIAS_TAG ":");
 
597
                cs_strcat(buffer_size, buffer, alias);
 
598
                
 
599
                ms_header_list = curl_slist_append(ms_header_list, buffer);
 
600
                if (!ms_header_list) 
 
601
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
602
        }
 
603
#endif
 
604
        
 
605
        if (use_cloud) {
 
606
                cs_strcpy(buffer_size, buffer, MS_BLOB_SIZE ":");
 
607
                snprintf(buffer + strlen(buffer), buffer_size - strlen(buffer), "%"PRIu64"", size);
 
608
                ms_header_list = curl_slist_append(ms_header_list, buffer);
 
609
                if (!ms_header_list) 
 
610
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
611
                        
 
612
                size = 0; // The BLOB data is not being sent to the PBMS server
 
613
        }
 
614
        
 
615
        // Using CURLOPT_UPLOAD is VERY slow!
 
616
        //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, size));
 
617
        //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
 
618
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POSTFIELDSIZE_LARGE, size));
 
619
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 1L));
 
620
 
 
621
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
 
622
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString() ));       
 
623
                
 
624
        ms_replyStatus = 0;
 
625
        ms_headers.clearHeaders();
 
626
        if (ms_errorReply)
 
627
                ms_errorReply->setLength(0);
 
628
}
 
629
        
 
630
 
 
631
//------------------------------------------------
 
632
void PBMS_ConHandle::ms_init_get_blob(const char *ref, bool is_alias, bool info_only)
 
633
{
 
634
        MSBlobURLRec blob;
 
635
        
 
636
        ms_url_str->setLength(ms_url_base_len);
 
637
        
 
638
#ifdef HAVE_ALIAS_SUPPORT
 
639
        if (is_alias || !ms_parse_blob_url(&blob, ref)) {
 
640
                ms_url_str->append(ms_database->getCString());
 
641
                ms_url_str->append("/");
 
642
        }
 
643
#endif
 
644
        ms_url_str->append((char *) ref);
 
645
        
 
646
        //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 0L));
 
647
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 0L));
 
648
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, (info_only)?ms_info_only: NULL));
 
649
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString()));
 
650
        
 
651
        // NOTE: range 0-0 is valid, it returns the first byte.
 
652
        if (ms_range_start <= ms_range_end) {
 
653
                char range[80];
 
654
                snprintf(range, 80, "%"PRIu64"-%"PRIu64"", ms_range_start, ms_range_end);
 
655
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_RANGE, range));
 
656
                ms_range_start = 1;
 
657
                ms_range_end = 0;
 
658
        } else {
 
659
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_RANGE, NULL));
 
660
        }
 
661
        
 
662
        ms_replyStatus = 0;
 
663
        ms_headers.clearHeaders();
 
664
        if (ms_errorReply)
 
665
                ms_errorReply->setLength(0);
 
666
}
 
667
        
 
668
//------------------------------------------------
 
669
void PBMS_ConHandle::ms_get_info(const char *ref, bool is_alias)
 
670
{
 
671
        enter_();
 
672
        ms_init_get_blob(ref, is_alias, true);
 
673
        
 
674
        if (curl_easy_perform(ms_curl) && !ms_throw_error)
 
675
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
676
        
 
677
        if (ms_throw_error)
 
678
                throw_();
 
679
                
 
680
        check_reply_status();
 
681
        
 
682
        exit_();
 
683
}
 
684
 
 
685
//------------------------------------------------
 
686
void PBMS_ConHandle::ms_ping()
 
687
{
 
688
        enter_();
 
689
        ms_url_str->setLength(ms_url_base_len);
 
690
        
 
691
//      THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 0L));
 
692
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 0L));
 
693
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_ping_header));
 
694
        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString()));
 
695
        
 
696
        ms_replyStatus = 0;
 
697
        ms_headers.clearHeaders();
 
698
        if (ms_errorReply)
 
699
                ms_errorReply->setLength(0);
 
700
 
 
701
        if (curl_easy_perform(ms_curl) && !ms_throw_error)
 
702
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
703
        
 
704
        if (ms_throw_error)
 
705
                throw_();
 
706
                
 
707
        check_reply_status();
 
708
        
 
709
        // Check to see if the BLOBs in the database are stored in a cloud: 
 
710
        CSString *cloud_server;
 
711
        cloud_server = ms_headers.getHeaderValue(MS_CLOUD_SERVER);
 
712
        if (!cloud_server) {
 
713
                if (ms_cloud)
 
714
                        ms_cloud->release();
 
715
                ms_cloud = NULL;
 
716
        } else {
 
717
                cloud_server->release();
 
718
                if (!ms_cloud)
 
719
                        new_(ms_cloud, CSS3Protocol());
 
720
        }
 
721
                
 
722
        exit_();
 
723
}
 
724
 
 
725
//------------------------------------------------
 
726
void PBMS_ConHandle::throw_http_reply_exception()
 
727
{
 
728
        CSString *reply = NULL, *error_text = NULL;
 
729
        
 
730
        enter_();
 
731
        
 
732
        try_(a) {
 
733
                size_t size = 0;
 
734
                 //dump_headers();
 
735
                 
 
736
                if (ms_errorReply)
 
737
                        size = ms_errorReply->length();
 
738
                
 
739
                if (!size) {
 
740
                        error_text = CSString::newString("Missing HTTP reply: possible Media Stream engine connection failure.");
 
741
                } else {
 
742
                        u_int start, end;
 
743
                
 
744
                        reply = CSString::newString(ms_errorReply);
 
745
                        ms_errorReply = NULL;
 
746
                        
 
747
                        start = reply->locate(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG, 1);
 
748
                        start += strlen(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG);
 
749
                        end = reply->locate(EXCEPTION_REPLY_MESSAGE_SUFFIX_TAG, 1); 
 
750
                        if (start < end)
 
751
                                error_text = reply->substr(start, end - start);
 
752
                        else {
 
753
                                error_text = reply;
 
754
                                reply->retain();
 
755
                        }
 
756
                }
 
757
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, error_text->getCString());
 
758
        }
 
759
        
 
760
        finally_(a) {
 
761
                if (reply) reply->release();
 
762
                if (error_text) error_text->release();
 
763
        }       
 
764
        cont_(a);
 
765
        
 
766
        exit_();
 
767
}
 
768
 
 
769
//------------------------------------------------
 
770
pbms_bool PBMS_ConHandle::ms_downLoadData(const char *ref, u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
 
771
{
 
772
        pbms_bool ok = true;
 
773
        ms_setSelf();   
 
774
 
 
775
        enter_();
 
776
        
 
777
        try_(a) {       
 
778
                set_downLoadUserData(buffer, buffer_size, cb, caller_data);
 
779
                ms_init_get_blob(ref, false, false);
 
780
                if (curl_easy_perform(ms_curl) && !ms_throw_error)
 
781
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
782
                
 
783
                if (ms_throw_error)
 
784
                        throw_();
 
785
                
 
786
                check_reply_status();
 
787
                        
 
788
        }
 
789
        
 
790
        catch_(a);
 
791
        report_error(self);
 
792
        ok = false;
 
793
        
 
794
        cont_(a);
 
795
        
 
796
        return_(ok);
 
797
}
 
798
 
 
799
//------------------------------------------------
 
800
void PBMS_ConHandle::ms_sendCloudBLOB(size_t size)
 
801
{
 
802
        CSInputStream *input;
 
803
        CSString *bucket, *object_key, *content_type, *signature, *signature_time;
 
804
        CSVector *s3Headers;
 
805
        
 
806
        enter_();
 
807
        
 
808
        // Get the S3 bucket
 
809
        bucket = ms_headers.getHeaderValue(MS_CLOUD_BUCKET);
 
810
        if (!bucket)
 
811
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 bucket header in reply.");
 
812
        push_(bucket);
 
813
        ms_headers.removeHeader(MS_CLOUD_BUCKET);
 
814
 
 
815
        // Get the S3 object key
 
816
        object_key = ms_headers.getHeaderValue(MS_CLOUD_OBJECT_KEY);
 
817
        if (!object_key)
 
818
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 object key header in reply.");
 
819
        push_(object_key);
 
820
        ms_headers.removeHeader(MS_CLOUD_OBJECT_KEY);
 
821
 
 
822
        // Get the S3 blob signature
 
823
        signature = ms_headers.getHeaderValue(MS_BLOB_SIGNATURE);
 
824
        if (!signature)
 
825
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 blob signature in reply.");
 
826
        
 
827
        push_(signature);
 
828
        ms_headers.removeHeader(MS_BLOB_SIGNATURE);
 
829
 
 
830
        // Get the S3 blob signature date
 
831
        signature_time = ms_headers.getHeaderValue(MS_BLOB_DATE);
 
832
        if (!signature_time)
 
833
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 blob signature date in reply.");
 
834
        push_(signature_time);
 
835
        ms_headers.removeHeader(MS_BLOB_DATE);
 
836
 
 
837
        content_type = ms_headers.getHeaderValue(CONTENT_TYPE);
 
838
        if (content_type)
 
839
                push_(content_type);
 
840
 
 
841
        if (ms_readCB)
 
842
                input = CSCallbackInputStream::newStream(ms_readCB, ms_putCBData);
 
843
        else
 
844
                input = CSMemoryInputStream::newStream(ms_putData, ms_putDataLen);
 
845
        
 
846
        // Send the BLOB to the cloud storage.
 
847
        s3Headers = ms_cloud->s3_send(input, bucket->getCString(), object_key->getCString(), size, (content_type)?content_type->getCString():NULL, NULL, signature->getCString(), atol(signature_time->getCString()));
 
848
 
 
849
        ms_addS3HeadersHeaders(s3Headers);
 
850
        
 
851
        if (content_type)
 
852
                release_(content_type);
 
853
                
 
854
        release_(signature_time);
 
855
        release_(signature);
 
856
        release_(object_key);
 
857
        release_(bucket);
 
858
        
 
859
        exit_();
 
860
}
 
861
 
 
862
//------------------------------------------------
 
863
void PBMS_ConHandle::ms_addS3HeadersHeaders(CSVector *s3Headers)
 
864
{
 
865
        CSHTTPHeaders headers;
 
866
        enter_();
 
867
        
 
868
        try_(a) {
 
869
                headers.setHeaders(s3Headers);
 
870
                
 
871
                for (u_int i = 0; i < headers.numHeaders(); i++) {
 
872
                        CSHeader *h = headers.getHeader(i);
 
873
                        const char *name = h->getNameCString();
 
874
                        
 
875
                        if (strcasecmp(name, "ETag") == 0){
 
876
                                const char *value = h->getValueCString();
 
877
                                u_int value_len = strlen(value);
 
878
                                
 
879
                                // Strip any quotes
 
880
                                if (*value == '"') {
 
881
                                        value++;value_len--;
 
882
                                }
 
883
                                if (value[value_len-1] == '"') {
 
884
                                        value_len--;
 
885
                                }
 
886
                                
 
887
                                ms_headers.removeHeader(MS_CHECKSUM_TAG);
 
888
 
 
889
                                ms_headers.addHeader(MS_CHECKSUM_TAG, CSString::newString(value, value_len));
 
890
                                h->release();
 
891
                        } else {
 
892
                                ms_headers.removeHeader(name);
 
893
                                ms_headers.addHeader(h);
 
894
                        }
 
895
                }
 
896
        }
 
897
        
 
898
        catch_(a);      
 
899
        headers.clearHeaders();
 
900
        throw_();
 
901
        
 
902
        cont_(a);       
 
903
        headers.clearHeaders();
 
904
        
 
905
        exit_();
 
906
}
 
907
//------------------------------------------------
 
908
pbms_bool PBMS_ConHandle::ms_upLoadData(const char *table, const char *alias, const char *checksum, char *ref, size_t size, const u_char *data, PBMS_READ_CALLBACK_FUNC cb, void *caller_data)
 
909
{
 
910
        pbms_bool ok = true, use_cloud = (ms_cloud != NULL);
 
911
        
 
912
        ms_setSelf();   
 
913
 
 
914
        enter_();
 
915
        
 
916
        try_(a) {
 
917
 
 
918
resend:
 
919
                ms_init_put_blob(size, table, alias, checksum, use_cloud);
 
920
                set_upLoadUserData(data, size, cb, caller_data);
 
921
                set_downLoadUserData((u_char*) ref, PBMS_BLOB_URL_SIZE -1); // Set things up to receive the BLOB ref back.
 
922
 
 
923
                if (curl_easy_perform(ms_curl) && !ms_throw_error)
 
924
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
925
                
 
926
                if (ms_throw_error)
 
927
                        throw_();
 
928
                        
 
929
                check_reply_status();
 
930
        
 
931
                *ms_getBuffer =0; // null terminate the blob reference string.
 
932
                ms_getCloudHeaders();
 
933
                if (ms_cloud && use_cloud) 
 
934
                        ms_sendCloudBLOB(size);
 
935
                else if (use_cloud && !ms_cloud) {
 
936
                        // We were expecting to send the BLOB to the cloud but 
 
937
                        // the server did not respond with cloud data. The retry will
 
938
                        // send the data to the PBMS server.
 
939
                        use_cloud = false;
 
940
                        goto resend;
 
941
                }
 
942
                
 
943
        }
 
944
        
 
945
        catch_(a);
 
946
        report_error(self);
 
947
        ok = false;
 
948
        cont_(a);
 
949
        
 
950
        return_(ok);
 
951
}
 
952
 
 
953
//------------------------------------------------
 
954
static void report_global_error(int err, const char *message, int line)
 
955
{
 
956
        char line_no[32];
 
957
        
 
958
        if (global_errno)
 
959
                return;
 
960
        global_errno = err;
 
961
        cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message,  message);
 
962
        snprintf(line_no, 32, ": line %d", line);
 
963
        cs_strcat(MS_RESULT_MESSAGE_SIZE, global_err_message,  line_no);
 
964
}
 
965
 
 
966
//------------------------------------------------
 
967
static void clear_global_error()
 
968
{
 
969
        global_errno = 0;
 
970
        *global_err_message = 0;
 
971
}
 
972
 
 
973
//------------------------------------------------
 
974
pbms_bool pbms_library_init()
 
975
{
 
976
        clear_global_error();
 
977
        init_count++;
 
978
        
 
979
        if (init_count == 1 ) {
 
980
                CLEAR_SELF();
 
981
 
 
982
                CURLcode curl_code = curl_global_init(CURL_GLOBAL_ALL);
 
983
                if (curl_code) {
 
984
                        report_global_error(curl_code, curl_easy_strerror(curl_code) , __LINE__);
 
985
                        init_count  = 0;
 
986
                        return 0;
 
987
                }
 
988
                
 
989
                if (! CSThread::startUp()) {
 
990
                        report_global_error(ENOMEM, "CSThread::startUp() failed.", __LINE__);
 
991
                        init_count  = 0;
 
992
                        return 0;
 
993
                }
 
994
                
 
995
                cs_init_memory();
 
996
                
 
997
                mslib_global_thread = new CSThread( NULL);
 
998
                CSThread::setSelf(mslib_global_thread);
 
999
                
 
1000
                CSThread *self = mslib_global_thread;
 
1001
                if (!mslib_global_thread) {
 
1002
                        report_global_error(ENOMEM, "new CSThread( NULL) failed.", __LINE__);
 
1003
                        init_count  = 0;
 
1004
                         CSThread::shutDown();
 
1005
                        return 0;
 
1006
                }
 
1007
        
 
1008
                try_(a) {
 
1009
                        pbms_thread_list = new CSThreadList();
 
1010
                }
 
1011
                catch_(a);
 
1012
                report_global_mse_error(self);
 
1013
                init_count  = 0;
 
1014
                mslib_global_thread->release();
 
1015
                mslib_global_thread = NULL;
 
1016
                cs_exit_memory();
 
1017
                CSThread::shutDown();
 
1018
                        
 
1019
                cont_(a);
 
1020
        }
 
1021
        
 
1022
        return(init_count > 0);
 
1023
}
 
1024
 
 
1025
//------------------------------------------------
 
1026
void pbms_library_end()
 
1027
{
 
1028
         
 
1029
        if (init_count == 1 ) {
 
1030
                init_count  = 0; 
 
1031
                        
 
1032
 
 
1033
                if (pbms_thread_list) {
 
1034
                        PBMS_ConHandle *con;
 
1035
                        while (con = (PBMS_ConHandle *) pbms_thread_list->getFront()) {
 
1036
                                con->ms_setSelf();
 
1037
                                CSThread::detach(con);
 
1038
                        }
 
1039
                        CSThread::setSelf(mslib_global_thread);
 
1040
                        pbms_thread_list->release();
 
1041
                } else
 
1042
                        CSThread::setSelf(mslib_global_thread);
 
1043
 
 
1044
                
 
1045
                mslib_global_thread->release();
 
1046
                mslib_global_thread = NULL;
 
1047
                cs_exit_memory();
 
1048
                CSThread::shutDown();
 
1049
                
 
1050
                curl_global_cleanup();
 
1051
        }
 
1052
        
 
1053
        if (init_count > 0)
 
1054
                init_count--;
 
1055
}
 
1056
 
 
1057
//------------------------------------------------
 
1058
int pbms_errno(PBMS myhndl)
 
1059
{
 
1060
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1061
 
 
1062
        if (con) {      
 
1063
                return con->ms_errno;
 
1064
        }
 
1065
                
 
1066
        return global_errno;
 
1067
}
 
1068
 
 
1069
//------------------------------------------------
 
1070
const char *pbms_error(PBMS myhndl)
 
1071
{
 
1072
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1073
 
 
1074
        if (con) {      
 
1075
                return con->ms_err_message;
 
1076
        }
 
1077
                
 
1078
        return global_err_message;
 
1079
}
 
1080
 
 
1081
//------------------------------------------------
 
1082
PBMS pbms_connect(const char* host, unsigned int port, const char *database)
 
1083
{
 
1084
        PBMS_ConHandle *con = NULL;
 
1085
        CLEAR_SELF(); 
 
1086
        
 
1087
        clear_global_error();
 
1088
 
 
1089
        new_(con, PBMS_ConHandle());
 
1090
        
 
1091
        if ((!con) || !CSThread::attach(con)) {
 
1092
                report_global_error(ENOMEM, "new PBMS_Ref() failed.", __LINE__);
 
1093
                if (con) {
 
1094
                        con->release();
 
1095
                        con = NULL;
 
1096
                }
 
1097
        } else {
 
1098
                CSThread *self = con;
 
1099
                
 
1100
                try_(a) {
 
1101
                                con->ms_initConnection(host, port, database);
 
1102
                                con->ms_ping();
 
1103
                        }
 
1104
                catch_(a);
 
1105
                        report_global_mse_error(con);
 
1106
                        CSThread::detach(con);
 
1107
                        con = NULL;
 
1108
                        
 
1109
                cont_(a);
 
1110
        }
 
1111
        
 
1112
        return(con);
 
1113
}
 
1114
 
 
1115
//------------------------------------------------
 
1116
void pbms_close(PBMS myhndl)
 
1117
{
 
1118
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1119
        
 
1120
        //      This will kill the 'self' thread so do not try and do any exception handling.
 
1121
        con->ms_setSelf();
 
1122
        CSThread::detach(con); // This will also release the connection.
 
1123
}
 
1124
 
 
1125
//------------------------------------------------
 
1126
pbms_bool pbms_is_blob_reference(PBMS myhndl, const char *ref)
 
1127
{
 
1128
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1129
        pbms_bool ok = false;
 
1130
        
 
1131
        con->ms_setSelf();      
 
1132
        enter_();
 
1133
        
 
1134
        try_(a) {
 
1135
                MSBlobURLRec blob;
 
1136
                if (ms_parse_blob_url(&blob, (char *)ref)) 
 
1137
                        ok = true;
 
1138
        }
 
1139
        
 
1140
        catch_(a);
 
1141
        cont_(a);
 
1142
        return_(ok);
 
1143
}
 
1144
 
 
1145
//------------------------------------------------
 
1146
pbms_bool pbms_get_blob_size(PBMS myhndl, const char *ref, size_t *size)
 
1147
{
 
1148
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1149
        bool ok = false;
 
1150
        
 
1151
        con->ms_setSelf();      
 
1152
        enter_();
 
1153
        
 
1154
        try_(a) {
 
1155
                MSBlobURLRec blob;
 
1156
                if (ms_parse_blob_url(&blob, (char *)ref)) {
 
1157
                        *size = blob.bu_blob_size;
 
1158
                        ok = true;
 
1159
                } 
 
1160
#ifdef HAVE_ALIAS_SUPPORT
 
1161
                else { // Assume it is a BLOB alias
 
1162
                        CSVector *saved_metadata;
 
1163
                        CSString *data = NULL;
 
1164
                        
 
1165
                        saved_metadata = con->ms_headers.takeHeaders();
 
1166
                        try_(b) {
 
1167
                                con->ms_get_info(ref, true);
 
1168
                                data = con->ms_get_metadata(MS_BLOB_SIZE);
 
1169
                                *size =         atol(data->getCString());       
 
1170
                                data->release();
 
1171
                                ok = true;
 
1172
                        }
 
1173
                        catch_(b);
 
1174
                        con->report_error(self);
 
1175
                        cont_(b);
 
1176
                        con->ms_headers.setHeaders(saved_metadata);
 
1177
                }
 
1178
#else
 
1179
                con->report_error(MS_ERR_INCORRECT_URL, "Invalid BLOB URL");
 
1180
#endif
 
1181
        }
 
1182
        
 
1183
        catch_(a);
 
1184
        cont_(a);
 
1185
        return_(ok);
 
1186
}
 
1187
/*
 
1188
 * pbms_add_metadata() and pbms_clear_metadata() deal with metadata for outgoing BLOBs only.
 
1189
 */
 
1190
//------------------------------------------------
 
1191
pbms_bool pbms_add_metadata(PBMS myhndl, const char *name, const char *value)
 
1192
{
 
1193
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1194
        bool ok = false;
 
1195
        
 
1196
        con->ms_setSelf();      
 
1197
        enter_();
 
1198
        
 
1199
        try_(a) {
 
1200
                if (strlen(name) > MS_META_NAME_SIZE)
 
1201
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Metadata name too long.");
 
1202
                if (strlen(value) > MS_META_VALUE_SIZE)
 
1203
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Metadata value too long.");
 
1204
                
 
1205
                con->ms_metadata_out.addHeader(name, value);
 
1206
 
 
1207
                ok = true;
 
1208
        }
 
1209
        
 
1210
        catch_(a) {
 
1211
                con->report_error(self);
 
1212
        }
 
1213
        
 
1214
        cont_(a);
 
1215
        
 
1216
        return_(ok);
 
1217
}
 
1218
 
 
1219
//------------------------------------------------
 
1220
void pbms_clear_metadata(PBMS myhndl, const char *name)
 
1221
{
 
1222
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1223
        
 
1224
        con->ms_setSelf();      
 
1225
        enter_();
 
1226
        
 
1227
        try_(a) {
 
1228
                if (name)
 
1229
                        con->ms_metadata_out.removeHeader(name);
 
1230
                else
 
1231
                        con->ms_metadata_out.clearHeaders();
 
1232
        }
 
1233
        
 
1234
        catch_(a) {
 
1235
                con->report_error(self);
 
1236
        }
 
1237
        
 
1238
        cont_(a);
 
1239
        
 
1240
}
 
1241
 
 
1242
/*
 
1243
 * pbms_reset_metadata(), pbms_next_metadata() and pbms_get_metadata_value() deal with metadata for the last
 
1244
 * BLOB received on the connection.
 
1245
 */
 
1246
//------------------------------------------------
 
1247
unsigned int pbms_reset_metadata(PBMS myhndl)
 
1248
{
 
1249
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1250
        unsigned int count = 0;
 
1251
        
 
1252
        con->ms_setSelf();      
 
1253
        enter_();
 
1254
        
 
1255
        try_(a) {
 
1256
                count = con->ms_init_fetch();
 
1257
        }
 
1258
        
 
1259
        catch_(a) {
 
1260
                con->report_error(self);
 
1261
        }
 
1262
        
 
1263
        cont_(a);
 
1264
        
 
1265
        return_(count);
 
1266
}
 
1267
 
 
1268
//------------------------------------------------
 
1269
pbms_bool pbms_next_metadata(PBMS myhndl, char *name, char *value, size_t *v_size)
 
1270
{
 
1271
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1272
        bool ok = false;
 
1273
        size_t null_size = MS_META_VALUE_SIZE;
 
1274
        
 
1275
        con->ms_setSelf();      
 
1276
        enter_();
 
1277
        
 
1278
        if (!v_size)
 
1279
                v_size = &null_size;
 
1280
                
 
1281
        try_(a) {
 
1282
                const char *m_name, *m_value;
 
1283
                ok = con->ms_next(&m_name, &m_value);
 
1284
                if (ok) {
 
1285
                        cs_strcpy(MS_META_NAME_SIZE, name, m_name);
 
1286
                        cs_strcpy(*v_size, value, m_value);
 
1287
 
 
1288
                        if (*v_size <= strlen(m_value)) 
 
1289
                                *v_size = strlen(m_value) +1;
 
1290
 
 
1291
                }
 
1292
        }
 
1293
        
 
1294
        catch_(a) {
 
1295
                con->report_error(self);
 
1296
        }
 
1297
        
 
1298
        cont_(a);
 
1299
        
 
1300
        return_(ok);
 
1301
}
 
1302
 
 
1303
//------------------------------------------------
 
1304
pbms_bool pbms_get_metadata_value(PBMS myhndl, const char *name, char *buffer, size_t *size)
 
1305
{
 
1306
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1307
        bool ok = false;
 
1308
        
 
1309
        con->ms_setSelf();      
 
1310
        enter_();
 
1311
        
 
1312
        try_(a) {
 
1313
                CSString *data = NULL;
 
1314
                data = con->ms_get_metadata(name);
 
1315
                if (data) {
 
1316
                        ok = true;
 
1317
                        cs_strcpy(*size, buffer, data->getCString());
 
1318
                        if (data->length() >= *size) {
 
1319
                                *size = data->length() +1;                              
 
1320
                        }
 
1321
                        data->release();
 
1322
                }
 
1323
        }
 
1324
        
 
1325
        catch_(a) {
 
1326
                con->report_error(self);
 
1327
        }
 
1328
        
 
1329
        cont_(a);
 
1330
        
 
1331
        return_(ok);
 
1332
}
 
1333
 
 
1334
//------------------------------------------------
 
1335
pbms_bool pbms_get_md5_digest(PBMS myhndl, char *md5_digest)
 
1336
{
 
1337
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1338
        bool ok = false;
 
1339
        
 
1340
        con->ms_setSelf();      
 
1341
        enter_();
 
1342
        
 
1343
        try_(a) {               
 
1344
                CSString *data = NULL;
 
1345
                data = con->ms_get_checksum();
 
1346
                if (!data)
 
1347
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "No checksum found.");
 
1348
                push_(data);
 
1349
                if (cs_hex_to_bin(16, md5_digest, data->length(), data->getCString()) != 16) {
 
1350
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Invalid MD5 Digest.");
 
1351
                } 
 
1352
                release_(data);
 
1353
                ok = true;
 
1354
        }
 
1355
        
 
1356
        catch_(a) {
 
1357
                con->report_error(self);
 
1358
        }
 
1359
        
 
1360
        cont_(a);
 
1361
        
 
1362
        return_(ok);
 
1363
}
 
1364
 
 
1365
//------------------------------------------------
 
1366
//------------------------------------------------
 
1367
pbms_bool pbms_put_data(PBMS myhndl, const char *table, const char *checksum, char *ref, size_t size, const unsigned char *data)
 
1368
{
 
1369
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1370
        
 
1371
        return con->ms_upLoadData(table, NULL, checksum, ref, size, data);
 
1372
}
 
1373
//------------------------------------------------
 
1374
pbms_bool pbms_put_data_cb(PBMS myhndl, const char *table, const char *checksum, char *ref, size_t size, PBMS_READ_CALLBACK_FUNC cb, void *caller_data)
 
1375
{
 
1376
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1377
        
 
1378
        return con->ms_upLoadData(table, NULL, checksum, ref, size, NULL, cb, caller_data);
 
1379
}
 
1380
 
 
1381
//------------------------------------------------
 
1382
pbms_bool pbms_get_data(PBMS myhndl, const char *ref, unsigned char *buffer, size_t buffer_size)
 
1383
{
 
1384
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1385
        
 
1386
        return con->ms_downLoadData(ref, buffer, buffer_size);
 
1387
}
 
1388
 
 
1389
//------------------------------------------------
 
1390
pbms_bool pbms_get_data_range(PBMS myhndl, const char *ref, size_t start_offset, size_t end_offset, unsigned char *buffer, size_t buffer_size, size_t *data_size)
 
1391
{
 
1392
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1393
        
 
1394
        con->ms_range_start = start_offset;
 
1395
        con->ms_range_end = end_offset;
 
1396
        
 
1397
        if (con->ms_downLoadData(ref, buffer, buffer_size)) {
 
1398
                if (data_size)
 
1399
                        *data_size = con->ms_getBuffer - buffer;
 
1400
                return true;
 
1401
        }
 
1402
  
 
1403
        return false;
 
1404
}
 
1405
 
 
1406
//------------------------------------------------
 
1407
pbms_bool pbms_get_data_cb(PBMS myhndl, const char *ref, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
 
1408
{
 
1409
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1410
        
 
1411
        return con->ms_downLoadData(ref, NULL, 0, cb, caller_data);
 
1412
}
 
1413
 
 
1414
//------------------------------------------------
 
1415
pbms_bool pbms_get_data_range_cb(PBMS myhndl, const char *ref, size_t start_offset, size_t end_offset, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
 
1416
{
 
1417
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1418
        
 
1419
        con->ms_range_start = start_offset;
 
1420
        con->ms_range_end = end_offset;
 
1421
 
 
1422
        return con->ms_downLoadData(ref, NULL, 0, cb, caller_data);
 
1423
}
 
1424
 
 
1425
//------------------------------------------------
 
1426
pbms_bool pbms_get_info(PBMS myhndl, const char *ref)
 
1427
{
 
1428
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1429
        pbms_bool ok = true;
 
1430
        
 
1431
        con->ms_setSelf();      
 
1432
        enter_();
 
1433
        
 
1434
        try_(a) {       
 
1435
                con->ms_get_info(ref, false);                   
 
1436
        }
 
1437
        
 
1438
        catch_(a);
 
1439
        con->report_error(self);
 
1440
        ok = false;
 
1441
        
 
1442
        cont_(a);
 
1443
        
 
1444
        return_(ok);
 
1445
}
 
1446
 
 
1447
//------------------------------------------------
 
1448
pbms_bool pbms_set_option(PBMS myhndl, enum pbms_option option, const void *in_value)
 
1449
{
 
1450
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1451
        pbms_bool ok = true;
 
1452
        
 
1453
        con->ms_setSelf();      
 
1454
        enter_();
 
1455
        
 
1456
        try_(a) {
 
1457
 
 
1458
                switch (option) {
 
1459
                        case PBMS_OPTION_DATABASE: {
 
1460
                                CSString *database = CSString::newString((char *)in_value);
 
1461
                                con->ms_database->release();
 
1462
                                con->ms_database = database;
 
1463
                                break;
 
1464
                        }
 
1465
                                
 
1466
                        case PBMS_OPTION_TRANSMITION_TIMEOUT:
 
1467
                                con->ms_transmition_timeout = *((unsigned int*)in_value);
 
1468
                                break; 
 
1469
                        
 
1470
                        case PBMS_OPTION_HOST:
 
1471
                        case PBMS_OPTION_PORT:
 
1472
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Option is ReadOnly.");
 
1473
                                break;
 
1474
 
 
1475
                        default:
 
1476
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unknown Option.");
 
1477
                }
 
1478
        
 
1479
        }
 
1480
        
 
1481
        catch_(a);
 
1482
        con->report_error(self);
 
1483
        ok = false;
 
1484
        cont_(a);
 
1485
        return_(ok);
 
1486
}
 
1487
 
 
1488
//------------------------------------------------
 
1489
pbms_bool pbms_get_option(PBMS myhndl, enum pbms_option option, void *out_value)
 
1490
{
 
1491
        PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
 
1492
        pbms_bool ok = true;
 
1493
        
 
1494
        con->ms_setSelf();      
 
1495
        enter_();
 
1496
        
 
1497
        try_(a) {
 
1498
 
 
1499
                switch (option) {
 
1500
                        case PBMS_OPTION_DATABASE:
 
1501
                                *((const char**)out_value) = con->ms_database->getCString();
 
1502
                                break;
 
1503
                        
 
1504
                        case PBMS_OPTION_TRANSMITION_TIMEOUT:
 
1505
                                *((unsigned int*)out_value) = con->ms_transmition_timeout;
 
1506
                                break; 
 
1507
                        
 
1508
                        case PBMS_OPTION_HOST:
 
1509
                                *((const char**)out_value) = con->ms_host->getCString();
 
1510
                                break;
 
1511
                                
 
1512
                        case PBMS_OPTION_PORT:
 
1513
                                *((unsigned int*)out_value) = con->ms_port;
 
1514
                                break;
 
1515
 
 
1516
                        default:
 
1517
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unknown Option.");
 
1518
                }
 
1519
        
 
1520
        }
 
1521
        
 
1522
        catch_(a);
 
1523
        con->report_error(self);
 
1524
        ok = false;
 
1525
        cont_(a);
 
1526
        return_(ok);
 
1527
}
 
1528
 
 
1529
 
 
1530