1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream (PBMS)
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* 2008-09-10 Barry Leslie
25
#include <drizzled/common.h>
26
#include <drizzled/session.h>
27
#include <drizzled/charset_info.h>
30
#include "cslib/CSConfig.h"
33
#include <curl/curl.h>
38
#include "cslib/CSGlobal.h"
39
#include "cslib/CSThread.h"
40
#include "cslib/CSString.h"
41
#include "cslib/CSStrUtil.h"
42
//#include "cslib/CSSocket.h"
43
#include "cslib/CSHTTPStream.h"
44
#include "cslib/CSMd5.h"
45
#include "cslib/CSS3Protocol.h"
46
#include "metadata_ms.h"
48
#define CLEAR_SELF() CSThread::setSelf(NULL)
49
#define MAX_STMT_SIZE 1024
51
static int global_errno;
52
static char global_err_message[MS_RESULT_MESSAGE_SIZE];
54
static unsigned long init_count = 0;
55
static CSThreadList *pbms_thread_list = NULL;
56
static CSThread *mslib_global_thread = NULL;
58
static void report_global_mse_error(CSThread *thd)
60
global_errno = thd->myException.getErrorCode();
61
cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message, thd->myException.getMessage());
64
#define DFLT_CONNECTION_TIMEOUT 10 // Changing this required a documentation update.
66
#define THROW_CURL_IF(v) { if (v) CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);}
67
//======================================
68
static size_t receive_data(void *ptr, size_t size, size_t nmemb, void *stream);
69
static size_t receive_header(void *ptr, size_t size, size_t nmemb, void *stream);
70
static size_t send_callback(void *ptr, size_t size, size_t nmemb, void *stream);
71
//--------------------------------------------
72
class PBMS_ConHandle:public CSThread {
75
// Fields that must be freed when the object is destroyed.
77
CSString *ms_database;
78
struct curl_slist *ms_header_list; // A curl list of headers to be sent with the next PUT.
79
struct curl_slist *ms_info_only; // A curl list of headers to be sent with Get.
80
struct curl_slist *ms_ping_header; // A curl list of headers to be sent with ping.
81
CSStringBuffer *ms_url_str;
83
CSStringBuffer *ms_buffer;
84
CSStringBuffer *ms_errorReply;
85
CSS3Protocol *ms_cloud;
86
uint64_t ms_range_start;
87
uint64_t ms_range_end;
89
unsigned int ms_replyStatus;
90
CSHTTPHeaders ms_headers;
91
CSHTTPHeaders ms_metadata_out;
92
uint32_t ms_next_header;
93
uint32_t ms_max_header;
95
unsigned int ms_transmition_timeout; // In the future this may have some effect but for now it is always be 0 (no timeout).
96
unsigned int ms_url_base_len;
97
bool ms_throw_error; // Gets set if an exception occurs in a callback.
100
char ms_err_message[MS_RESULT_MESSAGE_SIZE];
103
char ms_curl_error[CURL_ERROR_SIZE];
105
size_t ms_DataToBeTransfered;
106
// Get data caller parameters:
107
u_char *ms_getBuffer;
108
size_t ms_getBufferSize;
109
PBMS_WRITE_CALLBACK_FUNC ms_writeCB;
112
// Put data caller parameters:
113
const u_char *ms_putData;
114
size_t ms_putDataLen;
115
size_t ms_putDataOffset;
116
PBMS_READ_CALLBACK_FUNC ms_readCB;
120
CSThread(pbms_thread_list),
123
ms_header_list(NULL),
125
ms_ping_header(NULL),
137
ms_transmition_timeout(0),
139
ms_throw_error(false),
141
ms_DataToBeTransfered(0),
154
curl_easy_cleanup(ms_curl);
160
ms_database->release();
166
ms_buffer->release();
169
ms_errorReply->release();
172
ms_url_str->release();
175
curl_slist_free_all(ms_header_list);
178
curl_slist_free_all(ms_info_only);
181
curl_slist_free_all(ms_ping_header);
183
ms_headers.clearHeaders();
184
ms_metadata_out.clearHeaders();
187
void set_downLoadUserData(u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb = NULL, void *caller_data = NULL)
189
ms_DataToBeTransfered = buffer_size;
190
ms_getBuffer = buffer;
191
ms_getBufferSize = buffer_size;
193
ms_getCBData = caller_data;
196
void set_upLoadUserData(const u_char *buffer, size_t size, PBMS_READ_CALLBACK_FUNC cb = NULL, void *caller_data = NULL)
198
ms_DataToBeTransfered = size;
200
ms_putDataLen = size;
203
ms_putCBData = caller_data;
207
void ms_initConnection(const char* host, unsigned int port, const char* database)
209
ms_curl = curl_easy_init();
211
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_init() failed.");
213
if (curl_easy_setopt(ms_curl, CURLOPT_ERRORBUFFER, ms_curl_error))
214
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_setopt(CURLOPT_ERRORBUFFER) failed.");
216
// Uncomment this line to trace network action during request. Very Usefull!!
217
//curl_easy_setopt(ms_curl, CURLOPT_VERBOSE, 1L);
219
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_TCP_NODELAY, 1L));
221
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOPROGRESS, 1L));
222
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEFUNCTION, receive_data));
223
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READFUNCTION, send_callback));
224
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HEADERFUNCTION, receive_header));
226
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEDATA, this));
227
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READDATA, this));
228
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEHEADER, this));
230
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FOLLOWLOCATION, 1L)); // Follow redirects.
233
pbms_api_owner = true;
234
ms_host = CSString::newString(host);
235
ms_database = CSString::newString(database);
237
ms_url_str = new CSStringBuffer(50);
238
ms_url_str->append("http://");
239
ms_url_str->append(host);
240
ms_url_str->append(":");
241
ms_url_str->append(port);
242
ms_url_str->append("/");
243
ms_url_base_len = ms_url_str->length();
245
ms_buffer = new CSStringBuffer(50);
247
ms_info_only = curl_slist_append(ms_info_only, MS_BLOB_INFO_REQUEST ": yes");
249
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
251
ms_buffer->append(MS_PING_REQUEST": ");
252
ms_buffer->append(database);
253
ms_ping_header = curl_slist_append(ms_ping_header, ms_buffer->getCString());
255
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
257
ms_buffer->setLength(0);
268
void ms_init_put_blob(curl_off_t size, const char *table, const char *alias, const char *checksum, bool use_cloud);
270
void ms_init_get_blob(const char *ref, bool is_alias, bool info_only);
272
void ms_get_info(const char *ref, bool is_alias);
273
void ms_sendCloudBLOB(size_t size) ;
275
pbms_bool ms_downLoadData(const char *ref, u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb = NULL, void *caller_data = NULL);
277
pbms_bool ms_upLoadData(const char *table, const char *alias, const char *checksum, char *ref, size_t size, const u_char *data, PBMS_READ_CALLBACK_FUNC cb = NULL, void *caller_data = NULL);
279
uint32_t ms_init_fetch() {ms_next_header =0; return ms_max_header = ms_headers.numHeaders();}
281
bool ms_next(const char **name, const char **value)
283
if (ms_next_header >= ms_max_header)
286
CSHeader *header = ms_headers.getHeader(ms_next_header++);
287
*name = header->getNameCString();
288
*value = header->getValueCString();
297
printf("Headers:\n");
298
printf("---------------------------------------\n");
299
while ( (header = ms_headers.getHeader(i++)) ) {
300
printf("%s : %s\n", header->getNameCString(), header->getValueCString());
303
printf("---------------------------------------\n");
306
void ms_addS3HeadersHeaders(CSVector *s3Headers);
308
CSString *ms_get_metadata(const char *name)
310
return ms_headers.getHeaderValue(name);
313
CSString *ms_get_alias() {return ms_get_metadata(MS_ALIAS_TAG);}
315
CSString *ms_get_checksum() {return ms_get_metadata(MS_CHECKSUM_TAG);}
317
void report_error(CSThread *self)
319
ms_errno = self->myException.getErrorCode();
320
cs_strcpy(MS_RESULT_MESSAGE_SIZE, ms_err_message, self->myException.getMessage());
323
void report_error(int err, const char *msg)
326
cs_strcpy(MS_RESULT_MESSAGE_SIZE, ms_err_message, msg);
329
void throw_http_reply_exception();
331
void check_reply_status()
333
switch (ms_replyStatus) {
335
//case 301: // Moved Permanently
336
//case 307: // Temporary Redirect
339
throw_http_reply_exception();
344
void ms_getCloudHeaders()
346
CSString *value = NULL;
351
value = ms_headers.getHeaderValue(MS_CLOUD_SERVER);
361
new_(ms_cloud, CSS3Protocol());
363
// Remove the cloud headers so they are not visable to the caller.
364
ms_headers.removeHeader(MS_CLOUD_SERVER);
366
ms_cloud->s3_setServer(value->getCString());
369
// Get the S3 public key
370
value = ms_headers.getHeaderValue(MS_CLOUD_KEY);
372
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 public key in reply.");
375
ms_headers.removeHeader(MS_CLOUD_KEY);
376
ms_cloud->s3_setPublicKey(value->getCString());
384
// CURL callback functions:
385
////////////////////////////
386
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
388
PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
389
size_t data_len = objs * obj_size;
390
char *ptr = (char*)vptr;
392
CLOBBER_PROTECT(data_len);
394
if (con->ms_replyStatus >= 400) { // Collect the error reply.
397
if (!con->ms_errorReply)
398
con->ms_errorReply = new CSStringBuffer(50);
399
con->ms_errorReply->append(ptr, data_len);
402
con->ms_throw_error = true;
409
if (data_len > con->ms_DataToBeTransfered) { // This should never happen.
410
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob data overflow.");
411
con->ms_throw_error = true;
415
con->ms_DataToBeTransfered -= data_len;
417
return (con->ms_writeCB)(con->ms_getCBData, ptr, data_len, false);
419
memcpy(con->ms_getBuffer, ptr, data_len);
420
con->ms_getBuffer += data_len;
425
#define IS_REDIRECT(s) ((s >= 300) && (s < 400))
426
//----------------------
427
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
429
PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
430
size_t size = objs * obj_size;
431
char *end, *ptr = (char*) header, *value = NULL;
432
const char *name = NULL;
433
uint32_t name_len =0, value_len =0;
435
CLOBBER_PROTECT(size);
436
CLOBBER_PROTECT(ptr);
437
CLOBBER_PROTECT(value);
438
CLOBBER_PROTECT(name_len);
439
CLOBBER_PROTECT(value_len);
440
CLOBBER_PROTECT(name);
443
if (*(end -2) == '\r' && *(end -1) == '\n')
446
while ((end != ptr) && (*ptr == ' ')) ptr++;
450
// Get the reply status.
451
if (((!con->ms_replyStatus) || (con->ms_replyStatus == 100) || IS_REDIRECT(con->ms_replyStatus) ) && !strncasecmp(ptr, "HTTP", 4)) {
453
while ((end != ptr) && (*ptr != ' ')) ptr++; // skip HTTP stuff
454
while ((end != ptr) && (*ptr == ' ')) ptr++; // find the start of eh status code.
458
if (end < (ptr +3)) // expecting a 3 digit status code.
461
memcpy(status, ptr, 3);
464
con->ms_replyStatus = atoi(status);
468
while ((end != ptr) && (*ptr != ':')) ptr++;
471
name_len = ptr - name;
474
while ((end != ptr) && (*ptr == ' ')) ptr++;
479
value_len = end - value;
481
while (name[name_len-1] == ' ') name_len--;
482
while (value[value_len-1] == ' ') value_len--;
484
if (!strncasecmp(name, "Content-Length", 14)) {
487
memcpy(len, value, (value_len > 31)?31:value_len);
488
len[(value_len > 31)?31:value_len] = 0;
491
// If there is no callback then the data size is limited
492
// to the GetBuffer size.
493
if (con->ms_writeCB || (length < con->ms_getBufferSize))
494
con->ms_DataToBeTransfered = length;
501
if (!strncasecmp(name, "ETag", 4)) { // S3 checksum
502
name = MS_CHECKSUM_TAG;
503
name_len = strlen(MS_CHECKSUM_TAG);
508
if (value[value_len-1] == '"') {
511
con->ms_headers.removeHeader(MS_CHECKSUM_TAG);
513
con->ms_headers.addHeader(name, name_len, value, value_len);
517
con->ms_throw_error = true;
524
//----------------------
525
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
527
PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
528
char *buffer = (char*) ptr;
529
size_t data_sent = 0, buffer_size = objs * obj_size;
531
if (con->ms_putDataLen == 0)
535
data_sent = (con->ms_readCB)(con->ms_putCBData, buffer , buffer_size, false);
537
data_sent = (buffer_size < con->ms_putDataLen)? buffer_size: con->ms_putDataLen;
538
memcpy(buffer,con->ms_putData, data_sent);
539
con->ms_putData += data_sent;
541
con->ms_putDataLen -= data_sent;
546
#define CONTENT_TYPE "Content-Type"
547
//------------------------------------------------
548
void PBMS_ConHandle::ms_init_put_blob(curl_off_t size, const char *table, const char *alias, const char *checksum, bool use_cloud)
550
char buffer[MS_META_VALUE_SIZE + MS_META_NAME_SIZE +2];
551
int buffer_size = MS_META_VALUE_SIZE + MS_META_NAME_SIZE +2;
552
bool have_content_type = false;
556
ms_url_str->setLength(ms_url_base_len);
558
ms_url_str->append(ms_database->getCString());
560
ms_url_str->append("/");
561
ms_url_str->append(table);
564
// Remove any old headers
565
if (ms_header_list) {
566
curl_slist_free_all(ms_header_list);
567
ms_header_list = NULL;
570
// Add metadata headers.
573
while ( (header = ms_metadata_out.getHeader(i++)) ) {
574
cs_strcpy(buffer_size, buffer, header->getNameCString());
575
if (!strcasecmp( buffer, CONTENT_TYPE))
576
have_content_type = true;
577
cs_strcat(buffer_size, buffer, ':');
578
cs_strcat(buffer_size, buffer, header->getValueCString());
580
ms_header_list = curl_slist_append(ms_header_list, buffer);
582
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
585
if (!have_content_type) {
586
// Prevent CURLOPT_POST from adding a content type.
587
ms_header_list = curl_slist_append(ms_header_list, CONTENT_TYPE ":");
589
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
593
cs_strcpy(buffer_size, buffer, MS_CHECKSUM_TAG ":");
594
cs_strcat(buffer_size, buffer, checksum);
596
ms_header_list = curl_slist_append(ms_header_list, buffer);
598
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
602
#ifdef HAVE_ALIAS_SUPPORT
604
if (strlen(alias) > MS_META_VALUE_SIZE)
605
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "BLOB alias name too long.");
607
cs_strcpy(buffer_size, buffer, MS_ALIAS_TAG ":");
608
cs_strcat(buffer_size, buffer, alias);
610
ms_header_list = curl_slist_append(ms_header_list, buffer);
612
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
617
cs_strcpy(buffer_size, buffer, MS_BLOB_SIZE ":");
618
snprintf(buffer + strlen(buffer), buffer_size - strlen(buffer), "%"PRIu64"", size);
619
ms_header_list = curl_slist_append(ms_header_list, buffer);
621
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
623
size = 0; // The BLOB data is not being sent to the PBMS server
626
// Using CURLOPT_UPLOAD is VERY slow!
627
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, size));
628
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
629
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POSTFIELDSIZE_LARGE, size));
630
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 1L));
632
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
633
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString() ));
636
ms_headers.clearHeaders();
638
ms_errorReply->setLength(0);
642
//------------------------------------------------
643
void PBMS_ConHandle::ms_init_get_blob(const char *ref, bool is_alias, bool info_only)
647
ms_url_str->setLength(ms_url_base_len);
649
#ifdef HAVE_ALIAS_SUPPORT
651
if (is_alias || !PBMSBlobURLTools::couldBeURL(ref, &blob)) {
652
ms_url_str->append(ms_database->getCString());
653
ms_url_str->append("/");
656
ms_url_str->append((char *) ref);
658
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 0L));
659
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 0L));
660
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, (info_only)?ms_info_only: NULL));
661
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString()));
663
// NOTE: range 0-0 is valid, it returns the first byte.
664
if (ms_range_start <= ms_range_end) {
666
snprintf(range, 80, "%"PRIu64"-%"PRIu64"", ms_range_start, ms_range_end);
667
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_RANGE, range));
671
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_RANGE, NULL));
675
ms_headers.clearHeaders();
677
ms_errorReply->setLength(0);
680
//------------------------------------------------
681
void PBMS_ConHandle::ms_get_info(const char *ref, bool is_alias)
684
ms_init_get_blob(ref, is_alias, true);
686
if (curl_easy_perform(ms_curl) && !ms_throw_error)
687
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
692
check_reply_status();
697
//------------------------------------------------
698
void PBMS_ConHandle::ms_ping()
701
ms_url_str->setLength(ms_url_base_len);
703
// THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 0L));
704
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 0L));
705
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_ping_header));
706
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString()));
709
ms_headers.clearHeaders();
711
ms_errorReply->setLength(0);
713
if (curl_easy_perform(ms_curl) && !ms_throw_error)
714
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
719
check_reply_status();
721
// Check to see if the BLOBs in the database are stored in a cloud:
722
CSString *cloud_server;
723
cloud_server = ms_headers.getHeaderValue(MS_CLOUD_SERVER);
729
cloud_server->release();
731
new_(ms_cloud, CSS3Protocol());
737
//------------------------------------------------
738
void PBMS_ConHandle::throw_http_reply_exception()
740
CSString *reply = NULL, *error_text = NULL;
748
size = ms_errorReply->length();
751
error_text = CSString::newString("Missing HTTP reply: possible Media Stream engine connection failure.");
755
reply = CSString::newString(ms_errorReply);
757
ms_errorReply = NULL;
759
start = reply->locate(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG, 1);
760
start += strlen(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG);
761
end = reply->locate(EXCEPTION_REPLY_MESSAGE_SUFFIX_TAG, 1);
763
error_text = reply->substr(start, end - start);
770
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, error_text->getCString());
772
// We never get here.
776
//------------------------------------------------
777
pbms_bool PBMS_ConHandle::ms_downLoadData(const char *ref, u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
785
set_downLoadUserData(buffer, buffer_size, cb, caller_data);
786
ms_init_get_blob(ref, false, false);
787
if (curl_easy_perform(ms_curl) && !ms_throw_error)
788
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
793
check_reply_status();
806
//------------------------------------------------
807
void PBMS_ConHandle::ms_sendCloudBLOB(size_t size)
809
CSInputStream *input;
810
CSString *bucket, *object_key, *content_type, *signature, *signature_time;
816
bucket = ms_headers.getHeaderValue(MS_CLOUD_BUCKET);
818
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 bucket header in reply.");
820
ms_headers.removeHeader(MS_CLOUD_BUCKET);
822
// Get the S3 object key
823
object_key = ms_headers.getHeaderValue(MS_CLOUD_OBJECT_KEY);
825
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 object key header in reply.");
827
ms_headers.removeHeader(MS_CLOUD_OBJECT_KEY);
829
// Get the S3 blob signature
830
signature = ms_headers.getHeaderValue(MS_BLOB_SIGNATURE);
832
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 blob signature in reply.");
835
ms_headers.removeHeader(MS_BLOB_SIGNATURE);
837
// Get the S3 blob signature date
838
signature_time = ms_headers.getHeaderValue(MS_BLOB_DATE);
840
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 blob signature date in reply.");
841
push_(signature_time);
842
ms_headers.removeHeader(MS_BLOB_DATE);
844
content_type = ms_headers.getHeaderValue(CONTENT_TYPE);
849
input = CSCallbackInputStream::newStream(ms_readCB, ms_putCBData);
851
input = CSMemoryInputStream::newStream(ms_putData, ms_putDataLen);
853
// Send the BLOB to the cloud storage.
854
s3Headers = ms_cloud->s3_send(input, bucket->getCString(), object_key->getCString(), size, (content_type)?content_type->getCString():NULL, NULL, signature->getCString(), atol(signature_time->getCString()));
856
ms_addS3HeadersHeaders(s3Headers);
859
release_(content_type);
861
release_(signature_time);
863
release_(object_key);
869
//------------------------------------------------
870
void PBMS_ConHandle::ms_addS3HeadersHeaders(CSVector *s3Headers)
872
CSHTTPHeaders headers;
876
headers.setHeaders(s3Headers);
878
for (uint32_t i = 0; i < headers.numHeaders(); i++) {
879
CSHeader *h = headers.getHeader(i);
880
const char *name = h->getNameCString();
882
if (strcasecmp(name, "ETag") == 0){
883
const char *value = h->getValueCString();
884
uint32_t value_len = strlen(value);
890
if (value[value_len-1] == '"') {
894
ms_headers.removeHeader(MS_CHECKSUM_TAG);
896
ms_headers.addHeader(MS_CHECKSUM_TAG, CSString::newString(value, value_len));
899
ms_headers.removeHeader(name);
900
ms_headers.addHeader(h);
906
headers.clearHeaders();
910
headers.clearHeaders();
914
//------------------------------------------------
915
pbms_bool PBMS_ConHandle::ms_upLoadData(const char *table, const char *alias, const char *checksum, char *ref, size_t size, const u_char *data, PBMS_READ_CALLBACK_FUNC cb, void *caller_data)
917
pbms_bool ok = true, use_cloud = (ms_cloud != NULL);
926
ms_init_put_blob(size, table, alias, checksum, use_cloud);
927
set_upLoadUserData(data, size, cb, caller_data);
928
set_downLoadUserData((u_char*) ref, PBMS_BLOB_URL_SIZE -1); // Set things up to receive the BLOB ref back.
930
if (curl_easy_perform(ms_curl) && !ms_throw_error)
931
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
936
check_reply_status();
938
*ms_getBuffer =0; // null terminate the blob reference string.
939
ms_getCloudHeaders();
940
if (ms_cloud && use_cloud)
941
ms_sendCloudBLOB(size);
942
else if (use_cloud && !ms_cloud) {
943
// We were expecting to send the BLOB to the cloud but
944
// the server did not respond with cloud data. The retry will
945
// send the data to the PBMS server.
960
//------------------------------------------------
961
static void report_global_error(int err, const char *message, int line)
968
cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message, message);
969
snprintf(line_no, 32, ": line %d", line);
970
cs_strcat(MS_RESULT_MESSAGE_SIZE, global_err_message, line_no);
973
//------------------------------------------------
974
static void clear_global_error()
977
*global_err_message = 0;
980
//------------------------------------------------
981
pbms_bool pbms_library_init()
983
clear_global_error();
986
if (init_count == 1 ) {
989
CURLcode curl_code = curl_global_init(CURL_GLOBAL_ALL);
991
report_global_error(curl_code, curl_easy_strerror(curl_code) , __LINE__);
996
if (! CSThread::startUp()) {
997
report_global_error(ENOMEM, "CSThread::startUp() failed.", __LINE__);
1004
mslib_global_thread = new CSThread( NULL);
1005
CSThread::setSelf(mslib_global_thread);
1007
CSThread *self = mslib_global_thread;
1008
if (!mslib_global_thread) {
1009
report_global_error(ENOMEM, "new CSThread( NULL) failed.", __LINE__);
1011
CSThread::shutDown();
1016
pbms_thread_list = new CSThreadList();
1019
report_global_mse_error(self);
1021
mslib_global_thread->release();
1022
mslib_global_thread = NULL;
1024
CSThread::shutDown();
1029
return(init_count > 0);
1032
//------------------------------------------------
1033
void pbms_library_end()
1036
if (init_count == 1 ) {
1040
if (pbms_thread_list) {
1041
PBMS_ConHandle *con;
1042
while ((con = (PBMS_ConHandle *) pbms_thread_list->getFront())) {
1044
CSThread::detach(con);
1046
CSThread::setSelf(mslib_global_thread);
1047
pbms_thread_list->release();
1049
CSThread::setSelf(mslib_global_thread);
1052
mslib_global_thread->release();
1053
mslib_global_thread = NULL;
1055
CSThread::shutDown();
1057
curl_global_cleanup();
1064
//------------------------------------------------
1065
int pbms_errno(PBMS myhndl)
1067
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1070
return con->ms_errno;
1073
return global_errno;
1076
//------------------------------------------------
1077
const char *pbms_error(PBMS myhndl)
1079
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1082
return con->ms_err_message;
1085
return global_err_message;
1088
//------------------------------------------------
1089
PBMS pbms_connect(const char* host, unsigned int port, const char *database)
1091
PBMS_ConHandle *con = NULL;
1093
CLOBBER_PROTECT(con);
1095
clear_global_error();
1097
new_(con, PBMS_ConHandle());
1099
if ((!con) || !CSThread::attach(con)) {
1100
report_global_error(ENOMEM, "new PBMS_Ref() failed.", __LINE__);
1106
CSThread *self = con;
1109
con->ms_initConnection(host, port, database);
1113
report_global_mse_error(con);
1114
CSThread::detach(con);
1123
//------------------------------------------------
1124
void pbms_close(PBMS myhndl)
1126
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1128
// This will kill the 'self' thread so do not try and do any exception handling.
1130
CSThread::detach(con); // This will also release the connection.
1133
//------------------------------------------------
1134
pbms_bool pbms_is_blob_reference(PBMS myhndl, const char *ref)
1136
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1137
pbms_bool ok = false;
1144
if (PBMSBlobURLTools::couldBeURL(ref, &blob))
1153
//------------------------------------------------
1154
pbms_bool pbms_get_blob_size(PBMS myhndl, const char *ref, size_t *size)
1156
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1164
if (PBMSBlobURLTools::couldBeURL(ref, &blob)) {
1165
*size = blob.bu_blob_size;
1168
#ifdef HAVE_ALIAS_SUPPORT
1169
else { // Assume it is a BLOB alias
1170
CSVector *saved_metadata;
1171
CSString *data = NULL;
1173
saved_metadata = con->ms_headers.takeHeaders();
1175
con->ms_get_info(ref, true);
1176
data = con->ms_get_metadata(MS_BLOB_SIZE);
1177
*size = atol(data->getCString());
1182
con->report_error(self);
1184
con->ms_headers.setHeaders(saved_metadata);
1187
con->report_error(MS_ERR_INCORRECT_URL, "Invalid BLOB URL");
1196
* pbms_add_metadata() and pbms_clear_metadata() deal with metadata for outgoing BLOBs only.
1198
//------------------------------------------------
1199
pbms_bool pbms_add_metadata(PBMS myhndl, const char *name, const char *value)
1201
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1208
if (strlen(name) > MS_META_NAME_SIZE)
1209
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Metadata name too long.");
1210
if (strlen(value) > MS_META_VALUE_SIZE)
1211
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Metadata value too long.");
1213
con->ms_metadata_out.addHeader(name, value);
1219
con->report_error(self);
1227
//------------------------------------------------
1228
void pbms_clear_metadata(PBMS myhndl, const char *name)
1230
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1237
con->ms_metadata_out.removeHeader(name);
1239
con->ms_metadata_out.clearHeaders();
1243
con->report_error(self);
1251
* pbms_reset_metadata(), pbms_next_metadata() and pbms_get_metadata_value() deal with metadata for the last
1252
* BLOB received on the connection.
1254
//------------------------------------------------
1255
unsigned int pbms_reset_metadata(PBMS myhndl)
1257
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1258
unsigned int count = 0;
1264
count = con->ms_init_fetch();
1268
con->report_error(self);
1276
//------------------------------------------------
1277
pbms_bool pbms_next_metadata(PBMS myhndl, char *name, char *value, size_t *v_size)
1279
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1281
size_t null_size = MS_META_VALUE_SIZE;
1283
CLOBBER_PROTECT(v_size);
1289
v_size = &null_size;
1292
const char *m_name, *m_value;
1293
ok = con->ms_next(&m_name, &m_value);
1295
cs_strcpy(MS_META_NAME_SIZE, name, m_name);
1296
cs_strcpy(*v_size, value, m_value);
1298
if (*v_size <= strlen(m_value))
1299
*v_size = strlen(m_value) +1;
1305
con->report_error(self);
1313
//------------------------------------------------
1314
pbms_bool pbms_get_metadata_value(PBMS myhndl, const char *name, char *buffer, size_t *size)
1316
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1323
CSString *data = NULL;
1324
data = con->ms_get_metadata(name);
1327
cs_strcpy(*size, buffer, data->getCString());
1328
if (data->length() >= *size) {
1329
*size = data->length() +1;
1336
con->report_error(self);
1344
//------------------------------------------------
1345
pbms_bool pbms_get_md5_digest(PBMS myhndl, char *md5_digest)
1347
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1354
CSString *data = NULL;
1355
data = con->ms_get_checksum();
1357
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "No checksum found.");
1359
if (cs_hex_to_bin(16, md5_digest, data->length(), data->getCString()) != 16) {
1360
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Invalid MD5 Digest.");
1367
con->report_error(self);
1375
//------------------------------------------------
1376
//------------------------------------------------
1377
pbms_bool pbms_put_data(PBMS myhndl, const char *table, const char *checksum, char *ref, size_t size, const unsigned char *data)
1379
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1381
return con->ms_upLoadData(table, NULL, checksum, ref, size, data);
1383
//------------------------------------------------
1384
pbms_bool pbms_put_data_cb(PBMS myhndl, const char *table, const char *checksum, char *ref, size_t size, PBMS_READ_CALLBACK_FUNC cb, void *caller_data)
1386
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1388
return con->ms_upLoadData(table, NULL, checksum, ref, size, NULL, cb, caller_data);
1391
//------------------------------------------------
1392
pbms_bool pbms_get_data(PBMS myhndl, const char *ref, unsigned char *buffer, size_t buffer_size)
1394
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1396
return con->ms_downLoadData(ref, buffer, buffer_size);
1399
//------------------------------------------------
1400
pbms_bool pbms_get_data_range(PBMS myhndl, const char *ref, size_t start_offset, size_t end_offset, unsigned char *buffer, size_t buffer_size, size_t *data_size)
1402
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1404
con->ms_range_start = start_offset;
1405
con->ms_range_end = end_offset;
1407
if (con->ms_downLoadData(ref, buffer, buffer_size)) {
1409
*data_size = con->ms_getBuffer - buffer;
1416
//------------------------------------------------
1417
pbms_bool pbms_get_data_cb(PBMS myhndl, const char *ref, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
1419
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1421
return con->ms_downLoadData(ref, NULL, 0, cb, caller_data);
1424
//------------------------------------------------
1425
pbms_bool pbms_get_data_range_cb(PBMS myhndl, const char *ref, size_t start_offset, size_t end_offset, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
1427
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1429
con->ms_range_start = start_offset;
1430
con->ms_range_end = end_offset;
1432
return con->ms_downLoadData(ref, NULL, 0, cb, caller_data);
1435
//------------------------------------------------
1436
pbms_bool pbms_get_info(PBMS myhndl, const char *ref)
1438
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1439
pbms_bool ok = true;
1445
con->ms_get_info(ref, false);
1449
con->report_error(self);
1457
//------------------------------------------------
1458
pbms_bool pbms_set_option(PBMS myhndl, enum pbms_option option, const void *in_value)
1460
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1461
pbms_bool ok = true;
1469
case PBMS_OPTION_DATABASE: {
1470
CSString *database = CSString::newString((char *)in_value);
1471
con->ms_database->release();
1472
con->ms_database = database;
1476
case PBMS_OPTION_TRANSMITION_TIMEOUT:
1477
con->ms_transmition_timeout = *((unsigned int*)in_value);
1480
case PBMS_OPTION_HOST:
1481
case PBMS_OPTION_PORT:
1482
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Option is ReadOnly.");
1486
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unknown Option.");
1492
con->report_error(self);
1498
//------------------------------------------------
1499
pbms_bool pbms_get_option(PBMS myhndl, enum pbms_option option, void *out_value)
1501
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1502
pbms_bool ok = true;
1510
case PBMS_OPTION_DATABASE:
1511
*((const char**)out_value) = con->ms_database->getCString();
1514
case PBMS_OPTION_TRANSMITION_TIMEOUT:
1515
*((unsigned int*)out_value) = con->ms_transmition_timeout;
1518
case PBMS_OPTION_HOST:
1519
*((const char**)out_value) = con->ms_host->getCString();
1522
case PBMS_OPTION_PORT:
1523
*((unsigned int*)out_value) = con->ms_port;
1527
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unknown Option.");
1533
con->report_error(self);