1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Created by Barry Leslie on 10/02/09.
26
#include <curl/curl.h>
30
#include "CSStrUtil.h"
32
#include "CSS3Protocol.h"
36
//#define SHOW_SIGNING
37
// Uncomment this line to trace network activity during a request. Very useful!
43
//#define SHOW_SIGNING
45
// Length of a checksum rendered as hexadecimal text: two characters per checksum byte
// (the terminating NUL is not included; buffers are declared with +1).
#define HEX_CHECKSUM_VALUE_SIZE (2 *CHECKSUM_VALUE_SIZE)
47
// Throws a generic CSException carrying the text in ms_curl_error when 'v' is non-zero.
// The expansion refers to ms_curl_error directly, so that name must be in scope where the macro is used.
#define THROW_CURL_IF(v) { if (v) CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);}
49
//-------------------------------
50
static const char *retryCodes[] = {
59
//======================================
60
// Forward declarations of the libcurl callbacks defined later in this file.
// They are registered through CURLOPT_WRITEFUNCTION (receive_data),
// CURLOPT_HEADERFUNCTION (receive_header) and CURLOPT_READFUNCTION (send_callback).
static size_t receive_data(void *ptr, size_t size, size_t nmemb, void *stream);
61
static size_t receive_header(void *ptr, size_t size, size_t nmemb, void *stream);
62
static size_t send_callback(void *ptr, size_t size, size_t nmemb, void *stream);
64
class S3ProtocolCon : CSXMLBuffer, public CSObject {
68
// XML parser callback for an opened node; used to inspect an S3 error reply.
// path  - slash-terminated node path as compared below (e.g. "/error/code/").
// value - text value of the node; ignored when NULL or empty.
virtual bool openNode(char *path, char *value) {
69
// The S3 error code decides whether the request is worth retrying.
if (value && *value && (strcmp(path,"/error/code/") == 0)) {
70
printf("S3 ERROR Code: %s\n", value);
71
// Scan retryCodes until a match is found or the terminating NULL entry is reached.
for (int i = 0; retryCodes[i] && !ms_retry; i++)
72
ms_retry = (strcmp(value, retryCodes[i]) == 0);
74
// A retryable "slowdown" code is singled out here; the action taken is on a line not shown.
if (ms_retry && !strcmp("slowdown", value))
76
} else if (value && *value && (strcmp(path,"/error/message/") == 0)) {
77
printf("S3 ERROR MESSAGE: %s\n", value);
82
// XML parser callback for a closed node (body not visible in this extract).
virtual bool closeNode(char *path) {
87
// XML parser callback for a node attribute (body not visible in this extract).
virtual bool addAttribute(char *path, char *name, char *value) {
94
//-------------------------------
100
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing HTTP reply: possible S3 connection failure.");
103
printf("ms_errorReply:\n===========\n%s\n===========\n", ms_errorReply->getCString());
106
if (!parseData(ms_errorReply->getCString(), ms_errorReply->length(), 0)){
110
getError(&err, &msg);
111
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
119
// Reply headers collected by the receive_header() callback via try_addHeader().
CSHTTPHeaders ms_reply_headers;
120
CSStringBuffer ms_buffer; // A scratch buffer
123
struct curl_slist *ms_header_list; // A curl list of headers to be sent with the next request.
125
// Source of the request body; read by try_ReadStream() during an upload.
CSInputStream *ms_inputStream;
126
// Destination of the reply body; written by try_WriteStream() when the status is below 400.
CSOutputStream *ms_outputStream;
129
// Hex checksum text copied from the reply's ETag header (NUL terminated).
char ms_s3Checksum[HEX_CHECKSUM_VALUE_SIZE +1];
130
// When true, ms_execute_put_request() compares the locally computed MD5 with ms_s3Checksum.
bool ms_calculate_md5;
132
bool ms_notFound; // True if the object could not be found
133
bool ms_retry; // True if the request failed with a retry error.
136
// Body of an error reply (HTTP status >= 400), accumulated by try_WriteStream().
CSStringBuffer *ms_errorReply;
137
// Error text buffer handed to libcurl with CURLOPT_ERRORBUFFER.
char ms_curl_error[CURL_ERROR_SIZE];
139
// Number of bytes still expected from ms_inputStream; decremented by try_ReadStream().
off64_t ms_data_size;
141
// HTTP status code parsed from the reply status line by receive_header().
unsigned int ms_replyStatus;
142
bool ms_throw_error; // Gets set if an exception occurs in a callback.
145
// File time reported by libcurl (CURLINFO_FILETIME) after a GET/HEAD request.
time_t ms_last_modified;
149
ms_header_list(NULL),
150
ms_inputStream(NULL),
151
ms_outputStream(NULL),
152
ms_calculate_md5(false),
159
ms_throw_error(false),
160
ms_old_libcurl(false),
165
ms_curl = curl_easy_init();
167
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_init() failed.");
169
curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW);
171
// libCurl versions prior to 7.17.0 did not make copies of strings passed into curl_easy_setopt()
172
// If this version requirement is a problem I can do this myself, if I have to, I guess. :(
173
if (curl_ver->version_num < 0X071700 ) {
174
ms_old_libcurl = true;
177
//snprintf(msg, 200, "libcurl version %s is too old, require version 7.17.0 or newer.", curl_ver->version);
178
//CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
181
if (curl_easy_setopt(ms_curl, CURLOPT_ERRORBUFFER, ms_curl_error))
182
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_setopt(CURLOPT_ERRORBUFFER) failed.");
185
curl_easy_setopt(ms_curl, CURLOPT_VERBOSE, 1L);
187
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_TCP_NODELAY, 1L));
190
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOPROGRESS, 1L));
191
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEFUNCTION, receive_data));
192
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READFUNCTION, send_callback));
193
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HEADERFUNCTION, receive_header));
194
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEDATA, this));
195
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READDATA, this));
196
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEHEADER, this));
199
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FOLLOWLOCATION, 1L)); // Follow redirects.
206
curl_easy_cleanup(ms_curl);
208
curl_slist_free_all(ms_header_list);
210
ms_inputStream->release();
212
ms_outputStream->release();
214
ms_errorReply->release();
216
ms_reply_headers.clearHeaders();
219
cs_free(ms_safe_url);
222
// Interprets ms_replyStatus once a request has completed.
// Only part of the control flow is visible in this extract; the comments below
// describe the visible statements only.
inline void check_reply_status()
224
// Any 2xx status is tested first (the action taken is on a line not shown).
if (ms_replyStatus > 199 && ms_replyStatus < 300)
229
switch (ms_replyStatus) {
231
case 204: // No Content
232
//case 301: // Moved Permanently
233
//case 307: // Temporary Redirect
235
case 404: // Not Found
236
case 403: // Forbidden (S3 object not found)
249
// Throws with the text of the collected error reply as the exception message.
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_errorReply->getCString());
251
// S3 asked the client to slow down: log it and pause before continuing.
} else if (ms_slowDown) {
253
CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 slow down request.");
254
self->sleep(10); // sleep for 1/100 second.
263
// Clears per-request state so the connection object can be used for a new request:
// the outgoing header list, the collected reply headers, the callback error flag,
// the error reply text, the S3 checksum, any attached streams and the cached URL copy.
inline void ms_reset()
265
// Remove any old headers
266
if (ms_header_list) {
267
curl_slist_free_all(ms_header_list);
268
ms_header_list = NULL;
271
ms_reply_headers.clearHeaders();
273
ms_throw_error = false;
275
// NOTE(review): ms_errorReply is allocated lazily in try_WriteStream(); a NULL guard
// for this call is presumably on a line not shown - confirm.
ms_errorReply->setLength(0);
277
ms_s3Checksum[0] = 0;
281
// Drop the references held on the streams, if any.
if (ms_outputStream) {
282
ms_outputStream->release();
283
ms_outputStream = NULL;
285
if (ms_inputStream) {
286
ms_inputStream->release();
287
ms_inputStream = NULL;
291
cs_free(ms_safe_url);
296
// Appends one complete header line (e.g. "Date: ...") to the list of headers
// sent with the next request.
// Throws a generic CSException reporting that curl_slist_append() failed; the
// condition guarding the throw is on a line not shown.
inline void ms_setHeader(const char *header)
298
ms_header_list = curl_slist_append(ms_header_list, header);
300
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
305
// Provides a URL pointer suitable for curl_easy_setopt(CURLOPT_URL).
// ms_old_libcurl is set when libcurl is older than 7.17.0, which did not copy
// strings passed to curl_easy_setopt(); the visible code replaces ms_safe_url
// with a private duplicate of 'url'. The return statements are not visible here.
inline const char *safe_url(const char *url)
307
if (ms_old_libcurl == false)
311
// Discard the previous copy before storing the new one.
cs_free(ms_safe_url);
314
ms_safe_url = cs_strdup(url);
319
// Sets the URL for the next request, passing it through safe_url() first.
// Throws (via THROW_CURL_IF) if curl_easy_setopt() reports an error.
inline void ms_setURL(const char *url)
321
//printf("URL: \"%s\n", url);
322
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, safe_url(url)));
325
// Performs an HTTP DELETE using the URL and headers already configured,
// then evaluates the reply status.
// Throws a generic CSException with the libcurl error text when the transfer
// fails and no callback has flagged an error of its own.
inline void ms_execute_delete_request()
329
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
330
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_CUSTOMREQUEST, "DELETE"));
332
rtc = curl_easy_perform(ms_curl);
334
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_CUSTOMREQUEST, NULL)); // IMPORTANT: Reset this to its default value
336
// A curl failure caused by a callback error is handled by the ms_throw_error branch instead.
if (rtc && !ms_throw_error)
337
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
339
// An exception was recorded inside a callback (handling is on lines not shown).
if (ms_throw_error) {
345
check_reply_status();
348
// Performs the S3 copy request: an upload (CURLOPT_UPLOAD) with a zero-length
// body, using the URL and headers already configured, then evaluates the reply status.
// Throws a generic CSException with the libcurl error text when the transfer
// fails and no callback has flagged an error of its own.
inline void ms_execute_copy_request()
352
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
353
// No request body is sent; the source is named by a request header instead.
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, 0));
354
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
356
rtc = curl_easy_perform(ms_curl);
358
if (rtc && !ms_throw_error)
359
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
361
// An exception was recorded inside a callback (handling is on lines not shown).
if (ms_throw_error) {
367
check_reply_status();
370
// Performs a download request using the URL and headers already configured.
// output - stream that receives the reply body; assigned to ms_outputStream for
//          the duration of the transfer.
// After the transfer the reply status is checked and the file time reported by
// libcurl is stored in ms_last_modified.
inline void ms_execute_get_request(CSOutputStream *output)
376
// Both CURLOPT_HTTPGET and CURLOPT_NOBODY appear here; presumably one is chosen
// depending on whether 'output' is supplied (selecting lines not shown) - confirm.
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPGET, 1L));
378
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOBODY, 1L));
382
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FILETIME, 1L));
383
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
384
// Ask curl to parse the Last-Modified header. This is easier than
385
// parsing it ourselves.
387
ms_outputStream = output;
388
// Transfer failed and no callback recorded an exception: report the libcurl error text.
if (curl_easy_perform(ms_curl) && !ms_throw_error) {
389
ms_outputStream = NULL;
390
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
392
ms_outputStream = NULL;
400
check_reply_status();
401
curl_easy_getinfo(ms_curl, CURLINFO_FILETIME, &ms_last_modified);
404
// Uploads 'size' bytes read from 'input' using the URL and headers already configured.
// input - stream supplying the request body; assigned to ms_inputStream for the
//         duration of the transfer.
// size  - number of bytes announced to libcurl as the upload size.
// After the transfer the reply status is checked and, when ms_calculate_md5 is set,
// the MD5 computed while sending is compared with the checksum S3 returned.
inline void ms_execute_put_request(CSInputStream *input, off64_t size)
409
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
410
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, size));
411
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
412
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POSTFIELDSIZE_LARGE, size));
413
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 1L));
418
ms_inputStream = input;
419
// Transfer failed and no callback recorded an exception: report the libcurl error text.
if (curl_easy_perform(ms_curl) && !ms_throw_error) {
420
ms_inputStream = NULL;
421
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
423
ms_inputStream = NULL;
430
check_reply_status();
432
if (ms_calculate_md5) {
433
// If the data was not sent with an md5 checksum then verify
434
// the server's md5 value with the one calculated during the send.
435
char checksum[HEX_CHECKSUM_VALUE_SIZE +1];
438
ms_md5.md5_get_digest(&digest);
439
cs_bin_to_hex(HEX_CHECKSUM_VALUE_SIZE +1, checksum, CHECKSUM_VALUE_SIZE, digest.val);
441
// Upper-case the server's checksum before the case-sensitive comparison below.
cs_strToUpper(ms_s3Checksum);
442
if (strcmp(checksum, ms_s3Checksum)) {
443
// The request should be restarted in this case.
445
CSException::logException(CS_CONTEXT, CS_ERR_CHECKSUM_ERROR, "Calculated checksum did not match S3 checksum");
454
//======================================
459
//======================================
460
// Constructs the protocol object with the default server "s3.amazonaws.com/"
// and empty public and private keys.
CSS3Protocol::CSS3Protocol():
463
s3_private_key(NULL),
467
new_(s3_server, CSStringBuffer());
468
// The trailing '/' is part of the stored server name; URLs are built by appending the key directly.
s3_server->append("s3.amazonaws.com/");
470
s3_public_key = CSString::newString("");
471
s3_private_key = CSString::newString("");
476
// Releases the server name buffer and both key strings.
// Any NULL guards around these calls are on lines not shown.
CSS3Protocol::~CSS3Protocol()
479
s3_server->release();
482
s3_public_key->release();
485
s3_private_key->release();
489
// Builds the string-to-sign for an S3 request and signs it with the private key.
// The text assembled below is, in order: verb, md5 (optional), content type
// (optional), date, optional extra headers, then "/<bucket>/<key>", with newline
// separators as shown. Part of the parameter list is on lines not shown; the
// body uses verb, md5, content_type, date, headers, bucket and key.
// NOTE(review): the printf calls below print the private key; presumably they are
// compiled only when SHOW_SIGNING is defined (guards not visible) - confirm.
CSString *CSS3Protocol::s3_getSignature(const char *verb,
491
const char *content_type,
498
CSStringBuffer *s3_buffer;
503
new_(s3_buffer, CSStringBuffer());
506
s3_buffer->setLength(0);
507
s3_buffer->append(verb);
508
s3_buffer->append("\n");
509
if (md5) s3_buffer->append(md5);
510
s3_buffer->append("\n");
511
if (content_type) s3_buffer->append(content_type);
512
s3_buffer->append("\n");
513
s3_buffer->append(date);
515
// Note: headers are assumed to be in lower case, sorted, and containing no white space.
516
s3_buffer->append("\n");
517
s3_buffer->append(headers->getCString());
519
s3_buffer->append("\n/");
520
s3_buffer->append(bucket);
521
s3_buffer->append("/");
522
s3_buffer->append(key);
525
printf("signing:\n=================\n%s\n=================\n", s3_buffer->getCString());
526
printf("Public Key:\"%s\"\n", s3_public_key->getCString());
527
printf("Private Key:\"%s\"\n", s3_private_key->getCString());
529
const char *ptr = s3_buffer->getCString();
531
printf("%x ", *ptr); ptr++;
537
// Sign the assembled text with the private key.
CSString *sig = signature(s3_buffer->getCString(), s3_private_key->getCString());
544
//----------------------
545
// CURL callback functions:
546
////////////////////////////
547
//----------------------
549
// Reads up to buffer_size bytes of upload data from the connection's input
// stream into 'ptr' and stores the number of bytes read in *data_sent.
// The bytes read are subtracted from ms_data_size and fed to the running MD5.
// send_callback() treats a true return value as failure.
// NOTE(review): the first two conditions below (<= and >) cover every case, so the
// final "Blob smaller than expected." branch can never be reached as written.
static bool try_ReadStream(CSThread *self, S3ProtocolCon *con, unsigned char *ptr, size_t buffer_size, size_t *data_sent)
551
volatile bool rtc = true;
553
*data_sent = con->ms_inputStream->read((char*)ptr, buffer_size);
554
if (*data_sent <= con->ms_data_size) {
555
con->ms_data_size -= *data_sent;
557
con->ms_md5.md5_append(ptr, *data_sent); // Calculating the checksum for the data sent.
558
} else if (*data_sent > con->ms_data_size)
559
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob larger than expected.");
560
else if (con->ms_data_size && !*data_sent)
561
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob smaller than expected.");
570
//----------------------
571
// libcurl read callback (CURLOPT_READFUNCTION): supplies request body data.
// ptr             - buffer to fill.
// objs * obj_size - capacity of that buffer in bytes.
// v_con           - the S3ProtocolCon registered with CURLOPT_READDATA.
// When reading fails, the connection's ms_throw_error flag is set and the byte
// count becomes (size_t)-1. The return statement is on a line not shown.
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
573
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
574
size_t data_sent, buffer_size = objs * obj_size;
576
// Nothing left to send (the action taken is on a line not shown).
if (!con->ms_data_size)
580
if (try_ReadStream(self, con, (unsigned char*)ptr, buffer_size, &data_sent)) {
581
con->ms_throw_error = true;
582
data_sent = (size_t)-1;
589
// Stores data_len bytes of reply body received from the server.
// For an HTTP status of 400 or above the data is appended to ms_errorReply
// (allocated on first use); otherwise it is written to the output stream, if one
// is attached. receive_data() treats a true return value as failure.
static bool try_WriteStream(CSThread *self, S3ProtocolCon *con, char *ptr, size_t data_len)
591
volatile bool rtc = true;
593
if (con->ms_replyStatus >= 400) { // Collect the error reply.
594
if (!con->ms_errorReply)
595
con->ms_errorReply = new CSStringBuffer(50);
596
con->ms_errorReply->append(ptr, data_len);
597
} else if ( con->ms_outputStream)
598
con->ms_outputStream->write(ptr, data_len);
607
//----------------------
608
// libcurl write callback (CURLOPT_WRITEFUNCTION): receives reply body data.
// vptr            - received bytes.
// objs * obj_size - number of bytes received.
// v_con           - the S3ProtocolCon registered with CURLOPT_WRITEDATA.
// When storing the data fails, the connection's ms_throw_error flag is set and the
// byte count becomes (size_t)-1. The return statement is on a line not shown.
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
610
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
611
size_t data_len = objs * obj_size;
614
if (try_WriteStream(self, con, (char*)vptr, data_len)) {
615
con->ms_throw_error = true;
616
data_len = (size_t)-1;
622
// True for HTTP status codes in the 3xx range. Note that 's' is evaluated twice.
#define IS_REDIRECT(s) ((s >= 300) && (s < 400))
623
//----------------------
624
// Adds one reply header (name/value given as pointer plus length) to the
// connection's ms_reply_headers. receive_header() treats a true return value as failure.
static bool try_addHeader(CSThread *self, S3ProtocolCon *con, char *name, uint32_t name_len, char *value, uint32_t value_len)
626
volatile bool rtc = true;
629
con->ms_reply_headers.addHeader(name, name_len, value, value_len);
638
//----------------------
639
// libcurl header callback (CURLOPT_HEADERFUNCTION): called with one reply header line.
// header          - the raw header line.
// objs * obj_size - length of that line in bytes.
// v_con           - the S3ProtocolCon registered with CURLOPT_WRITEHEADER.
// A status line ("HTTP...") updates ms_replyStatus; other lines are split at ':'
// into name and value, an ETag value is copied into ms_s3Checksum, and the
// header is added to the reply header list.
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
641
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
642
size_t size = objs * obj_size;
643
char *end, *ptr = (char*) header, *name, *value = NULL;
644
uint32_t name_len =0, value_len = 0;
646
//printf( "receive_header: %s\n", ptr);
648
// Detect a trailing CRLF (the adjustment made is on a line not shown).
if (*(end -2) == '\r' && *(end -1) == '\n')
651
// Skip leading spaces.
while ((end != ptr) && (*ptr == ' ')) ptr++;
655
// Get the reply status.
656
// Status 100 = Continue
657
// The status is (re)parsed only while none has been seen yet, or the previous
// one was 100 or a redirect, so the final reply's status is the one kept.
if (((!con->ms_replyStatus) || (con->ms_replyStatus == 100) || IS_REDIRECT(con->ms_replyStatus) )
658
&& !strncasecmp(ptr, "HTTP", 4)
661
while ((end != ptr) && (*ptr != ' ')) ptr++; // skip HTTP stuff
662
while ((end != ptr) && (*ptr == ' ')) ptr++; // find the start of the status code.
666
if (end < (ptr +3)) // expecting a 3 digit status code.
669
memcpy(status, ptr, 3);
672
con->ms_replyStatus = atoi(status);
676
// Ordinary header: the name runs up to the ':' separator.
while ((end != ptr) && (*ptr != ':')) ptr++;
679
name_len = ptr - name;
682
while ((end != ptr) && (*ptr == ' ')) ptr++;
687
value_len = end - value;
689
// Trim trailing spaces from name and value.
// NOTE(review): these loops index [len-1] without checking len > 0; an empty name or
// value would underflow unless guarded on lines not shown - confirm.
while (name[name_len-1] == ' ') name_len--;
690
while (value[value_len-1] == ' ') value_len--;
692
if (!strncasecmp(name, "ETag", 4)) {
694
value++; value_len -=2; // Strip quotation marks from checksum string.
696
// Only a value of exactly the expected hex length is accepted as the checksum.
if (value_len == HEX_CHECKSUM_VALUE_SIZE) {
697
memcpy(con->ms_s3Checksum, value, value_len);
698
con->ms_s3Checksum[value_len] = 0;
703
if (try_addHeader(self, con, name, name_len, value, value_len)) {
704
con->ms_throw_error = true;
710
//----------------------
712
// Formats time_t 't' as a GMT date string (e.g. "Tue, 10 Feb 2009 12:00:00 GMT") into 'd'.
// 'd' must be a char array, not a pointer, because sizeof(d) is used as the buffer size;
// 't' must be an lvalue because its address is taken.
#define SET_DATE_FROM_TIME(t, d) {strftime(d, sizeof(d), "%a, %d %b %Y %H:%M:%S GMT", gmtime(&t));}
713
// Formats the current time into 'd' (same requirements on 'd' as above).
#define SET_DATE(d) {time_t t = time(NULL); SET_DATE_FROM_TIME(t, d);}
715
// Deletes the object 'key' from 'bucket'.
// Builds the URL "http://<bucket>.<server><key>", adds the Date and Authorization
// headers (signature computed for the DELETE verb) and executes the request.
// When the connection reports a retryable error, an exception is thrown once
// retry_count equals s3_maxRetries; otherwise the thread sleeps for s3_sleepTime
// (the enclosing retry loop is on lines not shown).
// The result is derived from the connection's ms_notFound flag; the return
// statement itself is not visible in this extract.
bool CSS3Protocol::s3_delete(const char *bucket, const char *key)
717
CSStringBuffer *s3_buffer;
719
CSString *signed_str;
720
uint32_t retry_count = 0;
721
S3ProtocolCon *con_data;
725
new_(s3_buffer, CSStringBuffer());
728
new_(con_data, S3ProtocolCon());
732
// Clear old settings.
733
con_data->ms_reset();
738
// Request URL.
s3_buffer->setLength(0);
739
s3_buffer->append("http://");
740
s3_buffer->append(bucket);
741
s3_buffer->append(".");
742
s3_buffer->append(s3_server->getCString());
743
s3_buffer->append(key);
745
con_data->ms_setURL(s3_buffer->getCString());
747
// Add the 'DATE' header
748
s3_buffer->setLength(0);
749
s3_buffer->append("Date: ");
750
s3_buffer->append(date);
751
con_data->ms_setHeader(s3_buffer->getCString());
753
// Create the authentication signature and add the 'Authorization' header
754
signed_str = s3_getSignature("DELETE", NULL, NULL, date, bucket, key);
756
s3_buffer->setLength(0);
757
s3_buffer->append("Authorization: AWS ");
758
s3_buffer->append(s3_public_key->getCString());
759
s3_buffer->append(":");
760
s3_buffer->append(signed_str->getCString());
761
release_(signed_str); signed_str = NULL;
763
con_data->ms_setHeader(s3_buffer->getCString());
765
con_data->ms_execute_delete_request();
767
if (con_data->ms_retry) {
768
if (retry_count == s3_maxRetries) {
769
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
771
//printf("RETRY: s3_delete()\n");
773
self->sleep(s3_sleepTime);
777
bool notFound = con_data->ms_notFound;
784
//-------------------------------
785
// Copies the object src_bucket/src_key to dest_bucket/dest_key with an S3 copy request.
// dest_server is replaced by this object's configured server (the condition for
// that assignment is on a line not shown).
// Headers sent: Host, x-amz-copy-source, Date and Authorization (signature for
// the PUT verb, with the copy-source header included in the signed text).
// Throws a generic CSException when the source object is not found, or when a
// retryable error persists after s3_maxRetries.
// NOTE(review): the request URL is built from s3_server while the Host header uses
// dest_server; these are the same only when dest_server was defaulted - confirm intent.
void CSS3Protocol::s3_copy(const char *dest_server, const char *dest_bucket, const char *dest_key, const char *src_bucket, const char *src_key)
787
CSStringBuffer *s3_buffer;
789
CSString *signed_str;
790
uint32_t retry_count = 0;
791
S3ProtocolCon *con_data;
795
new_(s3_buffer, CSStringBuffer());
798
new_(con_data, S3ProtocolCon());
802
dest_server = s3_server->getCString();
805
// Clear old settings.
806
con_data->ms_reset();
811
// Request URL of the destination object.
s3_buffer->setLength(0);
812
s3_buffer->append("http://");
813
s3_buffer->append(dest_bucket);
814
s3_buffer->append(".");
815
s3_buffer->append(s3_server->getCString());
816
s3_buffer->append(dest_key);
818
con_data->ms_setURL(s3_buffer->getCString());
820
// Add the destination location
821
s3_buffer->setLength(0);
822
s3_buffer->append("Host: ");
823
s3_buffer->append(dest_bucket);
824
s3_buffer->append(".");
825
s3_buffer->append(dest_server);
826
s3_buffer->setLength(s3_buffer->length() -1); // trim the '/'
827
con_data->ms_setHeader(s3_buffer->getCString());
829
// Add the source location
830
s3_buffer->setLength(0);
831
s3_buffer->append("x-amz-copy-source:");
832
s3_buffer->append(src_bucket);
833
s3_buffer->append("/");
834
s3_buffer->append(src_key);
835
con_data->ms_setHeader(s3_buffer->getCString());
837
// Create the authentication signature and add the 'Authorization' header
838
// s3_buffer still holds the x-amz-copy-source header here; it is passed as the extra headers to sign.
signed_str = s3_getSignature("PUT", NULL, NULL, date, dest_bucket, dest_key, CSString::newString(s3_buffer->getCString()));
841
// Add the 'DATE' header
842
s3_buffer->setLength(0);
843
s3_buffer->append("Date: ");
844
s3_buffer->append(date);
845
con_data->ms_setHeader(s3_buffer->getCString());
848
s3_buffer->setLength(0);
849
s3_buffer->append("Authorization: AWS ");
850
s3_buffer->append(s3_public_key->getCString());
851
s3_buffer->append(":");
852
s3_buffer->append(signed_str->getCString());
853
release_(signed_str); signed_str = NULL;
854
con_data->ms_setHeader(s3_buffer->getCString());
856
con_data->ms_execute_copy_request();
858
// A missing source object is reported as an exception naming the bucket and key.
if (con_data->ms_notFound) {
859
s3_buffer->setLength(0);
860
s3_buffer->append("Cloud copy failed, object not found: ");
861
s3_buffer->append(src_bucket);
862
s3_buffer->append(" ");
863
s3_buffer->append(src_key);
864
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, s3_buffer->getCString());
867
if (con_data->ms_retry) {
868
if (retry_count == s3_maxRetries) {
869
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
871
//printf("RETRY: s3_copy()\n");
873
self->sleep(s3_sleepTime);
884
//-------------------------------
885
// Fetches the object 'key' from 'bucket'.
// output        - stream that receives the object data; may be NULL (it is
//                 tested before being retained).
// found         - set to false when the connection reports the object as not found.
// range         - optional byte range; when used, a "Range: bytes=<start>-<end>"
//                 header is sent (the test selecting this is on a line not shown).
// last_modified - receives the file time libcurl reported for the object.
// Returns the reply headers taken from the connection.
// Throws a generic CSException when a retryable error persists after s3_maxRetries.
CSVector *CSS3Protocol::s3_receive(CSOutputStream *output, const char *bucket, const char *key, bool *found, S3RangePtr range, time_t *last_modified)
887
CSStringBuffer *s3_buffer;
889
CSString *signed_str;
890
uint32_t retry_count = 0;
891
S3ProtocolCon *con_data;
892
CSVector *replyHeaders;
893
CSString *range_header = NULL;
904
new_(s3_buffer, CSStringBuffer());
907
new_(con_data, S3ProtocolCon());
911
// Clear old settings.
912
con_data->ms_reset();
917
// Request URL.
s3_buffer->setLength(0);
918
s3_buffer->append("http://");
919
s3_buffer->append(bucket);
920
s3_buffer->append(".");
921
s3_buffer->append(s3_server->getCString());
922
s3_buffer->append(key);
924
con_data->ms_setURL(s3_buffer->getCString());
926
// Add the 'DATE' header
927
s3_buffer->setLength(0);
928
s3_buffer->append("Date: ");
929
s3_buffer->append(date);
930
con_data->ms_setHeader(s3_buffer->getCString());
934
snprintf(buffer, 80,"Range: bytes=%"PRIu64"-%"PRIu64, range->startByte, range->endByte);
936
range_header = CSString::newString(buffer);
938
// Create the authentication signature and add the 'Authorization' header
940
con_data->ms_setHeader(range_header->getCString());
941
// http_op is assigned on a line not shown.
signed_str = s3_getSignature(http_op, NULL, NULL, date, bucket, key, NULL);
943
s3_buffer->setLength(0);
944
s3_buffer->append("Authorization: AWS ");
945
s3_buffer->append(s3_public_key->getCString());
946
s3_buffer->append(":");
947
s3_buffer->append(signed_str->getCString());
948
release_(signed_str); signed_str = NULL;
949
con_data->ms_setHeader(s3_buffer->getCString());
951
// Take an extra reference on the stream before handing it to the connection.
if (output) output->retain();
952
con_data->ms_execute_get_request(output);
954
if (con_data->ms_retry) {
955
if (retry_count == s3_maxRetries) {
956
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
958
//printf("RETRY: s3_receive()\n");
961
self->sleep(s3_sleepTime);
966
*last_modified = con_data->ms_last_modified;
967
*found = !con_data->ms_notFound;
968
replyHeaders = con_data->ms_reply_headers.takeHeaders();
974
return_(replyHeaders);
977
// XML parser for an S3 bucket listing: collects the text of every
// /listbucketresult/contents/key/ node as a CSString in a list.
class S3ListParser : public CSXMLBuffer {
983
// Parses 'len' bytes of listing XML; 'keys' is the vector meant to receive the
// object keys (its assignment to the parser's list is on a line not shown).
// Returns the result of parseData().
bool parseListData(const char *data, size_t len, CSVector *keys)
986
return parseData(data, len, 0);
990
// Node callback: adds a non-empty key value to the list.
virtual bool openNode(char *path, char *value) {
991
if (value && *value && (strcmp(path,"/listbucketresult/contents/key/") == 0))
992
list->add(CSString::newString(value));
996
// Node-close callback (body not visible in this extract).
virtual bool closeNode(char *path) {
1001
// Attribute callback (body not visible in this extract).
virtual bool addAttribute(char *path, char *name, char *value) {
1010
//-------------------------------
1011
// Parses the bucket listing held in 'output' into a vector of keys.
// Throws a generic CSException carrying the parser's error message when parsing fails.
// The return statement is on a line not shown.
static CSVector *parse_s3_list(CSMemoryOutputStream *output)
1013
S3ListParser s3ListParser;
1022
new_(vector, CSVector(10));
1025
// Access the bytes accumulated in the memory stream.
data = (const char *) output->getMemory(&len);
1026
if (!s3ListParser.parseListData(data, len, vector)) {
1030
s3ListParser.getError(&err, &msg);
1031
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
1040
//-------------------------------
1041
// Lists the keys in 'bucket'.
// key_prefix - appended to the URL as the "prefix" query parameter.
// max        - appended to the URL as the "max-keys" query parameter.
// The conditions deciding which query parameters are added, and whether
// "max-keys" is introduced with '&' or '?', are on lines not shown.
// The listing is downloaded into a memory stream and parsed by parse_s3_list(),
// whose result is returned.
// Throws a generic CSException when a retryable error persists after s3_maxRetries.
CSVector *CSS3Protocol::s3_list(const char *bucket, const char *key_prefix, uint32_t max)
1043
CSStringBuffer *s3_buffer;
1045
CSString *signed_str;
1046
CSMemoryOutputStream *output;
1047
uint32_t retry_count = 0;
1048
S3ProtocolCon *con_data;
1051
new_(s3_buffer, CSStringBuffer());
1054
output = CSMemoryOutputStream::newStream(1024, 1024);
1057
new_(con_data, S3ProtocolCon());
1062
// Clear old settings.
1063
con_data->ms_reset();
1068
// Request URL with the optional query parameters.
s3_buffer->setLength(0);
1069
s3_buffer->append("http://");
1070
s3_buffer->append(bucket);
1071
s3_buffer->append(".");
1072
s3_buffer->append(s3_server->getCString());
1073
//s3_buffer->append("/");
1074
//s3_buffer->append(bucket);
1076
s3_buffer->append("?prefix=");
1077
s3_buffer->append(key_prefix);
1082
s3_buffer->append("&max-keys=");
1084
s3_buffer->append("?max-keys=");
1085
s3_buffer->append(max);
1088
con_data->ms_setURL(s3_buffer->getCString());
1090
// Add the 'DATE' header
1091
s3_buffer->setLength(0);
1092
s3_buffer->append("Date: ");
1093
s3_buffer->append(date);
1094
con_data->ms_setHeader(s3_buffer->getCString());
1096
// Create the authentication signature and add the 'Authorization' header
1097
// The signature is computed for the bucket with an empty key.
signed_str = s3_getSignature("GET", NULL, NULL, date, bucket, "");
1099
s3_buffer->setLength(0);
1100
s3_buffer->append("Authorization: AWS ");
1101
s3_buffer->append(s3_public_key->getCString());
1102
s3_buffer->append(":");
1103
s3_buffer->append(signed_str->getCString());
1104
release_(signed_str); signed_str = NULL;
1105
con_data->ms_setHeader(s3_buffer->getCString());
1107
con_data->ms_execute_get_request(RETAIN(output));
1109
if (con_data->ms_retry) {
1110
if (retry_count == s3_maxRetries) {
1111
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1113
//printf("RETRY: s3_list()\n");
1116
self->sleep(s3_sleepTime);
1122
release_(s3_buffer);
1123
return_(parse_s3_list(output));
1126
//-------------------------------
1127
// Creates a signature authorizing a PUT of bucket/key at the current time.
// content_type        - content type to sign; "binary/octet-stream" is substituted
//                       (the condition for the substitution is on a line not shown).
// s3AuthorizationTime - receives the time (seconds, truncated to 32 bits) used for
//                       the signed date.
// Returns the signature string.
CSString *CSS3Protocol::s3_getAuthorization(const char *bucket, const char *key, const char *content_type, uint32_t *s3AuthorizationTime)
1130
CSString *signed_str;
1136
content_type = "binary/octet-stream";
1138
sys_time = time(NULL);
1140
*s3AuthorizationTime = (uint32_t)sys_time;
1142
// The signed date is derived from the same timestamp that is reported to the caller.
SET_DATE_FROM_TIME(sys_time, date);
1143
signed_str = s3_getSignature("PUT", NULL, content_type, date, bucket, key);
1144
return_(signed_str);
1147
//-------------------------------
1148
// Uploads 'size' bytes from 'input' to bucket/key.
// content_type        - value of the Content-Type header; "binary/octet-stream" is
//                       substituted (condition on a line not shown).
// digest              - MD5 of the data; base64-encoded into a Content-MD5 header
//                       (the test for a supplied digest is on a line not shown).
// s3Authorization     - optional signature created earlier; when given it is used
//                       instead of computing a new one, and the Date header is
//                       derived from s3AuthorizationTime.
// s3AuthorizationTime - time that was used when s3Authorization was created.
// Returns the reply headers taken from the connection.
// Throws a generic CSException when a retryable error persists after s3_maxRetries.
CSVector *CSS3Protocol::s3_send(CSInputStream *input, const char *bucket, const char *key, off64_t size, const char *content_type, Md5Digest *digest, const char *s3Authorization, time_t s3AuthorizationTime)
1150
CSStringBuffer *s3_buffer;
1152
CSString *signed_str;
1153
uint32_t retry_count = 0;
1154
S3ProtocolCon *con_data;
1155
CSVector *replyHeaders;
1156
char checksum[32], *md5 = NULL;
1161
new_(s3_buffer, CSStringBuffer());
1164
new_(con_data, S3ProtocolCon());
1168
content_type = "binary/octet-stream";
1172
// Clear old settings.
1173
con_data->ms_reset();
1175
// A pre-computed authorization is only valid for the date it was signed with.
if (s3Authorization) {
1176
SET_DATE_FROM_TIME(s3AuthorizationTime, date);
1182
// Request URL.
s3_buffer->setLength(0);
1183
s3_buffer->append("http://");
1184
s3_buffer->append(bucket);
1185
s3_buffer->append(".");
1186
s3_buffer->append(s3_server->getCString());
1187
s3_buffer->append(key);
1189
con_data->ms_setURL(s3_buffer->getCString());
1191
// Add the 'DATE' header
1192
s3_buffer->setLength(0);
1193
s3_buffer->append("Date: ");
1194
s3_buffer->append(date);
1195
con_data->ms_setHeader(s3_buffer->getCString());
1197
// Add the 'Content-Type' header
1198
s3_buffer->setLength(0);
1199
s3_buffer->append("Content-Type: ");
1200
s3_buffer->append(content_type);
1201
con_data->ms_setHeader(s3_buffer->getCString());
1204
// Add the Md5 checksum header
1206
memset(checksum, 0, 32);
1207
// The 16-byte digest is sent base64 encoded.
base64Encode(digest->val, 16, checksum, 32);
1209
s3_buffer->setLength(0);
1210
s3_buffer->append("Content-MD5: ");
1211
s3_buffer->append(checksum);
1212
con_data->ms_setHeader(s3_buffer->getCString());
1213
// Presumably: with a Content-MD5 header sent no local verification is needed,
// otherwise the checksum is calculated during the send (branching not shown).
con_data->ms_calculate_md5 = false;
1215
con_data->ms_calculate_md5 = true;
1218
// Create the authentication signature and add the 'Authorization' header
1219
if (!s3Authorization)
1220
signed_str = s3_getSignature("PUT", md5, content_type, date, bucket, key);
1222
signed_str = CSString::newString(s3Authorization);
1224
s3_buffer->setLength(0);
1225
s3_buffer->append("Authorization: AWS ");
1226
s3_buffer->append(s3_public_key->getCString());
1227
s3_buffer->append(":");
1228
s3_buffer->append(signed_str->getCString());
1229
release_(signed_str); signed_str = NULL;
1230
con_data->ms_setHeader(s3_buffer->getCString());
1232
con_data->ms_execute_put_request(RETAIN(input), size);
1234
if (con_data->ms_retry) {
1235
if (retry_count == s3_maxRetries) {
1236
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1238
//printf("RETRY: s3_send()\n");
1241
self->sleep(s3_sleepTime);
1245
replyHeaders = con_data->ms_reply_headers.takeHeaders();
1248
release_(s3_buffer);
1250
return_(replyHeaders);
1253
//-------------------------------
1254
// Builds a query-string authenticated URL for downloading bucket/key.
// keep_alive - number of seconds from now until the URL expires.
// The URL has the form
//   http://<bucket>.<server><key>?AWSAccessKeyId=<public key>&Expires=<time>&Signature=<sig>
// where the signature is computed for the GET verb with the expiry time in place
// of the date, and is URL encoded.
// Returns the URL as a new CSString (the return statement is on a line not shown).
CSString *CSS3Protocol::s3_getDataURL(const char *bucket, const char *key, uint32_t keep_alive)
1256
CSStringBuffer *s3_buffer;
1258
CSString *signed_str;
1261
new_(s3_buffer, CSStringBuffer());
1264
// Expiry time = current time + keep_alive, rendered as decimal text.
snprintf(timeout, 32, "%"PRId32"", ((uint32_t)time(NULL)) + keep_alive);
1266
signed_str = s3_getSignature("GET", NULL, NULL, timeout, bucket, key);
1267
//printf("Unsafe: \"%s\"\n", signed_str->getCString());
1268
signed_str = urlEncode(signed_str); // Because the signature is in the URL it must be URL encoded.
1269
//printf(" Safe: \"%s\"\n", signed_str->getCString());
1272
s3_buffer->setLength(0);
1273
s3_buffer->append("http://");
1274
s3_buffer->append(bucket);
1275
s3_buffer->append(".");
1276
s3_buffer->append(s3_server->getCString());
1277
s3_buffer->append(key);
1279
s3_buffer->append("?AWSAccessKeyId=");
1280
s3_buffer->append(s3_public_key->getCString());
1281
s3_buffer->append("&Expires=");
1282
s3_buffer->append(timeout);
1283
s3_buffer->append("&Signature=");
1284
s3_buffer->append(signed_str->getCString());
1286
release_(signed_str);
1289
CSString *str = CSString::newString(s3_buffer);
1293
//#define S3_UNIT_TEST
1295
// Prints the usage text of the test program, one entry per supported command letter.
// cmd - program name shown in each usage line.
static void show_help_info(const char *cmd)
1297
printf("Get authenticated query string:\n\t%s q <bucket> <object_key> <timeout>\n", cmd);
1298
printf("Delete object:\n\t%s d <bucket> <object_key>\n", cmd);
1299
printf("Delete all object with a given prefix:\n\t%s D <bucket> <object_prefix>\n", cmd);
1300
printf("Get object, data will be written to 'prottest.out':\n\t%s g <bucket> <object_key> <timeout>\n", cmd);
1301
printf("Get object header only:\n\t%s h <bucket> <object_key> <timeout>\n", cmd);
1302
printf("Put (Upload) an object:\n\t%s p <bucket> <object_key> <file>\n", cmd);
1303
printf("List objects in the bucket:\n\t%s l <bucket> [<object_prefix> [max_list_size]]\n", cmd);
1304
printf("Copy object:\n\t%s c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key> \n", cmd);
1305
printf("Copy all object with a given prefix:\n\t%s C <src_bucket> <object_key_prefix> <dst_bucket> \n", cmd);
1308
// Prints every reply header in 'header_array' as "name : value" between two
// separator lines. The vector is handed to a local CSHTTPHeaders object, which is
// cleared again before returning.
void dump_headers(CSVector *header_array)
1310
CSHTTPHeaders headers;
1312
headers.setHeaders(header_array);
1313
printf("Reply Headers:\n");
1314
printf("--------------\n");
1316
for (uint32_t i = 0; i < headers.numHeaders(); i++) {
1317
CSHeader *h = headers.getHeader(i);
1319
printf("%s : %s\n", h->getNameCString(), h->getValueCString());
1322
printf("--------------\n");
1323
headers.clearHeaders();
1326
// Command-line test driver for CSS3Protocol.
// Credentials and server come from the environment variables S3_ACCESS_KEY_ID,
// S3_SECRET_ACCESS_KEY and S3_SERVER (default "s3.amazonaws.com/").
// The first character of argv[1] selects the operation; see show_help_info().
// Argument-count checks and the try/catch scaffolding are largely on lines not shown.
int main(int argc, char **argv)
1328
CSThread *main_thread;
1329
const char *pub_key;
1330
const char *priv_key;
1332
CSS3Protocol *prot = NULL;
1335
show_help_info(argv[0]);
1339
if (! CSThread::startUp()) {
1340
CSException::throwException(CS_CONTEXT, ENOMEM, "CSThread::startUp() failed.");
1346
// Create a thread object for the main thread and register it as the current thread.
main_thread = new CSThread( NULL);
1347
CSThread::setSelf(main_thread);
1352
pub_key = getenv("S3_ACCESS_KEY_ID");
1353
priv_key = getenv("S3_SECRET_ACCESS_KEY");
1354
new_(prot, CSS3Protocol());
1357
server = getenv("S3_SERVER");
1358
if ((server == NULL) || (*server == 0))
1359
server = "s3.amazonaws.com/";
1360
prot->s3_setServer(server);
1361
prot->s3_setPublicKey(pub_key);
1362
prot->s3_setPrivateKey(priv_key);
1363
// The test driver configures the protocol with a maximum of 0 retries.
prot->s3_setMaxRetries(0);
1365
switch (argv[1][0]) {
1366
case 'q': // Get the query string
1368
CSString *qstr = prot->s3_getDataURL(argv[2], argv[3], atoi(argv[4]));
1369
printf("To test call:\ncurl -L -D - \"%s\"\n", qstr->getCString());
1372
printf("Bad command: q <bucket> <object_key> <timeout>\n");
1375
case 'd': // Delete the object
1377
printf("delete %s %s\n", argv[2], argv[3]);
1378
if (!prot->s3_delete(argv[2], argv[3]))
1379
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1382
printf("Bad command: d <bucket> <object_key>\n");
1385
case 'D': // Delete objects like
1390
// List the keys for the given prefix, then delete them one at a time.
list = prot->s3_list(argv[2], argv[3]);
1392
while (key = (CSString*) list->take(0)) {
1393
printf("Deleting %s\n", key->getCString());
1394
prot->s3_delete(argv[2], key->getCString());
1400
printf("Bad command: D <bucket> <object_key_prefix>\n");
1403
case 'g': // Get the object
1404
// Four arguments fetch the whole object; six arguments add a start/end byte range.
if ((argc == 4) || (argc == 6)) {
1408
S3RangeRec *range_ptr = NULL, range = {0,0};
1411
range.startByte = atoi(argv[4]);
1412
range.endByte = atoi(argv[5]);
1416
output = CSFile::newFile("prottest.out");
1418
output->open(CSFile::CREATE | CSFile::TRUNCATE);
1419
headers = prot->s3_receive(output->getOutputStream(), argv[2], argv[3], &found, range_ptr);
1421
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1423
dump_headers(headers);
1427
printf("Bad command: g <bucket> <object_key>\n");
1431
case 'h': // Get the object header
1435
S3RangeRec range = {0,0};
1437
// No output stream is passed for the header-only request.
headers = prot->s3_receive(NULL, argv[2], argv[3], &found);
1439
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1441
dump_headers(headers);
1444
printf("Bad command: h <bucket> <object_key>\n");
1448
case 'p': // Put (Upload) the object
1454
input = CSFile::newFile(argv[4]);
1456
input->open(CSFile::READONLY);
1457
// Compute the file's MD5 up front and pass it to s3_send().
input->md5Digest(&digest);
1458
headers = prot->s3_send(input->getInputStream(), argv[2], argv[3], input->myFilePath->getSize(), NULL, &digest);
1459
dump_headers(headers);
1462
printf("Bad command: p <bucket> <object_key> <file> \n");
1466
case 'c': // Copy the object
1468
// s3_copy() takes the destination first: argv[4]/argv[5] are the destination, argv[2]/argv[3] the source.
prot->s3_copy(NULL, argv[4], argv[5], argv[2], argv[3]);
1470
printf("Bad command: c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key>\n");
1474
case 'C': // Copy objects like
1479
// List the keys for the given prefix, then copy each one under the same key to the destination bucket.
list = prot->s3_list(argv[2], argv[3]);
1481
while (key = (CSString*) list->take(0)) {
1482
printf("Copying %s\n", key->getCString());
1483
prot->s3_copy(NULL, argv[4], key->getCString(), argv[2], key->getCString());
1489
printf("Bad command: C <src_bucket> <object_key_prefix> <dst_bucket>\n");
1492
case 'l': // List the object
1493
if ((argc == 3) || (argc == 4) || (argc == 5)) {
1495
char *prefix = NULL;
1501
if (!strlen(prefix))
1506
max = atol(argv[4]);
1508
list = prot->s3_list(argv[2], prefix, max);
1510
while (key = (CSString*) list->take(0)) {
1511
printf("%s\n", key->getCString());
1517
printf("Bad command: l <bucket> [<object_prefix> [max_list_size]] \n");
1521
printf("Unknown command.\n");
1522
show_help_info(argv[0]);
1529
// Log the exception held by the current thread.
self->logException();
1534
main_thread->release();
1536
CSThread::shutDown();