1
/* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
3
* PrimeBase Media Stream for MySQL
5
* This program is free software; you can redistribute it and/or modify
6
* it under the terms of the GNU General Public License as published by
7
* the Free Software Foundation; either version 2 of the License, or
8
* (at your option) any later version.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU General Public License for more details.
15
* You should have received a copy of the GNU General Public License
16
* along with this program; if not, write to the Free Software
17
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19
* Created by Barry Leslie on 10/02/09.
26
#include <curl/curl.h>
30
#include "CSStrUtil.h"
32
#include "CSS3Protocol.h"
36
//#define SHOW_SIGNING
37
// Uncomment this line to trace network action during request. Very Usefull!!
43
//#define SHOW_SIGNING
45
#define HEX_CHECKSUM_VALUE_SIZE (2 *CHECKSUM_VALUE_SIZE)
47
#define THROW_CURL_IF(v) { if (v) CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);}
49
//-------------------------------
50
static const char *retryCodes[] = {
59
//======================================
60
static size_t receive_data(void *ptr, size_t size, size_t nmemb, void *stream);
61
static size_t receive_header(void *ptr, size_t size, size_t nmemb, void *stream);
62
static size_t send_callback(void *ptr, size_t size, size_t nmemb, void *stream);
64
class S3ProtocolCon : CSXMLBuffer, public CSObject {
68
virtual bool openNode(char *path, char *value) {
69
if (value && *value && (strcmp(path,"/error/code/") == 0)) {
70
printf("S3 ERROR Code: %s\n", value);
71
for (int i = 0; retryCodes[i] && !ms_retry; i++)
72
ms_retry = (strcmp(value, retryCodes[i]) == 0);
74
if (ms_retry && !strcmp("slowdown", value))
76
} else if (value && *value && (strcmp(path,"/error/message/") == 0)) {
77
printf("S3 ERROR MESSAGE: %s\n", value);
82
virtual bool closeNode(char *path) {
87
virtual bool addAttribute(char *path, char *name, char *value) {
94
//-------------------------------
100
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing HTTP reply: possible S3 connection failure.");
103
printf("ms_errorReply:\n===========\n%s\n===========\n", ms_errorReply->getCString());
106
if (!parseData(ms_errorReply->getCString(), ms_errorReply->length(), 0)){
110
getError(&err, &msg);
111
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
119
CSHTTPHeaders ms_reply_headers;
120
CSStringBuffer ms_buffer; // A scratch buffer
123
struct curl_slist *ms_header_list; // A curl list of headers to be sent with the next request.
125
CSInputStream *ms_inputStream;
126
CSOutputStream *ms_outputStream;
129
char ms_s3Checksum[HEX_CHECKSUM_VALUE_SIZE +1];
130
bool ms_calculate_md5;
132
bool ms_notFound; // True if the object could not be found
133
bool ms_retry; // True if the request failed with a retry error.
136
CSStringBuffer *ms_errorReply;
137
char ms_curl_error[CURL_ERROR_SIZE];
139
off64_t ms_data_size;
141
unsigned int ms_replyStatus;
142
bool ms_throw_error; // Gets set if an exception occurs in a callback.
145
time_t ms_last_modified;
149
ms_header_list(NULL),
150
ms_inputStream(NULL),
151
ms_outputStream(NULL),
152
ms_calculate_md5(false),
159
ms_throw_error(false),
160
ms_old_libcurl(false),
165
ms_curl = curl_easy_init();
167
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_init() failed.");
169
curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW);
171
// libCurl versions prior to 7.17.0 did not make copies of strings passed into curl_easy_setopt()
172
// If this version requirement is a problem I can do this myself, if I have to, I guess. :(
173
if (curl_ver->version_num < 0X071700 ) {
174
ms_old_libcurl = true;
177
//snprintf(msg, 200, "libcurl version %s is too old, require version 7.17.0 or newer.", curl_ver->version);
178
//CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
181
if (curl_easy_setopt(ms_curl, CURLOPT_ERRORBUFFER, ms_curl_error))
182
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_setopt(CURLOPT_ERRORBUFFER) failed.");
185
curl_easy_setopt(ms_curl, CURLOPT_VERBOSE, 1L);
187
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_TCP_NODELAY, 1L));
190
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOPROGRESS, 1L));
191
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEFUNCTION, receive_data));
192
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READFUNCTION, send_callback));
193
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HEADERFUNCTION, receive_header));
194
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEDATA, this));
195
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READDATA, this));
196
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEHEADER, this));
199
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FOLLOWLOCATION, 1L)); // Follow redirects.
206
curl_easy_cleanup(ms_curl);
208
curl_slist_free_all(ms_header_list);
210
ms_inputStream->release();
212
ms_outputStream->release();
214
ms_errorReply->release();
216
ms_reply_headers.clearHeaders();
219
cs_free(ms_safe_url);
222
inline void check_reply_status()
224
if (ms_replyStatus > 199 && ms_replyStatus < 300)
229
switch (ms_replyStatus) {
231
case 204: // No Content
232
//case 301: // Moved Permanently
233
//case 307: // Temporary Redirect
235
case 404: // Not Found
236
case 403: // Forbidden (S3 object not found)
249
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_errorReply->getCString());
251
} else if (ms_slowDown) {
253
CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 slow down request.");
254
self->sleep(10); // sleep for 1/100 second.
263
inline void ms_reset()
265
// Remove any old headers
266
if (ms_header_list) {
267
curl_slist_free_all(ms_header_list);
268
ms_header_list = NULL;
271
ms_reply_headers.clearHeaders();
273
ms_throw_error = false;
275
ms_errorReply->setLength(0);
277
ms_s3Checksum[0] = 0;
281
if (ms_outputStream) {
282
ms_outputStream->release();
283
ms_outputStream = NULL;
285
if (ms_inputStream) {
286
ms_inputStream->release();
287
ms_inputStream = NULL;
291
cs_free(ms_safe_url);
296
inline void ms_setHeader(const char *header)
298
ms_header_list = curl_slist_append(ms_header_list, header);
300
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
305
inline const char *safe_url(const char *url)
307
if (ms_old_libcurl == false)
311
cs_free(ms_safe_url);
314
ms_safe_url = cs_strdup(url);
319
inline void ms_setURL(const char *url)
321
//printf("URL: \"%s\n", url);
322
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, safe_url(url)));
325
inline void ms_execute_delete_request()
329
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
330
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_CUSTOMREQUEST, "DELETE"));
332
rtc = curl_easy_perform(ms_curl);
334
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_CUSTOMREQUEST, NULL)); // IMPORTANT: Reset this to it's default value
336
if (rtc && !ms_throw_error)
337
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
339
if (ms_throw_error) {
345
check_reply_status();
348
inline void ms_execute_copy_request()
352
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
353
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, 0));
354
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
356
rtc = curl_easy_perform(ms_curl);
358
if (rtc && !ms_throw_error)
359
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
361
if (ms_throw_error) {
367
check_reply_status();
370
inline void ms_execute_get_request(CSOutputStream *output)
376
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPGET, 1L));
378
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOBODY, 1L));
382
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FILETIME, 1L));
383
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
384
// Ask curl to parse the Last-Modified header. This is easier than
385
// parsing it ourselves.
387
ms_outputStream = output;
388
if (curl_easy_perform(ms_curl) && !ms_throw_error) {
389
ms_outputStream = NULL;
390
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
392
ms_outputStream = NULL;
400
check_reply_status();
401
curl_easy_getinfo(ms_curl, CURLINFO_FILETIME, &ms_last_modified);
405
inline void ms_execute_put_request(CSInputStream *input, off64_t size)
410
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
411
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, size));
412
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
413
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POSTFIELDSIZE_LARGE, size));
414
//THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 1L));
419
ms_inputStream = input;
420
if (curl_easy_perform(ms_curl) && !ms_throw_error) {
421
ms_inputStream = NULL;
422
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
424
ms_inputStream = NULL;
431
check_reply_status();
433
if (ms_calculate_md5) {
434
// If the data was not sent with an md5 checksum then verify
435
// the server's md5 value with the one calculated during the send.
436
char checksum[HEX_CHECKSUM_VALUE_SIZE +1];
439
ms_md5.md5_get_digest(&digest);
440
cs_bin_to_hex(HEX_CHECKSUM_VALUE_SIZE, checksum, CHECKSUM_VALUE_SIZE, digest.val);
441
checksum[HEX_CHECKSUM_VALUE_SIZE] = 0;
443
cs_strToUpper(ms_s3Checksum);
444
if (strcmp(checksum, ms_s3Checksum)) {
445
// The request should be restarted in this case.
447
CSException::logException(CS_CONTEXT, CS_ERR_CHECKSUM_ERROR, "Calculated checksum did not match S3 checksum");
456
//======================================
461
//======================================
462
CSS3Protocol::CSS3Protocol():
465
s3_private_key(NULL),
469
new_(s3_server, CSStringBuffer());
470
s3_server->append("s3.amazonaws.com/");
472
s3_public_key = CSString::newString("");
473
s3_private_key = CSString::newString("");
478
CSS3Protocol::~CSS3Protocol()
481
s3_server->release();
484
s3_public_key->release();
487
s3_private_key->release();
491
CSString *CSS3Protocol::s3_getSignature(const char *verb,
493
const char *content_type,
500
CSStringBuffer *s3_buffer;
505
new_(s3_buffer, CSStringBuffer());
508
s3_buffer->setLength(0);
509
s3_buffer->append(verb);
510
s3_buffer->append("\n");
511
if (md5) s3_buffer->append(md5);
512
s3_buffer->append("\n");
513
if (content_type) s3_buffer->append(content_type);
514
s3_buffer->append("\n");
515
s3_buffer->append(date);
517
// Note: headers are assumed to be in lower case, sorted, and containing no white space.
518
s3_buffer->append("\n");
519
s3_buffer->append(headers->getCString());
521
s3_buffer->append("\n/");
522
s3_buffer->append(bucket);
523
s3_buffer->append("/");
524
s3_buffer->append(key);
527
printf("signing:\n=================\n%s\n=================\n", s3_buffer->getCString());
529
const char *ptr = s3_buffer->getCString();
531
printf("%x ", *ptr); ptr++;
537
CSString *sig = signature(s3_buffer->getCString(), s3_private_key->getCString());
544
//----------------------
545
// CURL callback functions:
546
////////////////////////////
547
//----------------------
549
static bool try_ReadStream(CSThread *self, S3ProtocolCon *con, unsigned char *ptr, size_t buffer_size, size_t *data_sent)
551
volatile bool rtc = true;
553
*data_sent = con->ms_inputStream->read((char*)ptr, buffer_size);
554
if (*data_sent <= con->ms_data_size) {
555
con->ms_data_size -= *data_sent;
557
con->ms_md5.md5_append(ptr, *data_sent); // Calculating the checksum for the data sent.
558
} else if (*data_sent > con->ms_data_size)
559
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob larger than expected.");
560
else if (con->ms_data_size && !*data_sent)
561
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob smaller than expected.");
570
//----------------------
571
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
573
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
574
size_t data_sent, buffer_size = objs * obj_size;
576
if (!con->ms_data_size)
580
if (try_ReadStream(self, con, (unsigned char*)ptr, buffer_size, &data_sent)) {
581
con->ms_throw_error = true;
582
data_sent = (size_t)-1;
589
static bool try_WriteStream(CSThread *self, S3ProtocolCon *con, char *ptr, size_t data_len)
591
volatile bool rtc = true;
593
if (con->ms_replyStatus >= 400) { // Collect the error reply.
594
if (!con->ms_errorReply)
595
con->ms_errorReply = new CSStringBuffer(50);
596
con->ms_errorReply->append(ptr, data_len);
597
} else if ( con->ms_outputStream)
598
con->ms_outputStream->write(ptr, data_len);
607
//----------------------
608
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
610
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
611
size_t data_len = objs * obj_size;
614
if (try_WriteStream(self, con, (char*)vptr, data_len)) {
615
con->ms_throw_error = true;
616
data_len = (size_t)-1;
622
#define IS_REDIRECT(s) ((s >= 300) && (s < 400))
623
//----------------------
624
static bool try_addHeader(CSThread *self, S3ProtocolCon *con, char *name, uint32_t name_len, char *value, uint32_t value_len)
627
con->ms_reply_headers.addHeader(name, name_len, value, value_len);
636
//----------------------
637
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
639
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
640
size_t size = objs * obj_size;
641
char *end, *ptr = (char*) header, *name, *value = NULL;
642
uint32_t name_len =0, value_len = 0;
644
//printf( "receive_header: %s\n", ptr);
646
if (*(end -2) == '\r' && *(end -1) == '\n')
649
while ((end != ptr) && (*ptr == ' ')) ptr++;
653
// Get the reply status.
654
// Status 100 = Continue
655
if (((!con->ms_replyStatus) || (con->ms_replyStatus == 100) || IS_REDIRECT(con->ms_replyStatus) )
656
&& !strncasecmp(ptr, "HTTP", 4)
659
while ((end != ptr) && (*ptr != ' ')) ptr++; // skip HTTP stuff
660
while ((end != ptr) && (*ptr == ' ')) ptr++; // find the start of eh status code.
664
if (end < (ptr +3)) // expecting a 3 digit status code.
667
memcpy(status, ptr, 3);
670
con->ms_replyStatus = atoi(status);
674
while ((end != ptr) && (*ptr != ':')) ptr++;
677
name_len = ptr - name;
680
while ((end != ptr) && (*ptr == ' ')) ptr++;
685
value_len = end - value;
687
while (name[name_len-1] == ' ') name_len--;
688
while (value[value_len-1] == ' ') value_len--;
690
if (!strncasecmp(name, "ETag", 4)) {
692
value++; value_len -=2; // Strip quotation marks from checksum string.
694
if (value_len == HEX_CHECKSUM_VALUE_SIZE) {
695
memcpy(con->ms_s3Checksum, value, value_len);
696
con->ms_s3Checksum[value_len] = 0;
701
if (try_addHeader(self, con, name, name_len, value, value_len)) {
702
con->ms_throw_error = true;
708
//----------------------
710
#define SET_DATE_FROM_TIME(t, d) {strftime(d, sizeof(d), "%a, %d %b %Y %H:%M:%S GMT", gmtime(&t));}
711
#define SET_DATE(d) {time_t t = time(NULL); SET_DATE_FROM_TIME(t, d);}
713
bool CSS3Protocol::s3_delete(const char *bucket, const char *key)
715
CSStringBuffer *s3_buffer;
717
CSString *signed_str;
718
uint32_t retry_count = 0;
719
S3ProtocolCon *con_data;
723
new_(s3_buffer, CSStringBuffer());
726
new_(con_data, S3ProtocolCon());
730
// Clear old settings.
731
con_data->ms_reset();
736
s3_buffer->setLength(0);
737
s3_buffer->append("http://");
738
s3_buffer->append(bucket);
739
s3_buffer->append(".");
740
s3_buffer->append(s3_server->getCString());
741
s3_buffer->append(key);
743
con_data->ms_setURL(s3_buffer->getCString());
745
// Add the 'DATE' header
746
s3_buffer->setLength(0);
747
s3_buffer->append("Date: ");
748
s3_buffer->append(date);
749
con_data->ms_setHeader(s3_buffer->getCString());
751
// Create the authentication signature and add the 'Authorization' header
752
signed_str = s3_getSignature("DELETE", NULL, NULL, date, bucket, key);
754
s3_buffer->setLength(0);
755
s3_buffer->append("Authorization: AWS ");
756
s3_buffer->append(s3_public_key->getCString());
757
s3_buffer->append(":");
758
s3_buffer->append(signed_str->getCString());
759
release_(signed_str); signed_str = NULL;
761
con_data->ms_setHeader(s3_buffer->getCString());
763
con_data->ms_execute_delete_request();
765
if (con_data->ms_retry) {
766
if (retry_count == s3_maxRetries) {
767
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
769
//printf("RETRY: s3_delete()\n");
771
self->sleep(s3_sleepTime);
775
bool notFound = con_data->ms_notFound;
782
//-------------------------------
783
void CSS3Protocol::s3_copy(const char *dest_server, const char *dest_bucket, const char *dest_key, const char *src_bucket, const char *src_key)
785
CSStringBuffer *s3_buffer;
787
CSString *signed_str;
788
uint32_t retry_count = 0;
789
S3ProtocolCon *con_data;
793
new_(s3_buffer, CSStringBuffer());
796
new_(con_data, S3ProtocolCon());
800
dest_server = s3_server->getCString();
803
// Clear old settings.
804
con_data->ms_reset();
809
s3_buffer->setLength(0);
810
s3_buffer->append("http://");
811
s3_buffer->append(dest_bucket);
812
s3_buffer->append(".");
813
s3_buffer->append(s3_server->getCString());
814
s3_buffer->append(dest_key);
816
con_data->ms_setURL(s3_buffer->getCString());
818
// Add the destination location
819
s3_buffer->setLength(0);
820
s3_buffer->append("Host: ");
821
s3_buffer->append(dest_bucket);
822
s3_buffer->append(".");
823
s3_buffer->append(dest_server);
824
s3_buffer->setLength(s3_buffer->length() -1); // trim the '/'
825
con_data->ms_setHeader(s3_buffer->getCString());
827
// Add the source location
828
s3_buffer->setLength(0);
829
s3_buffer->append("x-amz-copy-source:");
830
s3_buffer->append(src_bucket);
831
s3_buffer->append("/");
832
s3_buffer->append(src_key);
833
con_data->ms_setHeader(s3_buffer->getCString());
835
// Create the authentication signature and add the 'Authorization' header
836
signed_str = s3_getSignature("PUT", NULL, NULL, date, dest_bucket, dest_key, CSString::newString(s3_buffer->getCString()));
839
// Add the 'DATE' header
840
s3_buffer->setLength(0);
841
s3_buffer->append("Date: ");
842
s3_buffer->append(date);
843
con_data->ms_setHeader(s3_buffer->getCString());
846
s3_buffer->setLength(0);
847
s3_buffer->append("Authorization: AWS ");
848
s3_buffer->append(s3_public_key->getCString());
849
s3_buffer->append(":");
850
s3_buffer->append(signed_str->getCString());
851
release_(signed_str); signed_str = NULL;
852
con_data->ms_setHeader(s3_buffer->getCString());
854
con_data->ms_execute_copy_request();
856
if (con_data->ms_notFound) {
857
s3_buffer->setLength(0);
858
s3_buffer->append("Cloud copy failed, object not found: ");
859
s3_buffer->append(src_bucket);
860
s3_buffer->append(" ");
861
s3_buffer->append(src_key);
862
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, s3_buffer->getCString());
865
if (con_data->ms_retry) {
866
if (retry_count == s3_maxRetries) {
867
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
869
//printf("RETRY: s3_copy()\n");
871
self->sleep(s3_sleepTime);
882
//-------------------------------
883
CSVector *CSS3Protocol::s3_receive(CSOutputStream *output, const char *bucket, const char *key, bool *found, S3RangePtr range, time_t *last_modified)
885
CSStringBuffer *s3_buffer;
887
CSString *signed_str;
888
uint32_t retry_count = 0;
889
S3ProtocolCon *con_data;
890
CSVector *replyHeaders;
891
CSString *range_header = NULL;
902
new_(s3_buffer, CSStringBuffer());
905
new_(con_data, S3ProtocolCon());
909
// Clear old settings.
910
con_data->ms_reset();
915
s3_buffer->setLength(0);
916
s3_buffer->append("http://");
917
s3_buffer->append(bucket);
918
s3_buffer->append(".");
919
s3_buffer->append(s3_server->getCString());
920
s3_buffer->append(key);
922
con_data->ms_setURL(s3_buffer->getCString());
924
// Add the 'DATE' header
925
s3_buffer->setLength(0);
926
s3_buffer->append("Date: ");
927
s3_buffer->append(date);
928
con_data->ms_setHeader(s3_buffer->getCString());
932
snprintf(buffer, 80,"Range: bytes=%"PRIu64"-%"PRIu64, range->startByte, range->endByte);
934
range_header = CSString::newString(buffer);
936
// Create the authentication signature and add the 'Authorization' header
938
con_data->ms_setHeader(range_header->getCString());
939
signed_str = s3_getSignature(http_op, NULL, NULL, date, bucket, key, NULL);
941
s3_buffer->setLength(0);
942
s3_buffer->append("Authorization: AWS ");
943
s3_buffer->append(s3_public_key->getCString());
944
s3_buffer->append(":");
945
s3_buffer->append(signed_str->getCString());
946
release_(signed_str); signed_str = NULL;
947
con_data->ms_setHeader(s3_buffer->getCString());
949
if (output) output->retain();
950
con_data->ms_execute_get_request(output);
952
if (con_data->ms_retry) {
953
if (retry_count == s3_maxRetries) {
954
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
956
//printf("RETRY: s3_receive()\n");
959
self->sleep(s3_sleepTime);
964
*last_modified = con_data->ms_last_modified;
965
*found = !con_data->ms_notFound;
966
replyHeaders = con_data->ms_reply_headers.takeHeaders();
972
return_(replyHeaders);
975
class S3ListParser : public CSXMLBuffer {
981
bool parseListData(const char *data, size_t len, CSVector *keys)
984
return parseData(data, len, 0);
988
virtual bool openNode(char *path, char *value) {
989
if (value && *value && (strcmp(path,"/listbucketresult/contents/key/") == 0))
990
list->add(CSString::newString(value));
994
virtual bool closeNode(char *path) {
999
virtual bool addAttribute(char *path, char *name, char *value) {
1008
//-------------------------------
1009
static CSVector *parse_s3_list(CSMemoryOutputStream *output)
1011
S3ListParser s3ListParser;
1020
new_(vector, CSVector(10));
1023
data = (const char *) output->getMemory(&len);
1024
if (!s3ListParser.parseListData(data, len, vector)) {
1028
s3ListParser.getError(&err, &msg);
1029
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
1038
//-------------------------------
1039
CSVector *CSS3Protocol::s3_list(const char *bucket, const char *key_prefix, uint32_t max)
1041
CSStringBuffer *s3_buffer;
1043
CSString *signed_str;
1044
CSMemoryOutputStream *output;
1045
uint32_t retry_count = 0;
1046
S3ProtocolCon *con_data;
1049
new_(s3_buffer, CSStringBuffer());
1052
output = CSMemoryOutputStream::newStream(1024, 1024);
1055
new_(con_data, S3ProtocolCon());
1060
// Clear old settings.
1061
con_data->ms_reset();
1066
s3_buffer->setLength(0);
1067
s3_buffer->append("http://");
1068
s3_buffer->append(bucket);
1069
s3_buffer->append(".");
1070
s3_buffer->append(s3_server->getCString());
1071
//s3_buffer->append("/");
1072
//s3_buffer->append(bucket);
1074
s3_buffer->append("?prefix=");
1075
s3_buffer->append(key_prefix);
1080
s3_buffer->append("&max-keys=");
1082
s3_buffer->append("?max-keys=");
1083
s3_buffer->append(max);
1086
con_data->ms_setURL(s3_buffer->getCString());
1088
// Add the 'DATE' header
1089
s3_buffer->setLength(0);
1090
s3_buffer->append("Date: ");
1091
s3_buffer->append(date);
1092
con_data->ms_setHeader(s3_buffer->getCString());
1094
// Create the authentication signature and add the 'Authorization' header
1095
signed_str = s3_getSignature("GET", NULL, NULL, date, bucket, "");
1097
s3_buffer->setLength(0);
1098
s3_buffer->append("Authorization: AWS ");
1099
s3_buffer->append(s3_public_key->getCString());
1100
s3_buffer->append(":");
1101
s3_buffer->append(signed_str->getCString());
1102
release_(signed_str); signed_str = NULL;
1103
con_data->ms_setHeader(s3_buffer->getCString());
1105
con_data->ms_execute_get_request(RETAIN(output));
1107
if (con_data->ms_retry) {
1108
if (retry_count == s3_maxRetries) {
1109
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1111
//printf("RETRY: s3_list()\n");
1114
self->sleep(s3_sleepTime);
1120
release_(s3_buffer);
1121
return_(parse_s3_list(output));
1124
//-------------------------------
1125
CSString *CSS3Protocol::s3_getAuthorization(const char *bucket, const char *key, const char *content_type, uint32_t *s3AuthorizationTime)
1128
CSString *signed_str;
1134
content_type = "binary/octet-stream";
1136
sys_time = time(NULL);
1138
*s3AuthorizationTime = (uint32_t)sys_time;
1140
SET_DATE_FROM_TIME(sys_time, date);
1141
signed_str = s3_getSignature("PUT", NULL, content_type, date, bucket, key);
1142
return_(signed_str);
1145
//-------------------------------
1146
CSVector *CSS3Protocol::s3_send(CSInputStream *input, const char *bucket, const char *key, off64_t size, const char *content_type, Md5Digest *digest, const char *s3Authorization, time_t s3AuthorizationTime)
1148
CSStringBuffer *s3_buffer;
1150
CSString *signed_str;
1151
uint32_t retry_count = 0;
1152
S3ProtocolCon *con_data;
1153
CSVector *replyHeaders;
1154
char checksum[32], *md5 = NULL;
1159
new_(s3_buffer, CSStringBuffer());
1162
new_(con_data, S3ProtocolCon());
1166
content_type = "binary/octet-stream";
1170
// Clear old settings.
1171
con_data->ms_reset();
1173
if (s3Authorization) {
1174
SET_DATE_FROM_TIME(s3AuthorizationTime, date);
1180
s3_buffer->setLength(0);
1181
s3_buffer->append("http://");
1182
s3_buffer->append(bucket);
1183
s3_buffer->append(".");
1184
s3_buffer->append(s3_server->getCString());
1185
s3_buffer->append(key);
1187
con_data->ms_setURL(s3_buffer->getCString());
1189
// Add the 'DATE' header
1190
s3_buffer->setLength(0);
1191
s3_buffer->append("Date: ");
1192
s3_buffer->append(date);
1193
con_data->ms_setHeader(s3_buffer->getCString());
1195
// Add the 'Content-Type' header
1196
s3_buffer->setLength(0);
1197
s3_buffer->append("Content-Type: ");
1198
s3_buffer->append(content_type);
1199
con_data->ms_setHeader(s3_buffer->getCString());
1202
// Add the Md5 checksum header
1204
memset(checksum, 0, 32);
1205
base64Encode(digest->val, 16, checksum, 32);
1207
s3_buffer->setLength(0);
1208
s3_buffer->append("Content-MD5: ");
1209
s3_buffer->append(checksum);
1210
con_data->ms_setHeader(s3_buffer->getCString());
1211
con_data->ms_calculate_md5 = false;
1213
con_data->ms_calculate_md5 = true;
1216
// Create the authentication signature and add the 'Authorization' header
1217
if (!s3Authorization)
1218
signed_str = s3_getSignature("PUT", md5, content_type, date, bucket, key);
1220
signed_str = CSString::newString(s3Authorization);
1222
s3_buffer->setLength(0);
1223
s3_buffer->append("Authorization: AWS ");
1224
s3_buffer->append(s3_public_key->getCString());
1225
s3_buffer->append(":");
1226
s3_buffer->append(signed_str->getCString());
1227
release_(signed_str); signed_str = NULL;
1228
con_data->ms_setHeader(s3_buffer->getCString());
1230
con_data->ms_execute_put_request(RETAIN(input), size);
1232
if (con_data->ms_retry) {
1233
if (retry_count == s3_maxRetries) {
1234
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1236
//printf("RETRY: s3_send()\n");
1239
self->sleep(s3_sleepTime);
1243
replyHeaders = con_data->ms_reply_headers.takeHeaders();
1246
release_(s3_buffer);
1248
return_(replyHeaders);
1251
//-------------------------------
1252
CSString *CSS3Protocol::s3_getDataURL(const char *bucket, const char *key, uint32_t keep_alive)
1254
CSStringBuffer *s3_buffer;
1256
CSString *signed_str;
1259
new_(s3_buffer, CSStringBuffer());
1262
snprintf(timeout, 32, "%"PRId32"", ((uint32_t)time(NULL)) + keep_alive);
1264
signed_str = s3_getSignature("GET", NULL, NULL, timeout, bucket, key);
1265
//printf("Unsafe: \"%s\"\n", signed_str->getCString());
1266
signed_str = urlEncode(signed_str); // Because the signature is in the URL it must be URL encoded.
1267
//printf(" Safe: \"%s\"\n", signed_str->getCString());
1270
s3_buffer->setLength(0);
1271
s3_buffer->append("http://");
1272
s3_buffer->append(bucket);
1273
s3_buffer->append(".");
1274
s3_buffer->append(s3_server->getCString());
1275
s3_buffer->append(key);
1277
s3_buffer->append("?AWSAccessKeyId=");
1278
s3_buffer->append(s3_public_key->getCString());
1279
s3_buffer->append("&Expires=");
1280
s3_buffer->append(timeout);
1281
s3_buffer->append("&Signature=");
1282
s3_buffer->append(signed_str->getCString());
1284
release_(signed_str);
1287
CSString *str = CSString::newString(s3_buffer);
1291
//#define S3_UNIT_TEST
1293
static void show_help_info(const char *cmd)
1295
printf("Get authenticated query string:\n\t%s q <bucket> <object_key> <timeout>\n", cmd);
1296
printf("Delete object:\n\t%s d <bucket> <object_key>\n", cmd);
1297
printf("Delete all object with a given prefix:\n\t%s D <bucket> <object_prefix>\n", cmd);
1298
printf("Get object, data will be written to 'prottest.out':\n\t%s g <bucket> <object_key> <timeout>\n", cmd);
1299
printf("Get object header only:\n\t%s h <bucket> <object_key> <timeout>\n", cmd);
1300
printf("Put (Upload) an object:\n\t%s p <bucket> <object_key> <file>\n", cmd);
1301
printf("List objects in the bucket:\n\t%s l <bucket> [<object_prefix> [max_list_size]]\n", cmd);
1302
printf("Copy object:\n\t%s c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key> \n", cmd);
1303
printf("Copy all object with a given prefix:\n\t%s C <src_bucket> <object_key_prefix> <dst_bucket> \n", cmd);
1306
void dump_headers(CSVector *header_array)
1308
CSHTTPHeaders headers;
1310
headers.setHeaders(header_array);
1311
printf("Reply Headers:\n");
1312
printf("--------------\n");
1314
for (uint32_t i = 0; i < headers.numHeaders(); i++) {
1315
CSHeader *h = headers.getHeader(i);
1317
printf("%s : %s\n", h->getNameCString(), h->getValueCString());
1320
printf("--------------\n");
1321
headers.clearHeaders();
1324
int main(int argc, char **argv)
1326
CSThread *main_thread;
1327
const char *pub_key;
1328
const char *priv_key;
1330
CSS3Protocol *prot = NULL;
1333
show_help_info(argv[0]);
1337
if (! CSThread::startUp()) {
1338
CSException::throwException(CS_CONTEXT, ENOMEM, "CSThread::startUp() failed.");
1344
main_thread = new CSThread( NULL);
1345
CSThread::setSelf(main_thread);
1350
pub_key = getenv("S3_ACCESS_KEY_ID");
1351
priv_key = getenv("S3_SECRET_ACCESS_KEY");
1352
new_(prot, CSS3Protocol());
1355
server = getenv("S3_SERVER");
1356
if ((server == NULL) || (*server == 0))
1357
server = "s3.amazonaws.com/";
1358
prot->s3_setServer(server);
1359
prot->s3_setPublicKey(pub_key);
1360
prot->s3_setPrivateKey(priv_key);
1362
switch (argv[1][0]) {
1363
case 'q': // Get the query string
1365
CSString *qstr = prot->s3_getDataURL(argv[2], argv[3], atoi(argv[4]));
1366
printf("To test call:\ncurl -L -D - \"%s\"\n", qstr->getCString());
1369
printf("Bad command: q <bucket> <object_key> <timeout>\n");
1372
case 'd': // Delete the object
1374
printf("delete %s %s\n", argv[2], argv[3]);
1375
if (!prot->s3_delete(argv[2], argv[3]))
1376
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1379
printf("Bad command: d <bucket> <object_key>\n");
1382
case 'D': // Delete objects like
1387
list = prot->s3_list(argv[2], argv[3]);
1389
while (key = (CSString*) list->take(0)) {
1390
printf("Deleting %s\n", key->getCString());
1391
prot->s3_delete(argv[2], key->getCString());
1397
printf("Bad command: D <bucket> <object_key_prefix>\n");
1400
case 'g': // Get the object
1401
if ((argc == 4) || (argc == 6)) {
1405
S3RangeRec *range_ptr = NULL, range = {0,0};
1408
range.startByte = atoi(argv[4]);
1409
range.endByte = atoi(argv[5]);
1413
output = CSFile::newFile("prottest.out");
1415
output->open(CSFile::CREATE | CSFile::TRUNCATE);
1416
headers = prot->s3_receive(output->getOutputStream(), argv[2], argv[3], &found, range_ptr);
1418
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1420
dump_headers(headers);
1424
printf("Bad command: g <bucket> <object_key>\n");
1428
case 'h': // Get the object header
1432
S3RangeRec range = {0,0};
1434
headers = prot->s3_receive(NULL, argv[2], argv[3], &found);
1436
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1438
dump_headers(headers);
1441
printf("Bad command: h <bucket> <object_key>\n");
1445
case 'p': // Put (Upload) the object
1451
input = CSFile::newFile(argv[4]);
1453
input->open(CSFile::READONLY);
1454
input->md5Digest(&digest);
1455
headers = prot->s3_send(input->getInputStream(), argv[2], argv[3], input->myFilePath->getSize(), NULL, &digest);
1456
dump_headers(headers);
1459
printf("Bad command: p <bucket> <object_key> <file> \n");
1463
case 'c': // Copy the object
1465
prot->s3_copy(NULL, argv[4], argv[5], argv[2], argv[3]);
1467
printf("Bad command: c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key>\n");
1471
case 'C': // Copy objects like
1476
list = prot->s3_list(argv[2], argv[3]);
1478
while (key = (CSString*) list->take(0)) {
1479
printf("Copying %s\n", key->getCString());
1480
prot->s3_copy(NULL, argv[4], key->getCString(), argv[2], key->getCString());
1486
printf("Bad command: C <src_bucket> <object_key_prefix> <dst_bucket>\n");
1489
case 'l': // List the object
1490
if ((argc == 3) || (argc == 4) || (argc == 5)) {
1492
char *prefix = NULL;
1498
if (!strlen(prefix))
1503
max = atol(argv[4]);
1505
list = prot->s3_list(argv[2], prefix, max);
1507
while (key = (CSString*) list->take(0)) {
1508
printf("%s\n", key->getCString());
1514
printf("Bad command: l <bucket> [<object_prefix> [max_list_size]] \n");
1518
printf("Unknown command.\n");
1519
show_help_info(argv[0]);
1526
self->logException();
1531
main_thread->release();
1533
CSThread::shutDown();