376
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPGET, 1L));
378
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOBODY, 1L));
382
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FILETIME, 1L));
371
383
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
372
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPGET, 1L));
384
// Ask curl to parse the Last-Modified header. This is easier than
385
// parsing it ourselves.
374
387
ms_outputStream = output;
375
388
if (curl_easy_perform(ms_curl) && !ms_throw_error) {
377
390
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
379
392
ms_outputStream = NULL;
382
397
if (ms_throw_error)
385
400
check_reply_status();
401
curl_easy_getinfo(ms_curl, CURLINFO_FILETIME, &ms_last_modified);
389
inline void ms_execute_put_request(CSInputStream *input, off64_t size, Md5Digest *digest)
405
inline void ms_execute_put_request(CSInputStream *input, off64_t size)
415
431
check_reply_status();
417
// Check that the checksums agree
418
char checksum[HEX_CHECKSUM_VALUE_SIZE +1];
420
ms_md5.md5_digest(digest);
421
cs_bin_to_hex(HEX_CHECKSUM_VALUE_SIZE, checksum, CHECKSUM_VALUE_SIZE, digest->val);
422
checksum[HEX_CHECKSUM_VALUE_SIZE] = 0;
424
cs_strToUpper(ms_s3Checksum);
425
if (strcmp(checksum, ms_s3Checksum)) {
426
// The request should be restarted in this case.
428
CSException::logException(CS_CONTEXT, CS_ERR_CHECKSUM_ERROR, "Calculated checksum did not match S3 checksum");
433
if (ms_calculate_md5) {
434
// If the data was not sent with an md5 checksum then verify
435
// the server's md5 value with the one calculated during the send.
436
char checksum[HEX_CHECKSUM_VALUE_SIZE +1];
439
ms_md5.md5_get_digest(&digest);
440
cs_bin_to_hex(HEX_CHECKSUM_VALUE_SIZE, checksum, CHECKSUM_VALUE_SIZE, digest.val);
441
checksum[HEX_CHECKSUM_VALUE_SIZE] = 0;
443
cs_strToUpper(ms_s3Checksum);
444
if (strcmp(checksum, ms_s3Checksum)) {
445
// The request should be restarted in this case.
447
CSException::logException(CS_CONTEXT, CS_ERR_CHECKSUM_ERROR, "Calculated checksum did not match S3 checksum");
524
545
// CURL callback functions:
525
546
////////////////////////////
526
547
//----------------------
549
static bool try_ReadStream(CSThread *self, S3ProtocolCon *con, unsigned char *ptr, size_t buffer_size, size_t *data_sent)
551
volatile bool rtc = true;
553
*data_sent = con->ms_inputStream->read((char*)ptr, buffer_size);
554
if (*data_sent <= con->ms_data_size) {
555
con->ms_data_size -= *data_sent;
557
con->ms_md5.md5_append(ptr, *data_sent); // Calculating the checksum for the data sent.
558
} else if (*data_sent > con->ms_data_size)
559
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob larger than expected.");
560
else if (con->ms_data_size && !*data_sent)
561
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob smaller than expected.");
570
//----------------------
527
571
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
529
573
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
537
data_sent = con->ms_inputStream->read((char*)ptr, buffer_size);
538
if (data_sent <= con->ms_data_size) {
539
con->ms_data_size -= data_sent;
541
con->ms_md5.md5_append((u_char*)ptr, data_sent); // Calculating the checksum for the data sent.
542
} else if (data_sent > con->ms_data_size)
543
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob larger than expected.");
544
else if (con->ms_data_size && !data_sent)
545
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob smaller than expected.");
580
if (try_ReadStream(self, con, (unsigned char*)ptr, buffer_size, &data_sent)) {
581
con->ms_throw_error = true;
582
data_sent = (size_t)-1;
548
con->ms_throw_error = true;
549
data_sent = SIZE_MAX;
555
//----------------------
556
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
589
static bool try_WriteStream(CSThread *self, S3ProtocolCon *con, char *ptr, size_t data_len)
558
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
559
size_t data_len = objs * obj_size;
591
volatile bool rtc = true;
563
593
if (con->ms_replyStatus >= 400) { // Collect the error reply.
564
594
if (!con->ms_errorReply)
565
595
con->ms_errorReply = new CSStringBuffer(50);
566
con->ms_errorReply->append((char*)vptr, data_len);
596
con->ms_errorReply->append(ptr, data_len);
567
597
} else if ( con->ms_outputStream)
568
con->ms_outputStream->write((char*)vptr, data_len);
598
con->ms_outputStream->write(ptr, data_len);
571
con->ms_throw_error = true;
607
//----------------------
608
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
610
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
611
size_t data_len = objs * obj_size;
614
if (try_WriteStream(self, con, (char*)vptr, data_len)) {
615
con->ms_throw_error = true;
616
data_len = (size_t)-1;
575
619
return_(data_len);
578
622
#define IS_REDIRECT(s) ((s >= 300) && (s < 400))
579
623
//----------------------
624
static bool try_addHeader(CSThread *self, S3ProtocolCon *con, char *name, uint32_t name_len, char *value, uint32_t value_len)
627
con->ms_reply_headers.addHeader(name, name_len, value, value_len);
636
//----------------------
580
637
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
582
639
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
584
641
char *end, *ptr = (char*) header, *name, *value = NULL;
585
642
uint32_t name_len =0, value_len = 0;
587
CLOBBER_PROTECT(con);
588
CLOBBER_PROTECT(size);
589
CLOBBER_PROTECT(ptr);
590
CLOBBER_PROTECT(value);
591
CLOBBER_PROTECT(value_len);
592
CLOBBER_PROTECT(name_len);
594
644
//printf( "receive_header: %s\n", ptr);
595
645
end = ptr + size;
596
646
if (*(end -2) == '\r' && *(end -1) == '\n')
638
688
while (value[value_len-1] == ' ') value_len--;
640
690
if (!strncasecmp(name, "ETag", 4)) {
641
value++; value_len -=2; // Strip quotation marks from checksum string.
692
value++; value_len -=2; // Strip quotation marks from checksum string.
642
694
if (value_len == HEX_CHECKSUM_VALUE_SIZE) {
643
695
memcpy(con->ms_s3Checksum, value, value_len);
644
696
con->ms_s3Checksum[value_len] = 0;
650
con->ms_reply_headers.addHeader(name, name_len, value, value_len);
701
if (try_addHeader(self, con, name, name_len, value, value_len)) {
702
con->ms_throw_error = true;
654
con->ms_throw_error = true;
716
763
con_data->ms_execute_delete_request();
718
765
if (con_data->ms_retry) {
719
if (retry_count == s3_maxRetrys) {
766
if (retry_count == s3_maxRetries) {
720
767
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
722
printf("RETRY: s3_delete()\n");
769
//printf("RETRY: s3_delete()\n");
771
self->sleep(s3_sleepTime);
817
865
if (con_data->ms_retry) {
818
if (retry_count == s3_maxRetrys) {
866
if (retry_count == s3_maxRetries) {
819
867
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
821
printf("RETRY: s3_copy()\n");
869
//printf("RETRY: s3_copy()\n");
871
self->sleep(s3_sleepTime);
833
882
//-------------------------------
834
CSVector *CSS3Protocol::s3_receive(CSOutputStream *output, const char *bucket, const char *key, bool *found)
883
CSVector *CSS3Protocol::s3_receive(CSOutputStream *output, const char *bucket, const char *key, bool *found, S3RangePtr range, time_t *last_modified)
836
885
CSStringBuffer *s3_buffer;
872
927
s3_buffer->append(date);
873
928
con_data->ms_setHeader(s3_buffer->getCString());
932
snprintf(buffer, 80,"Range: bytes=%"PRIu64"-%"PRIu64, range->startByte, range->endByte);
934
range_header = CSString::newString(buffer);
875
936
// Create the authentication signature and add the 'Authorization' header
876
signed_str = s3_getSignature("GET", NULL, NULL, date, bucket, key);
938
con_data->ms_setHeader(range_header->getCString());
939
signed_str = s3_getSignature(http_op, NULL, NULL, date, bucket, key, NULL);
877
940
push_(signed_str);
878
941
s3_buffer->setLength(0);
879
942
s3_buffer->append("Authorization: AWS ");
883
946
release_(signed_str); signed_str = NULL;
884
947
con_data->ms_setHeader(s3_buffer->getCString());
886
con_data->ms_execute_get_request(RETAIN(output));
949
if (output) output->retain();
950
con_data->ms_execute_get_request(output);
888
952
if (con_data->ms_retry) {
889
if (retry_count == s3_maxRetrys) {
953
if (retry_count == s3_maxRetries) {
890
954
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
892
printf("RETRY: s3_receive()\n");
956
//printf("RETRY: s3_receive()\n");
959
self->sleep(s3_sleepTime);
964
*last_modified = con_data->ms_last_modified;
898
965
*found = !con_data->ms_notFound;
899
966
replyHeaders = con_data->ms_reply_headers.takeHeaders();
900
967
release_(con_data);
901
968
release_(s3_buffer);
904
972
return_(replyHeaders);
1035
1105
con_data->ms_execute_get_request(RETAIN(output));
1037
1107
if (con_data->ms_retry) {
1038
if (retry_count == s3_maxRetrys) {
1108
if (retry_count == s3_maxRetries) {
1039
1109
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1041
printf("RETRY: s3_list()\n");
1111
//printf("RETRY: s3_list()\n");
1043
1113
output->reset();
1114
self->sleep(s3_sleepTime);
1130
1198
s3_buffer->append(content_type);
1131
1199
con_data->ms_setHeader(s3_buffer->getCString());
1202
// Add the Md5 checksum header
1204
memset(checksum, 0, 32);
1205
base64Encode(digest->val, 16, checksum, 32);
1207
s3_buffer->setLength(0);
1208
s3_buffer->append("Content-MD5: ");
1209
s3_buffer->append(checksum);
1210
con_data->ms_setHeader(s3_buffer->getCString());
1211
con_data->ms_calculate_md5 = false;
1213
con_data->ms_calculate_md5 = true;
1133
1216
// Create the authentication signature and add the 'Authorization' header
1134
1217
if (!s3Authorization)
1135
signed_str = s3_getSignature("PUT", NULL, content_type, date, bucket, key);
1218
signed_str = s3_getSignature("PUT", md5, content_type, date, bucket, key);
1137
1220
signed_str = CSString::newString(s3Authorization);
1138
1221
push_(signed_str);
1144
1227
release_(signed_str); signed_str = NULL;
1145
1228
con_data->ms_setHeader(s3_buffer->getCString());
1147
con_data->ms_execute_put_request(RETAIN(input), size, digest);
1230
con_data->ms_execute_put_request(RETAIN(input), size);
1149
1232
if (con_data->ms_retry) {
1150
if (retry_count == s3_maxRetrys) {
1233
if (retry_count == s3_maxRetries) {
1151
1234
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1153
printf("RETRY: s3_send()\n");
1236
//printf("RETRY: s3_send()\n");
1155
1238
input->reset();
1239
self->sleep(s3_sleepTime);
1159
1243
replyHeaders = con_data->ms_reply_headers.takeHeaders();
1160
1245
release_(con_data);
1161
1246
release_(s3_buffer);
1162
1247
release_(input);
1211
1296
printf("Delete object:\n\t%s d <bucket> <object_key>\n", cmd);
1212
1297
printf("Delete all object with a given prefix:\n\t%s D <bucket> <object_prefix>\n", cmd);
1213
1298
printf("Get object, data will be written to 'prottest.out':\n\t%s g <bucket> <object_key> <timeout>\n", cmd);
1299
printf("Get object header only:\n\t%s h <bucket> <object_key> <timeout>\n", cmd);
1214
1300
printf("Put (Upload) an object:\n\t%s p <bucket> <object_key> <file>\n", cmd);
1215
1301
printf("List objects in the bucket:\n\t%s l <bucket> [<object_prefix> [max_list_size]]\n", cmd);
1216
1302
printf("Copy object:\n\t%s c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key> \n", cmd);
1265
1352
new_(prot, CSS3Protocol());
1268
prot->s3_setServer("s3.amazonaws.com/");
1355
server = getenv("S3_SERVER");
1356
if ((server == NULL) || (*server == 0))
1357
server = "s3.amazonaws.com/";
1358
prot->s3_setServer(server);
1269
1359
prot->s3_setPublicKey(pub_key);
1270
1360
prot->s3_setPrivateKey(priv_key);
1310
1400
case 'g': // Get the object
1401
if ((argc == 4) || (argc == 6)) {
1312
1402
CSFile *output;
1313
1403
CSVector *headers;
1405
S3RangeRec *range_ptr = NULL, range = {0,0};
1408
range.startByte = atoi(argv[4]);
1409
range.endByte = atoi(argv[5]);
1316
1413
output = CSFile::newFile("prottest.out");
1318
1415
output->open(CSFile::CREATE | CSFile::TRUNCATE);
1319
headers = prot->s3_receive(output->getOutputStream(), argv[2], argv[3], &found);
1416
headers = prot->s3_receive(output->getOutputStream(), argv[2], argv[3], &found, range_ptr);
1321
1418
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1428
case 'h': // Get the object header
1432
S3RangeRec range = {0,0};
1434
headers = prot->s3_receive(NULL, argv[2], argv[3], &found);
1436
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1438
dump_headers(headers);
1441
printf("Bad command: h <bucket> <object_key>\n");
1331
1445
case 'p': // Put (Upload) the object
1332
1446
if (argc == 5) {
1334
1449
CSVector *headers;
1336
1451
input = CSFile::newFile(argv[4]);
1338
1453
input->open(CSFile::READONLY);
1339
headers = prot->s3_send(input->getInputStream(), argv[2], argv[3], input->myFilePath->getSize());
1454
input->md5Digest(&digest);
1455
headers = prot->s3_send(input->getInputStream(), argv[2], argv[3], input->myFilePath->getSize(), NULL, &digest);
1340
1456
dump_headers(headers);
1341
1457
release_(input);