1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream (PBMS)
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
* 2008-09-10 Barry Leslie
26
#include <curl/curl.h>
30
#include <libdrizzle/drizzle_client.h>
31
#define MYSQL drizzle_con_st
32
#define MYSQL_RES drizzle_result_st
33
#define MYSQL_ROW DRIZZLE_ROW
35
#define mysql_query drizzle_query
36
#define mysql_store_result drizzle_store_result
37
#define mysql_errno drizzle_errno
38
#define mysql_error drizzle_error
39
#define mysql_num_rows drizzle_num_rows
40
#define mysql_fetch_row drizzle_fetch_row
41
#define mysql_free_result drizzle_free_result
52
#include "CSStrUtil.h"
53
//#include "CSSocket.h"
54
#include "CSHTTPStream.h"
56
#include "CSS3Protocol.h"
58
#include "metadata_ms.h"
60
#define CLEAR_SELF() CSThread::setSelf(NULL)
61
#define MAX_STMT_SIZE 1024
63
static int global_errno;
64
static char global_err_message[MS_RESULT_MESSAGE_SIZE];
66
static unsigned long init_count = 0;
67
static CSThreadList *pbms_thread_list = NULL;
68
static CSThread *mslib_global_thread = NULL;
70
static void report_global_mse_error(CSThread *thd)
72
global_errno = thd->myException.getErrorCode();
73
cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message, thd->myException.getMessage());
76
#define DFLT_CONNECTION_TIMEOUT 10 // Changing this required a documentation update.
78
#define THROW_CURL_IF(v) { if (v) CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);}
79
//======================================
80
static size_t receive_data(void *ptr, size_t size, size_t nmemb, void *stream);
81
static size_t receive_header(void *ptr, size_t size, size_t nmemb, void *stream);
82
static size_t send_callback(void *ptr, size_t size, size_t nmemb, void *stream);
83
//--------------------------------------------
84
class PBMS_ConHandle:public CSThread {
87
// Fields that must be freed when the object is destroyed.
89
CSString *ms_database;
90
struct curl_slist *ms_header_list; // A curl list of headers to be sent with the next PUT.
91
struct curl_slist *ms_info_only; // A curl list of headers to be sent with Get.
92
struct curl_slist *ms_ping_header; // A curl list of headers to be sent with ping.
93
CSStringBuffer *ms_url_str;
95
CSStringBuffer *ms_buffer;
96
CSStringBuffer *ms_errorReply;
97
CSS3Protocol *ms_cloud;
98
uint64_t ms_range_start;
99
uint64_t ms_range_end;
102
CSThread(pbms_thread_list),
106
ms_transmition_timeout(0),
108
ms_header_list(NULL),
110
ms_ping_header(NULL),
116
ms_throw_error(false),
124
curl_easy_cleanup(ms_curl);
130
ms_database->release();
136
ms_buffer->release();
139
ms_errorReply->release();
142
ms_url_str->release();
145
curl_slist_free_all(ms_header_list);
148
curl_slist_free_all(ms_info_only);
151
curl_slist_free_all(ms_ping_header);
153
ms_headers.clearHeaders();
154
ms_metadata_out.clearHeaders();
157
unsigned int ms_replyStatus;
158
CSHTTPHeaders ms_headers;
159
CSHTTPHeaders ms_metadata_out;
160
u_int ms_next_header;
162
unsigned int ms_port;
163
unsigned int ms_transmition_timeout; // In the future this may have some effect but for now it is always be 0 (no timeout).
164
unsigned int ms_url_base_len;
165
bool ms_throw_error; // Gets set if an exception occurs in a callback.
168
char ms_err_message[MS_RESULT_MESSAGE_SIZE];
171
char ms_curl_error[CURL_ERROR_SIZE];
173
size_t ms_DataToBeTransfered;
174
// Get data caller parameters:
175
u_char *ms_getBuffer;
176
size_t ms_getBufferSize;
177
PBMS_WRITE_CALLBACK_FUNC ms_writeCB;
180
void set_downLoadUserData(u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb = NULL, void *caller_data = NULL)
182
ms_DataToBeTransfered = buffer_size;
183
ms_getBuffer = buffer;
184
ms_getBufferSize = buffer_size;
186
ms_getCBData = caller_data;
189
// Put data caller parameters:
190
const u_char *ms_putData;
191
size_t ms_putDataLen;
192
size_t ms_putDataOffset;
193
PBMS_READ_CALLBACK_FUNC ms_readCB;
196
void set_upLoadUserData(const u_char *buffer, size_t size, PBMS_READ_CALLBACK_FUNC cb = NULL, void *caller_data = NULL)
198
ms_DataToBeTransfered = size;
200
ms_putDataLen = size;
203
ms_putCBData = caller_data;
207
void ms_initConnection(const char* host, unsigned int port, const char* database)
209
ms_curl = curl_easy_init();
211
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_init() failed.");
213
if (curl_easy_setopt(ms_curl, CURLOPT_ERRORBUFFER, ms_curl_error))
214
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_setopt(CURLOPT_ERRORBUFFER) failed.");
216
// Uncomment this line to trace network action during request. Very Usefull!!
217
//curl_easy_setopt(ms_curl, CURLOPT_VERBOSE, 1L);
219
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_TCP_NODELAY, 1L));
221
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOPROGRESS, 1L));
222
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEFUNCTION, receive_data));
223
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READFUNCTION, send_callback));
224
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HEADERFUNCTION, receive_header));
226
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEDATA, this));
227
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READDATA, this));
228
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEHEADER, this));
230
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FOLLOWLOCATION, 1L)); // Follow redirects.
233
pbms_api_owner = true;
234
ms_host = CSString::newString(host);
235
ms_database = CSString::newString(database);
237
ms_url_str = new CSStringBuffer(50);
238
ms_url_str->append("http://");
239
ms_url_str->append(host);
240
ms_url_str->append(":");
241
ms_url_str->append(port);
242
ms_url_str->append("/");
243
ms_url_base_len = ms_url_str->length();
245
ms_buffer = new CSStringBuffer(50);
247
ms_info_only = curl_slist_append(ms_info_only, MS_BLOB_INFO_REQUEST ": yes");
249
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
251
ms_buffer->append(MS_PING_REQUEST": ");
252
ms_buffer->append(database);
253
ms_ping_header = curl_slist_append(ms_ping_header, ms_buffer->getCString());
255
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
257
ms_buffer->setLength(0);
268
void ms_init_put_blob(curl_off_t size, const char *table, const char *alias, const char *checksum, bool use_cloud);
270
void ms_init_get_blob(const char *ref, bool is_alias, bool info_only);
272
void ms_get_info(const char *ref, bool is_alias);
273
void ms_sendCloudBLOB(size_t size) ;
275
pbms_bool ms_downLoadData(const char *ref, u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb = NULL, void *caller_data = NULL);
277
pbms_bool ms_upLoadData(const char *table, const char *alias, const char *checksum, char *ref, size_t size, const u_char *data, PBMS_READ_CALLBACK_FUNC cb = NULL, void *caller_data = NULL);
279
u_int ms_init_fetch() {ms_next_header =0; return ms_max_header = ms_headers.numHeaders();}
281
bool ms_next(const char **name, const char **value)
283
if (ms_next_header >= ms_max_header)
286
CSHeader *header = ms_headers.getHeader(ms_next_header++);
287
*name = header->getNameCString();
288
*value = header->getValueCString();
297
printf("Headers:\n");
298
printf("---------------------------------------\n");
299
while ( (header = ms_headers.getHeader(i++)) ) {
300
printf("%s : %s\n", header->getNameCString(), header->getValueCString());
303
printf("---------------------------------------\n");
306
void ms_addS3HeadersHeaders(CSVector *s3Headers);
308
CSString *ms_get_metadata(const char *name)
310
return ms_headers.getHeaderValue(name);
313
CSString *ms_get_alias() {return ms_get_metadata(MS_ALIAS_TAG);}
315
CSString *ms_get_checksum() {return ms_get_metadata(MS_CHECKSUM_TAG);}
317
void report_error(CSThread *self)
319
ms_errno = self->myException.getErrorCode();
320
cs_strcpy(MS_RESULT_MESSAGE_SIZE, ms_err_message, self->myException.getMessage());
323
void report_error(int err, const char *msg)
326
cs_strcpy(MS_RESULT_MESSAGE_SIZE, ms_err_message, msg);
329
void throw_http_reply_exception();
331
void check_reply_status()
333
switch (ms_replyStatus) {
335
//case 301: // Moved Permanently
336
//case 307: // Temporary Redirect
339
throw_http_reply_exception();
344
void ms_getCloudHeaders()
346
CSString *value = NULL;
351
value = ms_headers.getHeaderValue(MS_CLOUD_SERVER);
361
new_(ms_cloud, CSS3Protocol());
363
// Remove the cloud headers so they are not visable to the caller.
364
ms_headers.removeHeader(MS_CLOUD_SERVER);
366
ms_cloud->s3_setServer(value->getCString());
369
// Get the S3 public key
370
value = ms_headers.getHeaderValue(MS_CLOUD_KEY);
372
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 public key in reply.");
375
ms_headers.removeHeader(MS_CLOUD_KEY);
376
ms_cloud->s3_setPublicKey(value->getCString());
384
// CURL callback functions:
385
////////////////////////////
386
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
388
PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
389
size_t data_len = objs * obj_size;
390
char *ptr = (char*)vptr;
392
if (con->ms_replyStatus >= 400) { // Collect the error reply.
395
if (!con->ms_errorReply)
396
con->ms_errorReply = new CSStringBuffer(50);
397
con->ms_errorReply->append(ptr, data_len);
400
con->ms_throw_error = true;
407
if (data_len > con->ms_DataToBeTransfered) { // This should never happen.
408
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob data overflow.");
409
con->ms_throw_error = true;
413
con->ms_DataToBeTransfered -= data_len;
415
return (con->ms_writeCB)(con->ms_getCBData, ptr, data_len, false);
417
memcpy(con->ms_getBuffer, ptr, data_len);
418
con->ms_getBuffer += data_len;
423
#define IS_REDIRECT(s) ((s >= 300) && (s < 400))
424
//----------------------
425
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
427
PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
428
size_t size = objs * obj_size;
429
char *end, *ptr = (char*) header, *value;
431
u_int name_len, value_len;
434
if (*(end -2) == '\r' && *(end -1) == '\n')
437
while ((end != ptr) && (*ptr == ' ')) ptr++;
441
// Get the reply status.
442
if (((!con->ms_replyStatus) || (con->ms_replyStatus == 100) || IS_REDIRECT(con->ms_replyStatus) ) && !strncasecmp(ptr, "HTTP", 4)) {
444
while ((end != ptr) && (*ptr != ' ')) ptr++; // skip HTTP stuff
445
while ((end != ptr) && (*ptr == ' ')) ptr++; // find the start of eh status code.
449
if (end < (ptr +3)) // expecting a 3 digit status code.
452
memcpy(status, ptr, 3);
455
con->ms_replyStatus = atoi(status);
459
while ((end != ptr) && (*ptr != ':')) ptr++;
462
name_len = ptr - name;
465
while ((end != ptr) && (*ptr == ' ')) ptr++;
470
value_len = end - value;
472
while (name[name_len-1] == ' ') name_len--;
473
while (value[value_len-1] == ' ') value_len--;
475
if (!strncasecmp(name, "Content-Length", 14)) {
478
memcpy(len, value, (value_len > 31)?31:value_len);
479
len[(value_len > 31)?31:value_len] = 0;
482
// If there is no callback then the data size is limited
483
// to the GetBuffer size.
484
if (con->ms_writeCB || (length < con->ms_getBufferSize))
485
con->ms_DataToBeTransfered = length;
492
if (!strncasecmp(name, "ETag", 4)) { // S3 checksum
493
name = MS_CHECKSUM_TAG;
494
name_len = strlen(MS_CHECKSUM_TAG);
499
if (value[value_len-1] == '"') {
502
con->ms_headers.removeHeader(MS_CHECKSUM_TAG);
504
con->ms_headers.addHeader(name, name_len, value, value_len);
508
con->ms_throw_error = true;
515
//----------------------
516
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
518
PBMS_ConHandle *con = (PBMS_ConHandle*) v_con;
519
char *buffer = (char*) ptr;
520
size_t data_sent = 0, buffer_size = objs * obj_size;
522
if (con->ms_putDataLen == 0)
526
data_sent = (con->ms_readCB)(con->ms_putCBData, buffer , buffer_size, false);
528
data_sent = (buffer_size < con->ms_putDataLen)? buffer_size: con->ms_putDataLen;
529
memcpy(buffer,con->ms_putData, data_sent);
530
con->ms_putData += data_sent;
532
con->ms_putDataLen -= data_sent;
537
#define CONTENT_TYPE "Content-Type"
538
//------------------------------------------------
539
void PBMS_ConHandle::ms_init_put_blob(curl_off_t size, const char *table, const char *alias, const char *checksum, bool use_cloud)
541
char buffer[MS_META_VALUE_SIZE + MS_META_NAME_SIZE +2];
542
int buffer_size = MS_META_VALUE_SIZE + MS_META_NAME_SIZE +2;
543
bool have_content_type = false;
545
ms_url_str->setLength(ms_url_base_len);
547
ms_url_str->append(ms_database->getCString());
549
ms_url_str->append("/");
550
ms_url_str->append(table);
553
// Remove any old headers
554
if (ms_header_list) {
555
curl_slist_free_all(ms_header_list);
556
ms_header_list = NULL;
559
// Add metadata headers.
562
while ( (header = ms_metadata_out.getHeader(i++)) ) {
563
cs_strcpy(buffer_size, buffer, header->getNameCString());
564
if (!strcasecmp( buffer, CONTENT_TYPE))
565
have_content_type = true;
566
cs_strcat(buffer_size, buffer, ':');
567
cs_strcat(buffer_size, buffer, header->getValueCString());
569
ms_header_list = curl_slist_append(ms_header_list, buffer);
571
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
574
if (!have_content_type) {
575
// Prevent CURLOPT_POST from adding a content type.
576
ms_header_list = curl_slist_append(ms_header_list, CONTENT_TYPE ":");
578
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
582
cs_strcpy(buffer_size, buffer, MS_CHECKSUM_TAG ":");
583
cs_strcat(buffer_size, buffer, checksum);
585
ms_header_list = curl_slist_append(ms_header_list, buffer);
587
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
591
#ifdef HAVE_ALIAS_SUPPORT
593
if (strlen(alias) > MS_META_VALUE_SIZE)
594
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "BLOB alias name too long.");
596
cs_strcpy(buffer_size, buffer, MS_ALIAS_TAG ":");
597
cs_strcat(buffer_size, buffer, alias);
599
ms_header_list = curl_slist_append(ms_header_list, buffer);
601
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
606
cs_strcpy(buffer_size, buffer, MS_BLOB_SIZE ":");
607
snprintf(buffer + strlen(buffer), buffer_size - strlen(buffer), "%"PRIu64"", size);
608
ms_header_list = curl_slist_append(ms_header_list, buffer);
610
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
612
size = 0; // The BLOB data is not being sent to the PBMS server
615
// Using CURLOPT_UPLOAD is VERY slow!
616
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, size));
617
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
618
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POSTFIELDSIZE_LARGE, size));
619
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 1L));
621
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
622
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString() ));
625
ms_headers.clearHeaders();
627
ms_errorReply->setLength(0);
631
//------------------------------------------------
632
void PBMS_ConHandle::ms_init_get_blob(const char *ref, bool is_alias, bool info_only)
636
ms_url_str->setLength(ms_url_base_len);
638
#ifdef HAVE_ALIAS_SUPPORT
639
if (is_alias || !ms_parse_blob_url(&blob, ref)) {
640
ms_url_str->append(ms_database->getCString());
641
ms_url_str->append("/");
644
ms_url_str->append((char *) ref);
646
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 0L));
647
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 0L));
648
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, (info_only)?ms_info_only: NULL));
649
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString()));
651
// NOTE: range 0-0 is valid, it returns the first byte.
652
if (ms_range_start <= ms_range_end) {
654
snprintf(range, 80, "%"PRIu64"-%"PRIu64"", ms_range_start, ms_range_end);
655
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_RANGE, range));
659
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_RANGE, NULL));
663
ms_headers.clearHeaders();
665
ms_errorReply->setLength(0);
668
//------------------------------------------------
669
void PBMS_ConHandle::ms_get_info(const char *ref, bool is_alias)
672
ms_init_get_blob(ref, is_alias, true);
674
if (curl_easy_perform(ms_curl) && !ms_throw_error)
675
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
680
check_reply_status();
685
//------------------------------------------------
686
void PBMS_ConHandle::ms_ping()
689
ms_url_str->setLength(ms_url_base_len);
691
// THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 0L));
692
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 0L));
693
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_ping_header));
694
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, ms_url_str->getCString()));
697
ms_headers.clearHeaders();
699
ms_errorReply->setLength(0);
701
if (curl_easy_perform(ms_curl) && !ms_throw_error)
702
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
707
check_reply_status();
709
// Check to see if the BLOBs in the database are stored in a cloud:
710
CSString *cloud_server;
711
cloud_server = ms_headers.getHeaderValue(MS_CLOUD_SERVER);
717
cloud_server->release();
719
new_(ms_cloud, CSS3Protocol());
725
//------------------------------------------------
726
void PBMS_ConHandle::throw_http_reply_exception()
728
CSString *reply = NULL, *error_text = NULL;
737
size = ms_errorReply->length();
740
error_text = CSString::newString("Missing HTTP reply: possible Media Stream engine connection failure.");
744
reply = CSString::newString(ms_errorReply);
745
ms_errorReply = NULL;
747
start = reply->locate(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG, 1);
748
start += strlen(EXCEPTION_REPLY_MESSAGE_PREFIX_TAG);
749
end = reply->locate(EXCEPTION_REPLY_MESSAGE_SUFFIX_TAG, 1);
751
error_text = reply->substr(start, end - start);
757
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, error_text->getCString());
761
if (reply) reply->release();
762
if (error_text) error_text->release();
769
//------------------------------------------------
770
pbms_bool PBMS_ConHandle::ms_downLoadData(const char *ref, u_char *buffer, size_t buffer_size, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
778
set_downLoadUserData(buffer, buffer_size, cb, caller_data);
779
ms_init_get_blob(ref, false, false);
780
if (curl_easy_perform(ms_curl) && !ms_throw_error)
781
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
786
check_reply_status();
799
//------------------------------------------------
800
void PBMS_ConHandle::ms_sendCloudBLOB(size_t size)
802
CSInputStream *input;
803
CSString *bucket, *object_key, *content_type, *signature, *signature_time;
809
bucket = ms_headers.getHeaderValue(MS_CLOUD_BUCKET);
811
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 bucket header in reply.");
813
ms_headers.removeHeader(MS_CLOUD_BUCKET);
815
// Get the S3 object key
816
object_key = ms_headers.getHeaderValue(MS_CLOUD_OBJECT_KEY);
818
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 object key header in reply.");
820
ms_headers.removeHeader(MS_CLOUD_OBJECT_KEY);
822
// Get the S3 blob signature
823
signature = ms_headers.getHeaderValue(MS_BLOB_SIGNATURE);
825
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 blob signature in reply.");
828
ms_headers.removeHeader(MS_BLOB_SIGNATURE);
830
// Get the S3 blob signature date
831
signature_time = ms_headers.getHeaderValue(MS_BLOB_DATE);
833
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing S3 blob signature date in reply.");
834
push_(signature_time);
835
ms_headers.removeHeader(MS_BLOB_DATE);
837
content_type = ms_headers.getHeaderValue(CONTENT_TYPE);
842
input = CSCallbackInputStream::newStream(ms_readCB, ms_putCBData);
844
input = CSMemoryInputStream::newStream(ms_putData, ms_putDataLen);
846
// Send the BLOB to the cloud storage.
847
s3Headers = ms_cloud->s3_send(input, bucket->getCString(), object_key->getCString(), size, (content_type)?content_type->getCString():NULL, NULL, signature->getCString(), atol(signature_time->getCString()));
849
ms_addS3HeadersHeaders(s3Headers);
852
release_(content_type);
854
release_(signature_time);
856
release_(object_key);
862
//------------------------------------------------
863
void PBMS_ConHandle::ms_addS3HeadersHeaders(CSVector *s3Headers)
865
CSHTTPHeaders headers;
869
headers.setHeaders(s3Headers);
871
for (u_int i = 0; i < headers.numHeaders(); i++) {
872
CSHeader *h = headers.getHeader(i);
873
const char *name = h->getNameCString();
875
if (strcasecmp(name, "ETag") == 0){
876
const char *value = h->getValueCString();
877
u_int value_len = strlen(value);
883
if (value[value_len-1] == '"') {
887
ms_headers.removeHeader(MS_CHECKSUM_TAG);
889
ms_headers.addHeader(MS_CHECKSUM_TAG, CSString::newString(value, value_len));
892
ms_headers.removeHeader(name);
893
ms_headers.addHeader(h);
899
headers.clearHeaders();
903
headers.clearHeaders();
907
//------------------------------------------------
908
pbms_bool PBMS_ConHandle::ms_upLoadData(const char *table, const char *alias, const char *checksum, char *ref, size_t size, const u_char *data, PBMS_READ_CALLBACK_FUNC cb, void *caller_data)
910
pbms_bool ok = true, use_cloud = (ms_cloud != NULL);
919
ms_init_put_blob(size, table, alias, checksum, use_cloud);
920
set_upLoadUserData(data, size, cb, caller_data);
921
set_downLoadUserData((u_char*) ref, PBMS_BLOB_URL_SIZE -1); // Set things up to receive the BLOB ref back.
923
if (curl_easy_perform(ms_curl) && !ms_throw_error)
924
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
929
check_reply_status();
931
*ms_getBuffer =0; // null terminate the blob reference string.
932
ms_getCloudHeaders();
933
if (ms_cloud && use_cloud)
934
ms_sendCloudBLOB(size);
935
else if (use_cloud && !ms_cloud) {
936
// We were expecting to send the BLOB to the cloud but
937
// the server did not respond with cloud data. The retry will
938
// send the data to the PBMS server.
953
//------------------------------------------------
954
static void report_global_error(int err, const char *message, int line)
961
cs_strcpy(MS_RESULT_MESSAGE_SIZE, global_err_message, message);
962
snprintf(line_no, 32, ": line %d", line);
963
cs_strcat(MS_RESULT_MESSAGE_SIZE, global_err_message, line_no);
966
//------------------------------------------------
967
static void clear_global_error()
970
*global_err_message = 0;
973
//------------------------------------------------
974
pbms_bool pbms_library_init()
976
clear_global_error();
979
if (init_count == 1 ) {
982
CURLcode curl_code = curl_global_init(CURL_GLOBAL_ALL);
984
report_global_error(curl_code, curl_easy_strerror(curl_code) , __LINE__);
989
if (! CSThread::startUp()) {
990
report_global_error(ENOMEM, "CSThread::startUp() failed.", __LINE__);
997
mslib_global_thread = new CSThread( NULL);
998
CSThread::setSelf(mslib_global_thread);
1000
CSThread *self = mslib_global_thread;
1001
if (!mslib_global_thread) {
1002
report_global_error(ENOMEM, "new CSThread( NULL) failed.", __LINE__);
1004
CSThread::shutDown();
1009
pbms_thread_list = new CSThreadList();
1012
report_global_mse_error(self);
1014
mslib_global_thread->release();
1015
mslib_global_thread = NULL;
1017
CSThread::shutDown();
1022
return(init_count > 0);
1025
//------------------------------------------------
1026
void pbms_library_end()
1029
if (init_count == 1 ) {
1033
if (pbms_thread_list) {
1034
PBMS_ConHandle *con;
1035
while (con = (PBMS_ConHandle *) pbms_thread_list->getFront()) {
1037
CSThread::detach(con);
1039
CSThread::setSelf(mslib_global_thread);
1040
pbms_thread_list->release();
1042
CSThread::setSelf(mslib_global_thread);
1045
mslib_global_thread->release();
1046
mslib_global_thread = NULL;
1048
CSThread::shutDown();
1050
curl_global_cleanup();
1057
//------------------------------------------------
1058
int pbms_errno(PBMS myhndl)
1060
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1063
return con->ms_errno;
1066
return global_errno;
1069
//------------------------------------------------
1070
const char *pbms_error(PBMS myhndl)
1072
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1075
return con->ms_err_message;
1078
return global_err_message;
1081
//------------------------------------------------
1082
PBMS pbms_connect(const char* host, unsigned int port, const char *database)
1084
PBMS_ConHandle *con = NULL;
1087
clear_global_error();
1089
new_(con, PBMS_ConHandle());
1091
if ((!con) || !CSThread::attach(con)) {
1092
report_global_error(ENOMEM, "new PBMS_Ref() failed.", __LINE__);
1098
CSThread *self = con;
1101
con->ms_initConnection(host, port, database);
1105
report_global_mse_error(con);
1106
CSThread::detach(con);
1115
//------------------------------------------------
1116
void pbms_close(PBMS myhndl)
1118
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1120
// This will kill the 'self' thread so do not try and do any exception handling.
1122
CSThread::detach(con); // This will also release the connection.
1125
//------------------------------------------------
1126
pbms_bool pbms_is_blob_reference(PBMS myhndl, const char *ref)
1128
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1129
pbms_bool ok = false;
1136
if (ms_parse_blob_url(&blob, (char *)ref))
1145
//------------------------------------------------
1146
pbms_bool pbms_get_blob_size(PBMS myhndl, const char *ref, size_t *size)
1148
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1156
if (ms_parse_blob_url(&blob, (char *)ref)) {
1157
*size = blob.bu_blob_size;
1160
#ifdef HAVE_ALIAS_SUPPORT
1161
else { // Assume it is a BLOB alias
1162
CSVector *saved_metadata;
1163
CSString *data = NULL;
1165
saved_metadata = con->ms_headers.takeHeaders();
1167
con->ms_get_info(ref, true);
1168
data = con->ms_get_metadata(MS_BLOB_SIZE);
1169
*size = atol(data->getCString());
1174
con->report_error(self);
1176
con->ms_headers.setHeaders(saved_metadata);
1179
con->report_error(MS_ERR_INCORRECT_URL, "Invalid BLOB URL");
1188
* pbms_add_metadata() and pbms_clear_metadata() deal with metadata for outgoing BLOBs only.
1190
//------------------------------------------------
1191
pbms_bool pbms_add_metadata(PBMS myhndl, const char *name, const char *value)
1193
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1200
if (strlen(name) > MS_META_NAME_SIZE)
1201
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Metadata name too long.");
1202
if (strlen(value) > MS_META_VALUE_SIZE)
1203
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Metadata value too long.");
1205
con->ms_metadata_out.addHeader(name, value);
1211
con->report_error(self);
1219
//------------------------------------------------
1220
void pbms_clear_metadata(PBMS myhndl, const char *name)
1222
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1229
con->ms_metadata_out.removeHeader(name);
1231
con->ms_metadata_out.clearHeaders();
1235
con->report_error(self);
1243
* pbms_reset_metadata(), pbms_next_metadata() and pbms_get_metadata_value() deal with metadata for the last
1244
* BLOB received on the connection.
1246
//------------------------------------------------
1247
unsigned int pbms_reset_metadata(PBMS myhndl)
1249
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1250
unsigned int count = 0;
1256
count = con->ms_init_fetch();
1260
con->report_error(self);
1268
//------------------------------------------------
1269
pbms_bool pbms_next_metadata(PBMS myhndl, char *name, char *value, size_t *v_size)
1271
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1273
size_t null_size = MS_META_VALUE_SIZE;
1279
v_size = &null_size;
1282
const char *m_name, *m_value;
1283
ok = con->ms_next(&m_name, &m_value);
1285
cs_strcpy(MS_META_NAME_SIZE, name, m_name);
1286
cs_strcpy(*v_size, value, m_value);
1288
if (*v_size <= strlen(m_value))
1289
*v_size = strlen(m_value) +1;
1295
con->report_error(self);
1303
//------------------------------------------------
1304
pbms_bool pbms_get_metadata_value(PBMS myhndl, const char *name, char *buffer, size_t *size)
1306
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1313
CSString *data = NULL;
1314
data = con->ms_get_metadata(name);
1317
cs_strcpy(*size, buffer, data->getCString());
1318
if (data->length() >= *size) {
1319
*size = data->length() +1;
1326
con->report_error(self);
1334
//------------------------------------------------
1335
pbms_bool pbms_get_md5_digest(PBMS myhndl, char *md5_digest)
1337
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1344
CSString *data = NULL;
1345
data = con->ms_get_checksum();
1347
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "No checksum found.");
1349
if (cs_hex_to_bin(16, md5_digest, data->length(), data->getCString()) != 16) {
1350
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Invalid MD5 Digest.");
1357
con->report_error(self);
1365
//------------------------------------------------
1366
//------------------------------------------------
1367
pbms_bool pbms_put_data(PBMS myhndl, const char *table, const char *checksum, char *ref, size_t size, const unsigned char *data)
1369
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1371
return con->ms_upLoadData(table, NULL, checksum, ref, size, data);
1373
//------------------------------------------------
1374
pbms_bool pbms_put_data_cb(PBMS myhndl, const char *table, const char *checksum, char *ref, size_t size, PBMS_READ_CALLBACK_FUNC cb, void *caller_data)
1376
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1378
return con->ms_upLoadData(table, NULL, checksum, ref, size, NULL, cb, caller_data);
1381
//------------------------------------------------
1382
pbms_bool pbms_get_data(PBMS myhndl, const char *ref, unsigned char *buffer, size_t buffer_size)
1384
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1386
return con->ms_downLoadData(ref, buffer, buffer_size);
1389
//------------------------------------------------
1390
pbms_bool pbms_get_data_range(PBMS myhndl, const char *ref, size_t start_offset, size_t end_offset, unsigned char *buffer, size_t buffer_size, size_t *data_size)
1392
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1394
con->ms_range_start = start_offset;
1395
con->ms_range_end = end_offset;
1397
if (con->ms_downLoadData(ref, buffer, buffer_size)) {
1399
*data_size = con->ms_getBuffer - buffer;
1406
//------------------------------------------------
1407
pbms_bool pbms_get_data_cb(PBMS myhndl, const char *ref, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
1409
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1411
return con->ms_downLoadData(ref, NULL, 0, cb, caller_data);
1414
//------------------------------------------------
1415
pbms_bool pbms_get_data_range_cb(PBMS myhndl, const char *ref, size_t start_offset, size_t end_offset, PBMS_WRITE_CALLBACK_FUNC cb, void *caller_data)
1417
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1419
con->ms_range_start = start_offset;
1420
con->ms_range_end = end_offset;
1422
return con->ms_downLoadData(ref, NULL, 0, cb, caller_data);
1425
//------------------------------------------------
1426
pbms_bool pbms_get_info(PBMS myhndl, const char *ref)
1428
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1429
pbms_bool ok = true;
1435
con->ms_get_info(ref, false);
1439
con->report_error(self);
1447
//------------------------------------------------
1448
pbms_bool pbms_set_option(PBMS myhndl, enum pbms_option option, const void *in_value)
1450
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1451
pbms_bool ok = true;
1459
case PBMS_OPTION_DATABASE: {
1460
CSString *database = CSString::newString((char *)in_value);
1461
con->ms_database->release();
1462
con->ms_database = database;
1466
case PBMS_OPTION_TRANSMITION_TIMEOUT:
1467
con->ms_transmition_timeout = *((unsigned int*)in_value);
1470
case PBMS_OPTION_HOST:
1471
case PBMS_OPTION_PORT:
1472
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Option is ReadOnly.");
1476
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unknown Option.");
1482
con->report_error(self);
1488
//------------------------------------------------
1489
pbms_bool pbms_get_option(PBMS myhndl, enum pbms_option option, void *out_value)
1491
PBMS_ConHandle *con = (PBMS_ConHandle*) myhndl;
1492
pbms_bool ok = true;
1500
case PBMS_OPTION_DATABASE:
1501
*((const char**)out_value) = con->ms_database->getCString();
1504
case PBMS_OPTION_TRANSMITION_TIMEOUT:
1505
*((unsigned int*)out_value) = con->ms_transmition_timeout;
1508
case PBMS_OPTION_HOST:
1509
*((const char**)out_value) = con->ms_host->getCString();
1512
case PBMS_OPTION_PORT:
1513
*((unsigned int*)out_value) = con->ms_port;
1517
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Unknown Option.");
1523
con->report_error(self);