376
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPGET, 1L));
378
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOBODY, 1L));
382
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FILETIME, 1L));
383
371
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
384
// Ask curl to parse the Last-Modified header. This is easier than
385
// parsing it ourselves.
372
THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPGET, 1L));
387
374
ms_outputStream = output;
388
375
if (curl_easy_perform(ms_curl) && !ms_throw_error) {
390
377
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
392
379
ms_outputStream = NULL;
397
382
if (ms_throw_error)
400
385
check_reply_status();
401
curl_easy_getinfo(ms_curl, CURLINFO_FILETIME, &ms_last_modified);
405
inline void ms_execute_put_request(CSInputStream *input, off64_t size)
389
inline void ms_execute_put_request(CSInputStream *input, off64_t size, Md5Digest *digest)
431
415
check_reply_status();
433
if (ms_calculate_md5) {
434
// If the data was not sent with an md5 checksum then verify
435
// the server's md5 value with the one calculated during the send.
436
char checksum[HEX_CHECKSUM_VALUE_SIZE +1];
439
ms_md5.md5_get_digest(&digest);
440
cs_bin_to_hex(HEX_CHECKSUM_VALUE_SIZE, checksum, CHECKSUM_VALUE_SIZE, digest.val);
441
checksum[HEX_CHECKSUM_VALUE_SIZE] = 0;
443
cs_strToUpper(ms_s3Checksum);
444
if (strcmp(checksum, ms_s3Checksum)) {
445
// The request should be restarted in this case.
447
CSException::logException(CS_CONTEXT, CS_ERR_CHECKSUM_ERROR, "Calculated checksum did not match S3 checksum");
417
// Check that the checksums agree
418
char checksum[HEX_CHECKSUM_VALUE_SIZE +1];
420
ms_md5.md5_digest(digest);
421
cs_bin_to_hex(HEX_CHECKSUM_VALUE_SIZE, checksum, CHECKSUM_VALUE_SIZE, digest->val);
422
checksum[HEX_CHECKSUM_VALUE_SIZE] = 0;
424
cs_strToUpper(ms_s3Checksum);
425
if (strcmp(checksum, ms_s3Checksum)) {
426
// The request should be restarted in this case.
428
CSException::logException(CS_CONTEXT, CS_ERR_CHECKSUM_ERROR, "Calculated checksum did not match S3 checksum");
545
524
// CURL callback functions:
546
525
////////////////////////////
547
526
//----------------------
549
static bool try_ReadStream(CSThread *self, S3ProtocolCon *con, unsigned char *ptr, size_t buffer_size, size_t *data_sent)
551
volatile bool rtc = true;
553
*data_sent = con->ms_inputStream->read((char*)ptr, buffer_size);
554
if (*data_sent <= con->ms_data_size) {
555
con->ms_data_size -= *data_sent;
557
con->ms_md5.md5_append(ptr, *data_sent); // Calculating the checksum for the data sent.
558
} else if (*data_sent > con->ms_data_size)
559
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob larger than expected.");
560
else if (con->ms_data_size && !*data_sent)
561
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob smaller than expected.");
570
//----------------------
571
527
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
573
529
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
580
if (try_ReadStream(self, con, (unsigned char*)ptr, buffer_size, &data_sent)) {
581
con->ms_throw_error = true;
582
data_sent = (size_t)-1;
537
data_sent = con->ms_inputStream->read((char*)ptr, buffer_size);
538
if (data_sent <= con->ms_data_size) {
539
con->ms_data_size -= data_sent;
541
con->ms_md5.md5_append((u_char*)ptr, data_sent); // Calculating the checksum for the data sent.
542
} else if (data_sent > con->ms_data_size)
543
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob larger than expected.");
544
else if (con->ms_data_size && !data_sent)
545
CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob smaller than expected.");
548
con->ms_throw_error = true;
549
data_sent = SIZE_MAX;
555
//----------------------
556
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
558
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
559
size_t data_len = objs * obj_size;
589
static bool try_WriteStream(CSThread *self, S3ProtocolCon *con, char *ptr, size_t data_len)
591
volatile bool rtc = true;
593
563
if (con->ms_replyStatus >= 400) { // Collect the error reply.
594
564
if (!con->ms_errorReply)
595
565
con->ms_errorReply = new CSStringBuffer(50);
596
con->ms_errorReply->append(ptr, data_len);
566
con->ms_errorReply->append((char*)vptr, data_len);
597
567
} else if ( con->ms_outputStream)
598
con->ms_outputStream->write(ptr, data_len);
568
con->ms_outputStream->write((char*)vptr, data_len);
571
con->ms_throw_error = true;
607
//----------------------
608
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
610
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
611
size_t data_len = objs * obj_size;
614
if (try_WriteStream(self, con, (char*)vptr, data_len)) {
615
con->ms_throw_error = true;
616
data_len = (size_t)-1;
619
575
return_(data_len);
622
578
#define IS_REDIRECT(s) ((s >= 300) && (s < 400))
623
579
//----------------------
624
static bool try_addHeader(CSThread *self, S3ProtocolCon *con, char *name, uint32_t name_len, char *value, uint32_t value_len)
627
con->ms_reply_headers.addHeader(name, name_len, value, value_len);
636
//----------------------
637
580
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
639
582
S3ProtocolCon *con = (S3ProtocolCon*) v_con;
641
584
char *end, *ptr = (char*) header, *name, *value = NULL;
642
585
uint32_t name_len =0, value_len = 0;
587
CLOBBER_PROTECT(con);
588
CLOBBER_PROTECT(size);
589
CLOBBER_PROTECT(ptr);
590
CLOBBER_PROTECT(value);
591
CLOBBER_PROTECT(value_len);
592
CLOBBER_PROTECT(name_len);
644
594
//printf( "receive_header: %s\n", ptr);
645
595
end = ptr + size;
646
596
if (*(end -2) == '\r' && *(end -1) == '\n')
688
638
while (value[value_len-1] == ' ') value_len--;
690
640
if (!strncasecmp(name, "ETag", 4)) {
692
value++; value_len -=2; // Strip quotation marks from checksum string.
641
value++; value_len -=2; // Strip quotation marks from checksum string.
694
642
if (value_len == HEX_CHECKSUM_VALUE_SIZE) {
695
643
memcpy(con->ms_s3Checksum, value, value_len);
696
644
con->ms_s3Checksum[value_len] = 0;
701
if (try_addHeader(self, con, name, name_len, value, value_len)) {
702
con->ms_throw_error = true;
650
con->ms_reply_headers.addHeader(name, name_len, value, value_len);
654
con->ms_throw_error = true;
763
716
con_data->ms_execute_delete_request();
765
718
if (con_data->ms_retry) {
766
if (retry_count == s3_maxRetries) {
719
if (retry_count == s3_maxRetrys) {
767
720
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
769
//printf("RETRY: s3_delete()\n");
722
printf("RETRY: s3_delete()\n");
771
self->sleep(s3_sleepTime);
865
817
if (con_data->ms_retry) {
866
if (retry_count == s3_maxRetries) {
818
if (retry_count == s3_maxRetrys) {
867
819
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
869
//printf("RETRY: s3_copy()\n");
821
printf("RETRY: s3_copy()\n");
871
self->sleep(s3_sleepTime);
882
833
//-------------------------------
883
CSVector *CSS3Protocol::s3_receive(CSOutputStream *output, const char *bucket, const char *key, bool *found, S3RangePtr range, time_t *last_modified)
834
CSVector *CSS3Protocol::s3_receive(CSOutputStream *output, const char *bucket, const char *key, bool *found)
885
836
CSStringBuffer *s3_buffer;
927
872
s3_buffer->append(date);
928
873
con_data->ms_setHeader(s3_buffer->getCString());
932
snprintf(buffer, 80,"Range: bytes=%"PRIu64"-%"PRIu64, range->startByte, range->endByte);
934
range_header = CSString::newString(buffer);
936
875
// Create the authentication signature and add the 'Authorization' header
938
con_data->ms_setHeader(range_header->getCString());
939
signed_str = s3_getSignature(http_op, NULL, NULL, date, bucket, key, NULL);
876
signed_str = s3_getSignature("GET", NULL, NULL, date, bucket, key);
940
877
push_(signed_str);
941
878
s3_buffer->setLength(0);
942
879
s3_buffer->append("Authorization: AWS ");
946
883
release_(signed_str); signed_str = NULL;
947
884
con_data->ms_setHeader(s3_buffer->getCString());
949
if (output) output->retain();
950
con_data->ms_execute_get_request(output);
886
con_data->ms_execute_get_request(RETAIN(output));
952
888
if (con_data->ms_retry) {
953
if (retry_count == s3_maxRetries) {
889
if (retry_count == s3_maxRetrys) {
954
890
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
956
//printf("RETRY: s3_receive()\n");
892
printf("RETRY: s3_receive()\n");
959
self->sleep(s3_sleepTime);
964
*last_modified = con_data->ms_last_modified;
965
898
*found = !con_data->ms_notFound;
966
899
replyHeaders = con_data->ms_reply_headers.takeHeaders();
967
900
release_(con_data);
968
901
release_(s3_buffer);
972
904
return_(replyHeaders);
1105
1035
con_data->ms_execute_get_request(RETAIN(output));
1107
1037
if (con_data->ms_retry) {
1108
if (retry_count == s3_maxRetries) {
1038
if (retry_count == s3_maxRetrys) {
1109
1039
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1111
//printf("RETRY: s3_list()\n");
1041
printf("RETRY: s3_list()\n");
1113
1043
output->reset();
1114
self->sleep(s3_sleepTime);
1198
1130
s3_buffer->append(content_type);
1199
1131
con_data->ms_setHeader(s3_buffer->getCString());
1202
// Add the Md5 checksum header
1204
memset(checksum, 0, 32);
1205
base64Encode(digest->val, 16, checksum, 32);
1207
s3_buffer->setLength(0);
1208
s3_buffer->append("Content-MD5: ");
1209
s3_buffer->append(checksum);
1210
con_data->ms_setHeader(s3_buffer->getCString());
1211
con_data->ms_calculate_md5 = false;
1213
con_data->ms_calculate_md5 = true;
1216
1133
// Create the authentication signature and add the 'Authorization' header
1217
1134
if (!s3Authorization)
1218
signed_str = s3_getSignature("PUT", md5, content_type, date, bucket, key);
1135
signed_str = s3_getSignature("PUT", NULL, content_type, date, bucket, key);
1220
1137
signed_str = CSString::newString(s3Authorization);
1221
1138
push_(signed_str);
1227
1144
release_(signed_str); signed_str = NULL;
1228
1145
con_data->ms_setHeader(s3_buffer->getCString());
1230
con_data->ms_execute_put_request(RETAIN(input), size);
1147
con_data->ms_execute_put_request(RETAIN(input), size, digest);
1232
1149
if (con_data->ms_retry) {
1233
if (retry_count == s3_maxRetries) {
1150
if (retry_count == s3_maxRetrys) {
1234
1151
CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
1236
//printf("RETRY: s3_send()\n");
1153
printf("RETRY: s3_send()\n");
1238
1155
input->reset();
1239
self->sleep(s3_sleepTime);
1243
1159
replyHeaders = con_data->ms_reply_headers.takeHeaders();
1245
1160
release_(con_data);
1246
1161
release_(s3_buffer);
1247
1162
release_(input);
1296
1211
printf("Delete object:\n\t%s d <bucket> <object_key>\n", cmd);
1297
1212
printf("Delete all object with a given prefix:\n\t%s D <bucket> <object_prefix>\n", cmd);
1298
1213
printf("Get object, data will be written to 'prottest.out':\n\t%s g <bucket> <object_key> <timeout>\n", cmd);
1299
printf("Get object header only:\n\t%s h <bucket> <object_key> <timeout>\n", cmd);
1300
1214
printf("Put (Upload) an object:\n\t%s p <bucket> <object_key> <file>\n", cmd);
1301
1215
printf("List objects in the bucket:\n\t%s l <bucket> [<object_prefix> [max_list_size]]\n", cmd);
1302
1216
printf("Copy object:\n\t%s c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key> \n", cmd);
1352
1265
new_(prot, CSS3Protocol());
1355
server = getenv("S3_SERVER");
1356
if ((server == NULL) || (*server == 0))
1357
server = "s3.amazonaws.com/";
1358
prot->s3_setServer(server);
1268
prot->s3_setServer("s3.amazonaws.com/");
1359
1269
prot->s3_setPublicKey(pub_key);
1360
1270
prot->s3_setPrivateKey(priv_key);
1400
1310
case 'g': // Get the object
1401
if ((argc == 4) || (argc == 6)) {
1402
1312
CSFile *output;
1403
1313
CSVector *headers;
1405
S3RangeRec *range_ptr = NULL, range = {0,0};
1408
range.startByte = atoi(argv[4]);
1409
range.endByte = atoi(argv[5]);
1413
1316
output = CSFile::newFile("prottest.out");
1415
1318
output->open(CSFile::CREATE | CSFile::TRUNCATE);
1416
headers = prot->s3_receive(output->getOutputStream(), argv[2], argv[3], &found, range_ptr);
1319
headers = prot->s3_receive(output->getOutputStream(), argv[2], argv[3], &found);
1418
1321
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1428
case 'h': // Get the object header
1432
S3RangeRec range = {0,0};
1434
headers = prot->s3_receive(NULL, argv[2], argv[3], &found);
1436
printf("%s/%s could not be found.\n", argv[2], argv[3]);
1438
dump_headers(headers);
1441
printf("Bad command: h <bucket> <object_key>\n");
1445
1331
case 'p': // Put (Upload) the object
1446
1332
if (argc == 5) {
1449
1334
CSVector *headers;
1451
1336
input = CSFile::newFile(argv[4]);
1453
1338
input->open(CSFile::READONLY);
1454
input->md5Digest(&digest);
1455
headers = prot->s3_send(input->getInputStream(), argv[2], argv[3], input->myFilePath->getSize(), NULL, &digest);
1339
headers = prot->s3_send(input->getInputStream(), argv[2], argv[3], input->myFilePath->getSize());
1456
1340
dump_headers(headers);
1457
1341
release_(input);