~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/cslib/CSS3Protocol.cc

  • Committer: tdavies
  • Date: 2010-10-31 07:38:13 UTC
  • mto: (1897.2.4 merge)
  • mto: This revision was merged to the branch mainline in revision 1899.
  • Revision ID: tdavies@molly-20101031073813-mmu12nqc0bwezxny
struct order_st changed and renamed to c++ class named:Order

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 
18
 *
 
19
 *  Created by Barry Leslie on 10/02/09.
 
20
 *
 
21
 */
 
22
#include "CSConfig.h"
 
23
#include <inttypes.h>
 
24
#include <stdlib.h>
 
25
 
 
26
#include <curl/curl.h>
 
27
 
 
28
#include "CSGlobal.h"
 
29
#include "CSString.h"
 
30
#include "CSStrUtil.h"
 
31
#include "CSEncode.h"
 
32
#include "CSS3Protocol.h"
 
33
#include "CSXML.h"
 
34
 
 
35
#ifdef S3_UNIT_TEST
 
36
//#define SHOW_SIGNING
 
37
// DEBUG_CURL enables libcurl's verbose tracing of network activity during a request. Very useful!
 
38
#define DEBUG_CURL
 
39
#define DUMP_ERRORS
 
40
#endif
 
41
 
 
42
//#define DUMP_ERRORS
 
43
//#define SHOW_SIGNING
 
44
 
 
45
#define HEX_CHECKSUM_VALUE_SIZE (2 *CHECKSUM_VALUE_SIZE)
 
46
 
 
47
#define THROW_CURL_IF(v) { if (v) CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);}
 
48
 
 
49
//-------------------------------
 
50
// S3 error codes (the text of the <Code> element in an error reply) for which
// the failed request is flagged for retry. The list is NULL terminated so it
// can be walked without a separate element count.
static const char *retryCodes[] = {
	"ExpiredToken", "InternalError", "OperationAborted",
	"RequestTimeout", "SlowDown",
	NULL	// End-of-list sentinel.
};
 
58
 
 
59
//======================================
 
60
static size_t receive_data(void *ptr, size_t size, size_t nmemb, void *stream);
 
61
static size_t receive_header(void *ptr, size_t size, size_t nmemb, void *stream);
 
62
static size_t send_callback(void *ptr, size_t size, size_t nmemb, void *stream);
 
63
 
 
64
class S3ProtocolCon : CSXMLBuffer, public CSObject {
 
65
 
 
66
        private:
 
67
        
 
68
        virtual bool openNode(char *path, char *value) {
 
69
                if (value && *value && (strcmp(path,"/error/code/") == 0)) {
 
70
                        printf("S3 ERROR Code: %s\n", value);
 
71
                        for (int i = 0; retryCodes[i] && !ms_retry; i++)
 
72
                                ms_retry = (strcmp(value, retryCodes[i]) == 0);
 
73
                                
 
74
                        if (ms_retry && !strcmp("slowdown", value)) 
 
75
                                ms_slowDown = true;
 
76
                } else if (value && *value && (strcmp(path,"/error/message/") == 0)) {
 
77
                        printf("S3 ERROR MESSAGE: %s\n", value);
 
78
                }
 
79
                return true;
 
80
        }
 
81
 
 
82
        virtual bool closeNode(char *path) {
 
83
                (void)path;
 
84
                return true;
 
85
        }
 
86
 
 
87
        virtual bool addAttribute(char *path, char *name, char *value) {
 
88
                (void)path;
 
89
                (void)name;
 
90
                (void)value;
 
91
                return true;
 
92
        }
 
93
        
 
94
        //-------------------------------
 
95
        void parse_s3_error()
 
96
        {
 
97
                enter_();
 
98
 
 
99
                if (!ms_errorReply)
 
100
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Missing HTTP reply: possible S3 connection failure.");
 
101
 
 
102
        #ifdef DUMP_ERRORS
 
103
                printf("ms_errorReply:\n===========\n%s\n===========\n", ms_errorReply->getCString());
 
104
        #endif
 
105
                
 
106
                if (!parseData(ms_errorReply->getCString(), ms_errorReply->length(), 0)){
 
107
                        int             err;
 
108
                        char    *msg;
 
109
 
 
110
                        getError(&err, &msg);
 
111
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
 
112
                }
 
113
                
 
114
                exit_();
 
115
        }
 
116
        
 
117
        public:
 
118
        
 
119
        CSHTTPHeaders   ms_reply_headers;
 
120
        CSStringBuffer  ms_buffer; // A scratch buffer
 
121
 
 
122
        CURL                    *ms_curl;       
 
123
        struct curl_slist       *ms_header_list;        // A curl list of headers to be sent with the next request.
 
124
 
 
125
        CSInputStream   *ms_inputStream;
 
126
        CSOutputStream  *ms_outputStream;
 
127
        
 
128
        CSMd5                   ms_md5;
 
129
        char                    ms_s3Checksum[HEX_CHECKSUM_VALUE_SIZE +1];
 
130
        bool                    ms_calculate_md5;
 
131
        
 
132
        bool                    ms_notFound; // True if the object could not be found
 
133
        bool                    ms_retry; // True if the request failed with a retry error.
 
134
        bool                    ms_slowDown;
 
135
        
 
136
        CSStringBuffer  *ms_errorReply;
 
137
        char                    ms_curl_error[CURL_ERROR_SIZE];
 
138
        
 
139
        off64_t                 ms_data_size;
 
140
        
 
141
        unsigned int    ms_replyStatus;
 
142
        bool                    ms_throw_error; // Gets set if an exception occurs in a callback.
 
143
        bool                    ms_old_libcurl;
 
144
        char                    *ms_safe_url;
 
145
        time_t                  ms_last_modified;
 
146
        
 
147
        S3ProtocolCon():
 
148
                ms_curl(NULL),
 
149
                ms_header_list(NULL),
 
150
                ms_inputStream(NULL),
 
151
                ms_outputStream(NULL),
 
152
                ms_calculate_md5(false),
 
153
                ms_notFound(false),
 
154
                ms_retry(false),
 
155
                ms_slowDown(false),
 
156
                ms_errorReply(NULL),
 
157
                ms_data_size(0),
 
158
                ms_replyStatus(0),
 
159
                ms_throw_error(false),
 
160
                ms_old_libcurl(false),
 
161
                ms_safe_url(NULL),
 
162
                ms_last_modified(0)
 
163
        {
 
164
        
 
165
                ms_curl = curl_easy_init();
 
166
                if (!ms_curl)
 
167
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_init() failed.");
 
168
 
 
169
                curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW); 
 
170
                
 
171
                // libCurl versions prior to 7.17.0 did not make copies of strings passed into curl_easy_setopt()
 
172
                // If this version requirement is a problem I can do this myself, if I have to, I guess. :(
 
173
                if (curl_ver->version_num < 0X071700 ) {
 
174
                        ms_old_libcurl = true;
 
175
                        
 
176
                        //char msg[200];
 
177
                        //snprintf(msg, 200, "libcurl version %s is too old, require version 7.17.0 or newer.", curl_ver->version);
 
178
                        //CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
 
179
                }
 
180
                
 
181
                if (curl_easy_setopt(ms_curl, CURLOPT_ERRORBUFFER, ms_curl_error))
 
182
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_easy_setopt(CURLOPT_ERRORBUFFER) failed.");
 
183
                
 
184
#ifdef DEBUG_CURL
 
185
                curl_easy_setopt(ms_curl, CURLOPT_VERBOSE, 1L);
 
186
#endif          
 
187
                //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_TCP_NODELAY, 1L));
 
188
        
 
189
 
 
190
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOPROGRESS, 1L));
 
191
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEFUNCTION, receive_data));
 
192
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READFUNCTION, send_callback));  
 
193
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HEADERFUNCTION, receive_header));
 
194
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEDATA, this));
 
195
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_READDATA, this));
 
196
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_WRITEHEADER, this));
 
197
                
 
198
 
 
199
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FOLLOWLOCATION, 1L)); // Follow redirects.
 
200
        
 
201
        }
 
202
        
 
203
        ~S3ProtocolCon()
 
204
        {
 
205
                if (ms_curl) 
 
206
                        curl_easy_cleanup(ms_curl);                     
 
207
                if (ms_header_list) 
 
208
                        curl_slist_free_all(ms_header_list);                    
 
209
                if (ms_inputStream)
 
210
                        ms_inputStream->release();
 
211
                if (ms_outputStream)
 
212
                        ms_outputStream->release();
 
213
                if (ms_errorReply)
 
214
                        ms_errorReply->release();
 
215
                        
 
216
                ms_reply_headers.clearHeaders();
 
217
                
 
218
                if (ms_safe_url)
 
219
                        cs_free(ms_safe_url);
 
220
        }
 
221
 
 
222
        inline void check_reply_status() 
 
223
        {
 
224
                if (ms_replyStatus > 199 && ms_replyStatus < 300)
 
225
                        return;
 
226
                
 
227
                
 
228
                
 
229
                switch (ms_replyStatus) {
 
230
                        case 200:
 
231
                        case 204:       // No Content
 
232
                        //case 301: // Moved Permanently
 
233
                        //case 307: // Temporary Redirect
 
234
                                break;
 
235
                        case 404:       // Not Found
 
236
                        case 403:       // Forbidden (S3 object not found)
 
237
                                ms_notFound = true;
 
238
                                break;
 
239
                        case 500:       
 
240
                                ms_retry = true;
 
241
                                break;
 
242
                        default: {
 
243
                                parse_s3_error();
 
244
                                
 
245
                                
 
246
                                
 
247
                                if (!ms_retry) {
 
248
                                        enter_();
 
249
                                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_errorReply->getCString());
 
250
                                        outer_();
 
251
                                } else if (ms_slowDown) {
 
252
                                        enter_();
 
253
                                        CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 slow down request.");
 
254
                                        self->sleep(10); // sleep for 1/100 second.
 
255
                                        outer_();
 
256
                                }
 
257
                        }
 
258
                }
 
259
                
 
260
        }
 
261
 
 
262
        
 
263
        inline void ms_reset()
 
264
        {
 
265
                // Remove any old headers
 
266
                if (ms_header_list) {
 
267
                        curl_slist_free_all(ms_header_list);
 
268
                        ms_header_list = NULL;
 
269
                }
 
270
 
 
271
                ms_reply_headers.clearHeaders();
 
272
                ms_replyStatus = 0;
 
273
                ms_throw_error = false;
 
274
                if (ms_errorReply)
 
275
                        ms_errorReply->setLength(0);
 
276
                        
 
277
                ms_s3Checksum[0] = 0;
 
278
                ms_notFound = false;
 
279
                ms_retry = false;
 
280
                
 
281
                if (ms_outputStream) {
 
282
                        ms_outputStream->release();
 
283
                        ms_outputStream = NULL;
 
284
                }
 
285
                if (ms_inputStream) {
 
286
                        ms_inputStream->release();
 
287
                        ms_inputStream = NULL;
 
288
                }
 
289
                
 
290
                if (ms_safe_url) {
 
291
                        cs_free(ms_safe_url);
 
292
                        ms_safe_url = NULL;
 
293
                }
 
294
        }
 
295
        
 
296
        inline void ms_setHeader(const char *header)
 
297
        {
 
298
                ms_header_list = curl_slist_append(ms_header_list, header);
 
299
                if (!ms_header_list) 
 
300
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "curl_slist_append() failed.");
 
301
        }
 
302
 
 
303
 
 
304
private:        
 
305
        inline const char *safe_url(const char *url)
 
306
        {
 
307
                if (ms_old_libcurl == false)
 
308
                        return url;
 
309
                        
 
310
                if (ms_safe_url) {
 
311
                        cs_free(ms_safe_url);
 
312
                        ms_safe_url = NULL;
 
313
                }
 
314
                ms_safe_url = cs_strdup(url);
 
315
                return ms_safe_url;
 
316
        }
 
317
        
 
318
public: 
 
319
        inline void ms_setURL(const char *url)
 
320
        {
 
321
                //printf("URL: \"%s\n", url);
 
322
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_URL, safe_url(url)));
 
323
        }
 
324
        
 
325
        inline void ms_execute_delete_request()
 
326
        {
 
327
                CURLcode rtc;
 
328
                
 
329
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
 
330
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_CUSTOMREQUEST, "DELETE"));
 
331
 
 
332
                rtc = curl_easy_perform(ms_curl);
 
333
                
 
334
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_CUSTOMREQUEST, NULL)); // IMPORTANT: Reset this to it's default value
 
335
 
 
336
                if (rtc && !ms_throw_error)
 
337
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
338
                        
 
339
                if (ms_throw_error) {
 
340
                        enter_();
 
341
                        throw_();
 
342
                        outer_();
 
343
                }
 
344
                
 
345
                check_reply_status();
 
346
        }
 
347
        
 
348
        inline void ms_execute_copy_request()
 
349
        {
 
350
                CURLcode rtc;
 
351
                
 
352
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
 
353
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, 0));
 
354
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
 
355
                
 
356
                rtc = curl_easy_perform(ms_curl);
 
357
                
 
358
                if (rtc && !ms_throw_error)
 
359
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
360
                        
 
361
                if (ms_throw_error) {
 
362
                        enter_();
 
363
                        throw_();
 
364
                        outer_();
 
365
                }
 
366
                
 
367
                check_reply_status();
 
368
        }
 
369
        
 
370
        inline void ms_execute_get_request(CSOutputStream *output)
 
371
        {
 
372
                enter_();
 
373
                
 
374
                if (output) {
 
375
                        push_(output);
 
376
                        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPGET, 1L));
 
377
                } else {
 
378
                        THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_NOBODY, 1L));
 
379
                }
 
380
                
 
381
                // 
 
382
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_FILETIME, 1L));
 
383
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
 
384
    // Ask curl to parse the Last-Modified header.  This is easier than
 
385
    // parsing it ourselves.
 
386
 
 
387
                ms_outputStream = output;       
 
388
                if (curl_easy_perform(ms_curl) && !ms_throw_error) {
 
389
                        ms_outputStream = NULL; 
 
390
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
391
                }
 
392
                ms_outputStream = NULL; 
 
393
                if (output){
 
394
                        release_(output);
 
395
                }
 
396
                
 
397
                if (ms_throw_error) 
 
398
                        throw_();
 
399
                
 
400
                check_reply_status();
 
401
                curl_easy_getinfo(ms_curl, CURLINFO_FILETIME, &ms_last_modified);
 
402
                exit_();                
 
403
        }
 
404
        
 
405
        inline void ms_execute_put_request(CSInputStream *input, off64_t size)
 
406
        {
 
407
                enter_();
 
408
                
 
409
                push_(input);   
 
410
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_HTTPHEADER, ms_header_list));
 
411
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_INFILESIZE_LARGE, size));
 
412
                THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_UPLOAD, 1L));
 
413
                //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POSTFIELDSIZE_LARGE, size));
 
414
                //THROW_CURL_IF(curl_easy_setopt(ms_curl, CURLOPT_POST, 1L));
 
415
 
 
416
                ms_md5.md5_init();
 
417
                
 
418
                ms_data_size = size;
 
419
                ms_inputStream = input; 
 
420
                if (curl_easy_perform(ms_curl) && !ms_throw_error) {
 
421
                        ms_inputStream = NULL;  
 
422
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, ms_curl_error);
 
423
                }
 
424
                ms_inputStream = NULL;
 
425
                release_(input);        
 
426
 
 
427
                        
 
428
                if (ms_throw_error)
 
429
                        throw_();
 
430
                
 
431
                check_reply_status();
 
432
                
 
433
                if (ms_calculate_md5) {
 
434
                        // If the data was not sent with an md5 checksum then verify
 
435
                        // the server's md5 value with the one calculated during the send.
 
436
                        char checksum[HEX_CHECKSUM_VALUE_SIZE +1];
 
437
                        Md5Digest digest;
 
438
                        
 
439
                        ms_md5.md5_get_digest(&digest);
 
440
                        cs_bin_to_hex(HEX_CHECKSUM_VALUE_SIZE, checksum, CHECKSUM_VALUE_SIZE, digest.val);
 
441
                        checksum[HEX_CHECKSUM_VALUE_SIZE] = 0;
 
442
                        
 
443
                        cs_strToUpper(ms_s3Checksum);
 
444
                        if (strcmp(checksum, ms_s3Checksum)) {
 
445
                                // The request should be restarted in this case.
 
446
                                ms_retry = true;
 
447
                                CSException::logException(CS_CONTEXT, CS_ERR_CHECKSUM_ERROR, "Calculated checksum did not match S3 checksum");
 
448
                        }
 
449
                }
 
450
 
 
451
                exit_();                
 
452
        }
 
453
        
 
454
};
 
455
 
 
456
//======================================
 
457
 
 
458
 
 
459
 
 
460
 
 
461
//======================================
 
462
CSS3Protocol::CSS3Protocol():
 
463
        s3_server(NULL),
 
464
        s3_public_key(NULL),
 
465
        s3_private_key(NULL),
 
466
        s3_maxRetries(5),
 
467
        s3_sleepTime(0)
 
468
{
 
469
        new_(s3_server, CSStringBuffer());
 
470
        s3_server->append("s3.amazonaws.com/");
 
471
 
 
472
        s3_public_key = CSString::newString("");
 
473
        s3_private_key = CSString::newString("");
 
474
        
 
475
}
 
476
 
 
477
//------------------
 
478
CSS3Protocol::~CSS3Protocol()
 
479
{
 
480
        if (s3_server)
 
481
                s3_server->release();
 
482
        
 
483
        if (s3_public_key)
 
484
                s3_public_key->release();
 
485
        
 
486
        if (s3_private_key)
 
487
                s3_private_key->release();
 
488
}
 
489
        
 
490
//------------------
 
491
CSString *CSS3Protocol::s3_getSignature(const char *verb, 
 
492
                                                                                const char *md5, 
 
493
                                                                                const char *content_type, 
 
494
                                                                                const char *date, 
 
495
                                                                                const char *bucket, 
 
496
                                                                                const char *key,
 
497
                                                                                CSString *headers 
 
498
                                                                        )
 
499
{
 
500
        CSStringBuffer *s3_buffer;
 
501
        enter_();
 
502
        if (headers)
 
503
                push_(headers);
 
504
        
 
505
        new_(s3_buffer, CSStringBuffer());
 
506
        push_(s3_buffer);
 
507
        
 
508
        s3_buffer->setLength(0);
 
509
        s3_buffer->append(verb);        
 
510
        s3_buffer->append("\n");        
 
511
        if (md5) s3_buffer->append(md5);        
 
512
        s3_buffer->append("\n");        
 
513
        if (content_type) s3_buffer->append(content_type);      
 
514
        s3_buffer->append("\n");        
 
515
        s3_buffer->append(date);
 
516
        if (headers) { 
 
517
                // Note: headers are assumed to be in lower case, sorted, and containing no white space.
 
518
                s3_buffer->append("\n");        
 
519
                s3_buffer->append(headers->getCString());
 
520
        }
 
521
        s3_buffer->append("\n/");
 
522
        s3_buffer->append(bucket);
 
523
        s3_buffer->append("/");
 
524
        s3_buffer->append(key);
 
525
        
 
526
#ifdef SHOW_SIGNING
 
527
printf("signing:\n=================\n%s\n=================\n",  s3_buffer->getCString());
 
528
if(0){
 
529
        const char *ptr = s3_buffer->getCString();
 
530
        while (*ptr) {
 
531
                printf("%x ", *ptr); ptr++;
 
532
        }
 
533
        printf("\n");
 
534
}
 
535
#endif
 
536
 
 
537
        CSString *sig = signature(s3_buffer->getCString(), s3_private_key->getCString());
 
538
        release_(s3_buffer);
 
539
        if (headers) 
 
540
                release_(headers);
 
541
 
 
542
        return_(sig);
 
543
}
 
544
//----------------------
 
545
// CURL callback functions:
 
546
////////////////////////////
 
547
//----------------------
 
548
//-----------------
 
549
static bool try_ReadStream(CSThread *self, S3ProtocolCon *con, unsigned char *ptr, size_t buffer_size, size_t *data_sent)
 
550
{
 
551
        volatile bool rtc = true;
 
552
        try_(a) {
 
553
                *data_sent = con->ms_inputStream->read((char*)ptr, buffer_size);
 
554
                if (*data_sent <= con->ms_data_size) {
 
555
                        con->ms_data_size -= *data_sent;
 
556
                        if (*data_sent)
 
557
                                con->ms_md5.md5_append(ptr, *data_sent); // Calculating the checksum for the data sent.
 
558
                } else if (*data_sent > con->ms_data_size) 
 
559
                        CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob larger than expected.");
 
560
                else if (con->ms_data_size && !*data_sent)
 
561
                        CSException::RecordException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Blob smaller than expected.");
 
562
                rtc = false;
 
563
        }
 
564
        
 
565
        catch_(a)
 
566
        cont_(a);
 
567
        return rtc;
 
568
}
 
569
 
 
570
//----------------------
 
571
static size_t send_callback(void *ptr, size_t objs, size_t obj_size, void *v_con)
 
572
{
 
573
        S3ProtocolCon *con = (S3ProtocolCon*) v_con;
 
574
        size_t data_sent, buffer_size = objs * obj_size;
 
575
 
 
576
        if (!con->ms_data_size)
 
577
                return 0;
 
578
                
 
579
        enter_();
 
580
        if (try_ReadStream(self, con, (unsigned char*)ptr, buffer_size, &data_sent)) {
 
581
                con->ms_throw_error = true;
 
582
                data_sent = (size_t)-1;
 
583
        }
 
584
        
 
585
        return_(data_sent);
 
586
}
 
587
 
 
588
//-----------------
 
589
static bool try_WriteStream(CSThread *self, S3ProtocolCon *con, char *ptr, size_t data_len)
 
590
{
 
591
        volatile bool rtc = true;
 
592
        try_(a) {
 
593
                if (con->ms_replyStatus >= 400) { // Collect the error reply.
 
594
                        if (!con->ms_errorReply)
 
595
                                con->ms_errorReply = new CSStringBuffer(50);            
 
596
                        con->ms_errorReply->append(ptr, data_len);
 
597
                } else if (     con->ms_outputStream)
 
598
                        con->ms_outputStream->write(ptr, data_len);
 
599
                rtc = false;
 
600
        }
 
601
        
 
602
        catch_(a)
 
603
        cont_(a);
 
604
        return rtc;
 
605
}
 
606
 
 
607
//----------------------
 
608
static size_t receive_data(void *vptr, size_t objs, size_t obj_size, void *v_con)
 
609
{
 
610
        S3ProtocolCon *con = (S3ProtocolCon*) v_con;
 
611
        size_t data_len = objs * obj_size;
 
612
 
 
613
        enter_();
 
614
        if (try_WriteStream(self, con, (char*)vptr, data_len)) {
 
615
                con->ms_throw_error = true;
 
616
                data_len = (size_t)-1;
 
617
        }
 
618
 
 
619
        return_(data_len);      
 
620
}
 
621
 
 
622
#define IS_REDIRECT(s) ((s >= 300) && (s < 400))
 
623
//----------------------
 
624
static bool try_addHeader(CSThread *self, S3ProtocolCon *con, char *name, uint32_t name_len, char *value, uint32_t value_len)
 
625
{
 
626
        try_(a) {
 
627
                con->ms_reply_headers.addHeader(name, name_len, value, value_len);
 
628
                return false;
 
629
        }
 
630
        
 
631
        catch_(a);
 
632
        cont_(a);
 
633
        return true;
 
634
}
 
635
 
 
636
//----------------------
 
637
static size_t receive_header(void *header, size_t objs, size_t obj_size, void *v_con)
 
638
{
 
639
        S3ProtocolCon *con = (S3ProtocolCon*) v_con;
 
640
        size_t size = objs * obj_size;
 
641
        char *end, *ptr = (char*) header, *name, *value = NULL;
 
642
        uint32_t name_len =0, value_len = 0;
 
643
        
 
644
//printf(       "receive_header: %s\n", ptr);
 
645
        end = ptr + size;
 
646
        if (*(end -2) == '\r' && *(end -1) == '\n')
 
647
                end -=2;
 
648
                
 
649
        while ((end != ptr) && (*ptr == ' ')) ptr++;
 
650
        if (end == ptr)
 
651
                return size;
 
652
        
 
653
        // Get the reply status.
 
654
        // Status 100 = Continue
 
655
        if (((!con->ms_replyStatus) || (con->ms_replyStatus == 100) || IS_REDIRECT(con->ms_replyStatus) ) 
 
656
                        && !strncasecmp(ptr, "HTTP", 4)
 
657
                ) {
 
658
                char status[4];
 
659
                while ((end != ptr) && (*ptr != ' ')) ptr++; // skip HTTP stuff
 
660
                while ((end != ptr) && (*ptr == ' ')) ptr++; // find the start of eh status code.
 
661
                if (end == ptr)
 
662
                        return size;
 
663
                        
 
664
                if (end < (ptr +3)) // expecting a 3 digit status code.
 
665
                        return size;
 
666
                        
 
667
                memcpy(status, ptr, 3);
 
668
                status[3] = 0;
 
669
                
 
670
                con->ms_replyStatus = atoi(status);
 
671
        }
 
672
        
 
673
        name = ptr;
 
674
        while ((end != ptr) && (*ptr != ':')) ptr++;
 
675
        if (end == ptr)
 
676
                return size;
 
677
        name_len = ptr - name;
 
678
        
 
679
        ptr++; 
 
680
        while ((end != ptr) && (*ptr == ' ')) ptr++;
 
681
        if (end == ptr)
 
682
                return size;
 
683
        
 
684
        value = ptr;
 
685
        value_len = end - value;
 
686
        
 
687
        while (name[name_len-1] == ' ') name_len--;
 
688
        while (value[value_len-1] == ' ') value_len--;
 
689
        
 
690
        if (!strncasecmp(name, "ETag", 4)) {
 
691
                if (*value == '"') {
 
692
                        value++; value_len -=2; // Strip quotation marks from checksum string.
 
693
                }
 
694
                if (value_len == HEX_CHECKSUM_VALUE_SIZE) {
 
695
                        memcpy(con->ms_s3Checksum, value, value_len);
 
696
                        con->ms_s3Checksum[value_len] = 0;
 
697
                }
 
698
        }
 
699
        
 
700
        enter_();
 
701
        if (try_addHeader(self, con, name, name_len, value, value_len)) {
 
702
                con->ms_throw_error = true;
 
703
                size = (size_t)-1;
 
704
        }
 
705
        return_(size);
 
706
}
 
707
 
 
708
//----------------------
 
709
 
 
710
/*
 * SET_DATE_FROM_TIME(t, d): format the time_t lvalue 't' as a GMT date string of the
 * form "Thu, 01 Jan 1970 00:00:00 GMT" into 'd'.
 *   - 'd' must be a char ARRAY (not a pointer): sizeof(d) is used as the buffer size.
 *   - 't' must be an lvalue: its address is passed to gmtime_r().
 * gmtime_r() (POSIX) is used instead of gmtime() so the broken-down time lives in a
 * local struct rather than in storage shared between callers. If the conversion fails
 * 'd' is set to the empty string.
 *
 * SET_DATE(d): same, using the current time.
 *
 * Both macros are wrapped in do { } while (0) so they behave as a single statement,
 * e.g. in an unbraced if/else.
 */
#define SET_DATE_FROM_TIME(t, d) do { struct tm cs_date_tm_; if (gmtime_r(&(t), &cs_date_tm_)) strftime(d, sizeof(d), "%a, %d %b %Y %H:%M:%S GMT", &cs_date_tm_); else (d)[0] = 0; } while (0)
#define SET_DATE(d) do { time_t cs_date_now_ = time(NULL); SET_DATE_FROM_TIME(cs_date_now_, d); } while (0)
 
712
 
 
713
bool CSS3Protocol::s3_delete(const char *bucket, const char *key)
 
714
{
 
715
        CSStringBuffer *s3_buffer;
 
716
        char date[64];
 
717
        CSString *signed_str;
 
718
        uint32_t retry_count = 0;
 
719
        S3ProtocolCon *con_data;
 
720
 
 
721
        enter_();
 
722
 
 
723
        new_(s3_buffer, CSStringBuffer());
 
724
        push_(s3_buffer);
 
725
 
 
726
        new_(con_data, S3ProtocolCon());
 
727
        push_(con_data);
 
728
 
 
729
retry:
 
730
        // Clear old settings. 
 
731
        con_data->ms_reset();   
 
732
        
 
733
        SET_DATE(date);
 
734
 
 
735
        // Build the URL
 
736
        s3_buffer->setLength(0);
 
737
        s3_buffer->append("http://");
 
738
        s3_buffer->append(bucket);
 
739
        s3_buffer->append("."); 
 
740
        s3_buffer->append(s3_server->getCString());
 
741
        s3_buffer->append(key);
 
742
 
 
743
        con_data->ms_setURL(s3_buffer->getCString());
 
744
        
 
745
        // Add the 'DATE' header
 
746
        s3_buffer->setLength(0);
 
747
        s3_buffer->append("Date: ");    
 
748
        s3_buffer->append(date);
 
749
        con_data->ms_setHeader(s3_buffer->getCString());
 
750
 
 
751
        // Create the authentication signature and add the 'Authorization' header
 
752
        signed_str = s3_getSignature("DELETE", NULL, NULL, date, bucket, key);
 
753
        push_(signed_str);
 
754
        s3_buffer->setLength(0);
 
755
        s3_buffer->append("Authorization: AWS ");       
 
756
        s3_buffer->append(s3_public_key->getCString());
 
757
        s3_buffer->append(":"); 
 
758
        s3_buffer->append(signed_str->getCString());    
 
759
        release_(signed_str); signed_str = NULL;
 
760
        
 
761
        con_data->ms_setHeader(s3_buffer->getCString());
 
762
        
 
763
        con_data->ms_execute_delete_request();
 
764
        
 
765
        if (con_data->ms_retry) {
 
766
                if (retry_count == s3_maxRetries) {
 
767
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
 
768
                }
 
769
        //printf("RETRY: s3_delete()\n");
 
770
                retry_count++;
 
771
                self->sleep(s3_sleepTime);
 
772
                goto retry;
 
773
        }
 
774
        
 
775
        bool notFound = con_data->ms_notFound;
 
776
        release_(con_data);
 
777
        release_(s3_buffer);
 
778
        
 
779
        return_(!notFound);
 
780
}
 
781
 
 
782
//-------------------------------
 
783
void CSS3Protocol::s3_copy(const char *dest_server, const char *dest_bucket, const char *dest_key, const char *src_bucket, const char *src_key)
 
784
{
 
785
        CSStringBuffer *s3_buffer;
 
786
        char date[64];
 
787
        CSString *signed_str;
 
788
        uint32_t retry_count = 0;
 
789
        S3ProtocolCon *con_data;
 
790
 
 
791
        enter_();
 
792
 
 
793
        new_(s3_buffer, CSStringBuffer());
 
794
        push_(s3_buffer);
 
795
 
 
796
        new_(con_data, S3ProtocolCon());
 
797
        push_(con_data);
 
798
        
 
799
        if (!dest_server)
 
800
                dest_server = s3_server->getCString();
 
801
 
 
802
retry:
 
803
        // Clear old settings. 
 
804
        con_data->ms_reset();   
 
805
        
 
806
        SET_DATE(date);
 
807
 
 
808
        // Build the URL
 
809
        s3_buffer->setLength(0);
 
810
        s3_buffer->append("http://");
 
811
        s3_buffer->append(dest_bucket);
 
812
        s3_buffer->append("."); 
 
813
        s3_buffer->append(s3_server->getCString());
 
814
        s3_buffer->append(dest_key);
 
815
 
 
816
        con_data->ms_setURL(s3_buffer->getCString());
 
817
        
 
818
        // Add the destination location
 
819
        s3_buffer->setLength(0);
 
820
        s3_buffer->append("Host: ");    
 
821
        s3_buffer->append(dest_bucket);
 
822
        s3_buffer->append("."); 
 
823
        s3_buffer->append(dest_server);
 
824
        s3_buffer->setLength(s3_buffer->length() -1); // trim the '/'
 
825
        con_data->ms_setHeader(s3_buffer->getCString());
 
826
        
 
827
        // Add the source location
 
828
        s3_buffer->setLength(0);
 
829
        s3_buffer->append("x-amz-copy-source:");        
 
830
        s3_buffer->append(src_bucket);
 
831
        s3_buffer->append("/");
 
832
        s3_buffer->append(src_key);
 
833
        con_data->ms_setHeader(s3_buffer->getCString());
 
834
        
 
835
        // Create the authentication signature and add the 'Authorization' header
 
836
        signed_str = s3_getSignature("PUT", NULL, NULL, date, dest_bucket, dest_key, CSString::newString(s3_buffer->getCString()));
 
837
        push_(signed_str);
 
838
 
 
839
        // Add the 'DATE' header
 
840
        s3_buffer->setLength(0);
 
841
        s3_buffer->append("Date: ");    
 
842
        s3_buffer->append(date);
 
843
        con_data->ms_setHeader(s3_buffer->getCString());
 
844
 
 
845
        // Add the signature
 
846
        s3_buffer->setLength(0);
 
847
        s3_buffer->append("Authorization: AWS ");       
 
848
        s3_buffer->append(s3_public_key->getCString());
 
849
        s3_buffer->append(":"); 
 
850
        s3_buffer->append(signed_str->getCString());    
 
851
        release_(signed_str); signed_str = NULL;
 
852
        con_data->ms_setHeader(s3_buffer->getCString());
 
853
        
 
854
        con_data->ms_execute_copy_request();
 
855
        
 
856
        if (con_data->ms_notFound) {
 
857
                s3_buffer->setLength(0);
 
858
                s3_buffer->append("Cloud copy failed, object not found: ");
 
859
                s3_buffer->append(src_bucket);
 
860
                s3_buffer->append(" ");
 
861
                s3_buffer->append(src_key);
 
862
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, s3_buffer->getCString());
 
863
        }
 
864
        
 
865
        if (con_data->ms_retry) {
 
866
                if (retry_count == s3_maxRetries) {
 
867
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
 
868
                }
 
869
        //printf("RETRY: s3_copy()\n");
 
870
                retry_count++;
 
871
                self->sleep(s3_sleepTime);
 
872
                goto retry;
 
873
        }
 
874
        
 
875
        release_(con_data);
 
876
        release_(s3_buffer);
 
877
        
 
878
        exit_();
 
879
}
 
880
 
 
881
 
 
882
//-------------------------------
 
883
CSVector *CSS3Protocol::s3_receive(CSOutputStream *output, const char *bucket, const char *key, bool *found, S3RangePtr range, time_t *last_modified)
 
884
{
 
885
        CSStringBuffer *s3_buffer;
 
886
    char date[64];
 
887
        CSString *signed_str;
 
888
        uint32_t retry_count = 0;
 
889
        S3ProtocolCon *con_data;
 
890
        CSVector *replyHeaders;
 
891
        CSString *range_header = NULL;
 
892
        const char *http_op;
 
893
 
 
894
        enter_();
 
895
 
 
896
        if (output) {
 
897
                push_(output);
 
898
                http_op = "GET";
 
899
        } else
 
900
                http_op = "HEAD";
 
901
 
 
902
        new_(s3_buffer, CSStringBuffer());
 
903
        push_(s3_buffer);
 
904
 
 
905
        new_(con_data, S3ProtocolCon());
 
906
        push_(con_data);
 
907
 
 
908
retry:
 
909
        // Clear old settings. 
 
910
        con_data->ms_reset();   
 
911
        
 
912
        SET_DATE(date);
 
913
 
 
914
        // Build the URL
 
915
        s3_buffer->setLength(0);
 
916
        s3_buffer->append("http://");
 
917
        s3_buffer->append(bucket);
 
918
        s3_buffer->append("."); 
 
919
        s3_buffer->append(s3_server->getCString());
 
920
        s3_buffer->append(key);
 
921
 
 
922
        con_data->ms_setURL(s3_buffer->getCString());
 
923
        
 
924
        // Add the 'DATE' header
 
925
        s3_buffer->setLength(0);
 
926
        s3_buffer->append("Date: ");    
 
927
        s3_buffer->append(date);
 
928
        con_data->ms_setHeader(s3_buffer->getCString());
 
929
 
 
930
        if (range) {
 
931
                char buffer[80];
 
932
                snprintf(buffer, 80,"Range: bytes=%"PRIu64"-%"PRIu64, range->startByte, range->endByte);
 
933
 
 
934
                range_header = CSString::newString(buffer);
 
935
        }
 
936
        // Create the authentication signature and add the 'Authorization' header
 
937
        if (range_header)
 
938
                con_data->ms_setHeader(range_header->getCString());
 
939
        signed_str = s3_getSignature(http_op, NULL, NULL, date, bucket, key, NULL);
 
940
        push_(signed_str);
 
941
        s3_buffer->setLength(0);
 
942
        s3_buffer->append("Authorization: AWS ");       
 
943
        s3_buffer->append(s3_public_key->getCString());
 
944
        s3_buffer->append(":"); 
 
945
        s3_buffer->append(signed_str->getCString());    
 
946
        release_(signed_str); signed_str = NULL;
 
947
        con_data->ms_setHeader(s3_buffer->getCString());
 
948
        
 
949
        if (output) output->retain();
 
950
        con_data->ms_execute_get_request(output);
 
951
        
 
952
        if (con_data->ms_retry) {
 
953
                if (retry_count == s3_maxRetries) {
 
954
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
 
955
                }
 
956
        //printf("RETRY: s3_receive()\n");
 
957
                retry_count++;
 
958
                output->reset();
 
959
                self->sleep(s3_sleepTime);
 
960
                goto retry;
 
961
        }
 
962
        
 
963
        if (last_modified)
 
964
                *last_modified = con_data->ms_last_modified;
 
965
        *found = !con_data->ms_notFound;
 
966
        replyHeaders = con_data->ms_reply_headers.takeHeaders();
 
967
        release_(con_data);
 
968
        release_(s3_buffer);
 
969
        if (output)
 
970
                release_(output);
 
971
        
 
972
        return_(replyHeaders);
 
973
}
 
974
 
 
975
class S3ListParser : public CSXMLBuffer {
 
976
 
 
977
        CSVector *list;
 
978
        public:
 
979
 
 
980
 
 
981
        bool parseListData(const char *data, size_t len, CSVector *keys)
 
982
        {
 
983
                list = keys;
 
984
                return parseData(data, len, 0);
 
985
        }
 
986
 
 
987
        private:
 
988
        virtual bool openNode(char *path, char *value) {
 
989
                if (value && *value && (strcmp(path,"/listbucketresult/contents/key/") == 0))
 
990
                        list->add(CSString::newString(value));
 
991
                return true;
 
992
        }
 
993
 
 
994
        virtual bool closeNode(char *path) {
 
995
                (void)path;
 
996
                return true;
 
997
        }
 
998
 
 
999
        virtual bool addAttribute(char *path, char *name, char *value) {
 
1000
                (void)path;
 
1001
                (void)name;
 
1002
                (void)value;
 
1003
                return true;
 
1004
        }
 
1005
 
 
1006
};
 
1007
 
 
1008
//-------------------------------
 
1009
static CSVector *parse_s3_list(CSMemoryOutputStream *output)
 
1010
{
 
1011
        S3ListParser s3ListParser;
 
1012
        const char *data;
 
1013
        CSVector *vector;
 
1014
        size_t len;
 
1015
        
 
1016
        enter_();
 
1017
 
 
1018
        push_(output);
 
1019
        
 
1020
        new_(vector, CSVector(10));     
 
1021
        push_(vector);  
 
1022
 
 
1023
        data = (const char *) output->getMemory(&len);
 
1024
        if (!s3ListParser.parseListData(data, len, vector)) {
 
1025
                int             err;
 
1026
                char    *msg;
 
1027
 
 
1028
                s3ListParser.getError(&err, &msg);
 
1029
                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, msg);
 
1030
        }
 
1031
 
 
1032
        pop_(vector);
 
1033
        release_(output);
 
1034
        return_(vector);
 
1035
}
 
1036
 
 
1037
 
 
1038
//-------------------------------
 
1039
CSVector *CSS3Protocol::s3_list(const char *bucket, const char *key_prefix, uint32_t max)
 
1040
{
 
1041
        CSStringBuffer *s3_buffer;
 
1042
    char date[64];
 
1043
        CSString *signed_str;
 
1044
        CSMemoryOutputStream *output;
 
1045
        uint32_t retry_count = 0;
 
1046
        S3ProtocolCon *con_data;
 
1047
        enter_();
 
1048
 
 
1049
        new_(s3_buffer, CSStringBuffer());
 
1050
        push_(s3_buffer);
 
1051
 
 
1052
        output = CSMemoryOutputStream::newStream(1024, 1024);
 
1053
        push_(output);
 
1054
        
 
1055
        new_(con_data, S3ProtocolCon());
 
1056
        push_(con_data);
 
1057
 
 
1058
retry:
 
1059
 
 
1060
        // Clear old settings. 
 
1061
        con_data->ms_reset();   
 
1062
        
 
1063
        SET_DATE(date);
 
1064
 
 
1065
        // Build the URL
 
1066
        s3_buffer->setLength(0);
 
1067
        s3_buffer->append("http://");
 
1068
        s3_buffer->append(bucket);
 
1069
        s3_buffer->append("."); 
 
1070
        s3_buffer->append(s3_server->getCString());
 
1071
//s3_buffer->append("/");       
 
1072
//s3_buffer->append(bucket);
 
1073
        if (key_prefix) {
 
1074
                s3_buffer->append("?prefix=");
 
1075
                s3_buffer->append(key_prefix);
 
1076
        }
 
1077
        
 
1078
        if (max) {
 
1079
                if (key_prefix)
 
1080
                        s3_buffer->append("&max-keys=");
 
1081
                else
 
1082
                        s3_buffer->append("?max-keys=");
 
1083
                s3_buffer->append(max);
 
1084
        }
 
1085
 
 
1086
        con_data->ms_setURL(s3_buffer->getCString());
 
1087
        
 
1088
        // Add the 'DATE' header
 
1089
        s3_buffer->setLength(0);
 
1090
        s3_buffer->append("Date: ");    
 
1091
        s3_buffer->append(date);
 
1092
        con_data->ms_setHeader(s3_buffer->getCString());
 
1093
 
 
1094
        // Create the authentication signature and add the 'Authorization' header
 
1095
        signed_str = s3_getSignature("GET", NULL, NULL, date, bucket, "");
 
1096
        push_(signed_str);
 
1097
        s3_buffer->setLength(0);
 
1098
        s3_buffer->append("Authorization: AWS ");       
 
1099
        s3_buffer->append(s3_public_key->getCString());
 
1100
        s3_buffer->append(":"); 
 
1101
        s3_buffer->append(signed_str->getCString());    
 
1102
        release_(signed_str); signed_str = NULL;
 
1103
        con_data->ms_setHeader(s3_buffer->getCString());
 
1104
        
 
1105
        con_data->ms_execute_get_request(RETAIN(output));
 
1106
        
 
1107
        if (con_data->ms_retry) {
 
1108
                if (retry_count == s3_maxRetries) {
 
1109
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
 
1110
                }
 
1111
        //printf("RETRY: s3_list()\n");
 
1112
                retry_count++;
 
1113
                output->reset();
 
1114
                self->sleep(s3_sleepTime);
 
1115
                goto retry;
 
1116
        }
 
1117
        
 
1118
        release_(con_data);
 
1119
        pop_(output);
 
1120
        release_(s3_buffer);
 
1121
        return_(parse_s3_list(output));
 
1122
}
 
1123
 
 
1124
//-------------------------------
 
1125
CSString *CSS3Protocol::s3_getAuthorization(const char *bucket, const char *key, const char *content_type, uint32_t *s3AuthorizationTime)
 
1126
{
 
1127
    char date[64];
 
1128
        CSString *signed_str;
 
1129
        time_t sys_time;
 
1130
 
 
1131
        enter_();
 
1132
 
 
1133
        if (!content_type)
 
1134
                content_type = "binary/octet-stream";
 
1135
                
 
1136
        sys_time = time(NULL);
 
1137
        
 
1138
        *s3AuthorizationTime = (uint32_t)sys_time;
 
1139
        
 
1140
        SET_DATE_FROM_TIME(sys_time, date);
 
1141
        signed_str = s3_getSignature("PUT", NULL, content_type, date, bucket, key);
 
1142
        return_(signed_str);
 
1143
}
 
1144
 
 
1145
//-------------------------------
 
1146
CSVector *CSS3Protocol::s3_send(CSInputStream *input, const char *bucket, const char *key, off64_t size, const char *content_type, Md5Digest *digest, const char *s3Authorization, time_t s3AuthorizationTime)
 
1147
{
 
1148
        CSStringBuffer *s3_buffer;
 
1149
    char date[64];
 
1150
        CSString *signed_str;
 
1151
        uint32_t retry_count = 0;
 
1152
        S3ProtocolCon *con_data;
 
1153
        CSVector *replyHeaders;
 
1154
        char checksum[32], *md5 = NULL;
 
1155
 
 
1156
        enter_();
 
1157
        push_(input);
 
1158
 
 
1159
        new_(s3_buffer, CSStringBuffer());
 
1160
        push_(s3_buffer);
 
1161
 
 
1162
        new_(con_data, S3ProtocolCon());
 
1163
        push_(con_data);
 
1164
                
 
1165
        if (!content_type)
 
1166
                content_type = "binary/octet-stream";
 
1167
                
 
1168
retry:
 
1169
 
 
1170
        // Clear old settings. 
 
1171
        con_data->ms_reset();   
 
1172
        
 
1173
        if (s3Authorization) {
 
1174
                SET_DATE_FROM_TIME(s3AuthorizationTime, date);
 
1175
        } else {
 
1176
                SET_DATE(date);
 
1177
        }
 
1178
        
 
1179
        // Build the URL
 
1180
        s3_buffer->setLength(0);
 
1181
        s3_buffer->append("http://");
 
1182
        s3_buffer->append(bucket);
 
1183
        s3_buffer->append("."); 
 
1184
        s3_buffer->append(s3_server->getCString());
 
1185
        s3_buffer->append(key);
 
1186
 
 
1187
        con_data->ms_setURL(s3_buffer->getCString());
 
1188
        
 
1189
        // Add the 'DATE' header
 
1190
        s3_buffer->setLength(0);
 
1191
        s3_buffer->append("Date: ");    
 
1192
        s3_buffer->append(date);
 
1193
        con_data->ms_setHeader(s3_buffer->getCString());
 
1194
        
 
1195
        // Add the 'Content-Type' header
 
1196
        s3_buffer->setLength(0);
 
1197
        s3_buffer->append("Content-Type: ");    
 
1198
        s3_buffer->append(content_type);
 
1199
        con_data->ms_setHeader(s3_buffer->getCString());
 
1200
                
 
1201
        if (digest) {
 
1202
                // Add the Md5 checksum header
 
1203
                md5 = checksum;
 
1204
                memset(checksum, 0, 32);
 
1205
                base64Encode(digest->val, 16, checksum, 32);
 
1206
                
 
1207
                s3_buffer->setLength(0);
 
1208
                s3_buffer->append("Content-MD5: ");     
 
1209
                s3_buffer->append(checksum);
 
1210
                con_data->ms_setHeader(s3_buffer->getCString());                
 
1211
                con_data->ms_calculate_md5 = false;
 
1212
        } else 
 
1213
                con_data->ms_calculate_md5 = true;
 
1214
        
 
1215
 
 
1216
        // Create the authentication signature and add the 'Authorization' header
 
1217
        if (!s3Authorization)
 
1218
                signed_str = s3_getSignature("PUT", md5, content_type, date, bucket, key);
 
1219
        else
 
1220
                signed_str = CSString::newString(s3Authorization);
 
1221
        push_(signed_str);
 
1222
        s3_buffer->setLength(0);
 
1223
        s3_buffer->append("Authorization: AWS ");       
 
1224
        s3_buffer->append(s3_public_key->getCString());
 
1225
        s3_buffer->append(":"); 
 
1226
        s3_buffer->append(signed_str->getCString());    
 
1227
        release_(signed_str); signed_str = NULL;
 
1228
        con_data->ms_setHeader(s3_buffer->getCString());
 
1229
        
 
1230
        con_data->ms_execute_put_request(RETAIN(input), size);
 
1231
        
 
1232
        if (con_data->ms_retry) {
 
1233
                if (retry_count == s3_maxRetries) {
 
1234
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "S3 operation aborted after max retries.");
 
1235
                }
 
1236
        //printf("RETRY: s3_send()\n");
 
1237
                retry_count++;
 
1238
                input->reset();
 
1239
                self->sleep(s3_sleepTime);
 
1240
                goto retry;
 
1241
        }
 
1242
        
 
1243
        replyHeaders = con_data->ms_reply_headers.takeHeaders();
 
1244
 
 
1245
        release_(con_data);
 
1246
        release_(s3_buffer);
 
1247
        release_(input);
 
1248
        return_(replyHeaders);
 
1249
}
 
1250
 
 
1251
//-------------------------------
 
1252
CSString *CSS3Protocol::s3_getDataURL(const char *bucket, const char *key, uint32_t keep_alive)
 
1253
{
 
1254
        CSStringBuffer *s3_buffer;
 
1255
        char timeout[32];
 
1256
        CSString *signed_str;
 
1257
        enter_();
 
1258
        
 
1259
        new_(s3_buffer, CSStringBuffer());
 
1260
        push_(s3_buffer);
 
1261
 
 
1262
        snprintf(timeout, 32, "%"PRId32"", ((uint32_t)time(NULL)) + keep_alive);
 
1263
        
 
1264
        signed_str = s3_getSignature("GET", NULL, NULL, timeout, bucket, key);
 
1265
//printf("Unsafe: \"%s\"\n", signed_str->getCString());
 
1266
        signed_str = urlEncode(signed_str); // Because the signature is in the URL it must be URL encoded.
 
1267
//printf("  Safe: \"%s\"\n", signed_str->getCString());
 
1268
        push_(signed_str);
 
1269
        
 
1270
        s3_buffer->setLength(0);        
 
1271
        s3_buffer->append("http://");
 
1272
        s3_buffer->append(bucket);
 
1273
        s3_buffer->append("."); 
 
1274
        s3_buffer->append(s3_server->getCString());
 
1275
        s3_buffer->append(key);
 
1276
 
 
1277
        s3_buffer->append("?AWSAccessKeyId=");
 
1278
        s3_buffer->append(s3_public_key->getCString());
 
1279
        s3_buffer->append("&Expires=");
 
1280
        s3_buffer->append(timeout);
 
1281
        s3_buffer->append("&Signature=");
 
1282
        s3_buffer->append(signed_str->getCString());
 
1283
        
 
1284
        release_(signed_str);
 
1285
        
 
1286
        pop_(s3_buffer);
 
1287
        CSString *str = CSString::newString(s3_buffer);
 
1288
        return_(str);   
 
1289
}
 
1290
 
 
1291
//#define S3_UNIT_TEST
 
1292
#ifdef S3_UNIT_TEST
 
1293
/*
 * Print the usage of every unit test command; 'cmd' is the program name to show.
 * The argument lists match what main() accepts: 'g' takes an optional start/end
 * byte pair and 'h' takes no argument after the object key.
 */
static void show_help_info(const char *cmd)
{
        printf("Get authenticated query string:\n\t%s q <bucket> <object_key> <timeout>\n", cmd);
        printf("Delete object:\n\t%s d <bucket> <object_key>\n", cmd);
        printf("Delete all objects with a given prefix:\n\t%s D <bucket> <object_prefix>\n", cmd);
        printf("Get object, data will be written to 'prottest.out':\n\t%s g <bucket> <object_key> [<start_byte> <end_byte>]\n", cmd);
        printf("Get object header only:\n\t%s h <bucket> <object_key>\n", cmd);
        printf("Put (Upload) an object:\n\t%s p <bucket> <object_key> <file>\n", cmd);
        printf("List objects in the bucket:\n\t%s l <bucket> [<object_prefix> [max_list_size]]\n", cmd);
        printf("Copy object:\n\t%s c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key> \n", cmd);
        printf("Copy all objects with a given prefix:\n\t%s C <src_bucket> <object_key_prefix> <dst_bucket> \n", cmd);
}
 
1305
 
 
1306
void dump_headers(CSVector *header_array)
 
1307
{
 
1308
        CSHTTPHeaders headers;
 
1309
        
 
1310
        headers.setHeaders(header_array);
 
1311
        printf("Reply Headers:\n");
 
1312
        printf("--------------\n");
 
1313
        
 
1314
        for (uint32_t i = 0; i < headers.numHeaders(); i++) {
 
1315
                CSHeader *h = headers.getHeader(i);
 
1316
                
 
1317
                printf("%s : %s\n", h->getNameCString(), h->getValueCString());
 
1318
                h->release();
 
1319
        }
 
1320
        printf("--------------\n");
 
1321
        headers.clearHeaders();
 
1322
}
 
1323
 
 
1324
int main(int argc, char **argv)
 
1325
{
 
1326
        CSThread *main_thread;
 
1327
        const char *pub_key;
 
1328
        const char *priv_key;
 
1329
        const char *server;
 
1330
        CSS3Protocol *prot = NULL;
 
1331
        
 
1332
        if (argc < 3) {
 
1333
                show_help_info(argv[0]);
 
1334
                return 0;
 
1335
        }
 
1336
        
 
1337
        if (! CSThread::startUp()) {
 
1338
                CSException::throwException(CS_CONTEXT, ENOMEM, "CSThread::startUp() failed.");
 
1339
                return 1;
 
1340
        }
 
1341
        
 
1342
        cs_init_memory();
 
1343
        
 
1344
        main_thread = new CSThread( NULL);
 
1345
        CSThread::setSelf(main_thread);
 
1346
        
 
1347
        enter_();
 
1348
        try_(a) {
 
1349
        
 
1350
                pub_key = getenv("S3_ACCESS_KEY_ID");
 
1351
                priv_key = getenv("S3_SECRET_ACCESS_KEY");
 
1352
                new_(prot, CSS3Protocol());
 
1353
                push_(prot);
 
1354
                
 
1355
                server = getenv("S3_SERVER");
 
1356
                if ((server == NULL) || (*server == 0))
 
1357
                        server = "s3.amazonaws.com/";
 
1358
                prot->s3_setServer(server);
 
1359
                prot->s3_setPublicKey(pub_key);
 
1360
                prot->s3_setPrivateKey(priv_key);
 
1361
                
 
1362
                switch (argv[1][0]) {
 
1363
                        case 'q': // Get the query string
 
1364
                                if (argc == 5) {
 
1365
                                        CSString *qstr = prot->s3_getDataURL(argv[2], argv[3], atoi(argv[4]));
 
1366
                                        printf("To test call:\ncurl -L -D - \"%s\"\n", qstr->getCString());
 
1367
                                        qstr->release();
 
1368
                                } else
 
1369
                                        printf("Bad command: q <bucket> <object_key> <timeout>\n");
 
1370
                                
 
1371
                                break;
 
1372
                        case 'd': // Delete the object
 
1373
                                if (argc == 4) {
 
1374
                                        printf("delete %s %s\n", argv[2], argv[3]);
 
1375
                                        if (!prot->s3_delete(argv[2], argv[3]))
 
1376
                                                printf("%s/%s could not be found.\n", argv[2], argv[3]);
 
1377
 
 
1378
                                } else
 
1379
                                        printf("Bad command: d <bucket> <object_key>\n");
 
1380
                                
 
1381
                                break;
 
1382
                        case 'D': // Delete  objects like
 
1383
                                if (argc == 4) {
 
1384
                                        CSVector *list;
 
1385
                                        CSString *key;
 
1386
                                        
 
1387
                                        list = prot->s3_list(argv[2], argv[3]);
 
1388
                                        push_(list);
 
1389
                                        while (key = (CSString*) list->take(0)) {
 
1390
                                                printf("Deleting %s\n", key->getCString());
 
1391
                                                prot->s3_delete(argv[2], key->getCString());
 
1392
                                                key->release();
 
1393
                                        }
 
1394
                                        release_(list);
 
1395
                                        
 
1396
                                } else
 
1397
                                        printf("Bad command: D <bucket> <object_key_prefix>\n");
 
1398
                                
 
1399
                                break;
 
1400
                        case 'g':  // Get the object
 
1401
                                if ((argc == 4) || (argc == 6)) {
 
1402
                                        CSFile *output; 
 
1403
                                        CSVector *headers;
 
1404
                                        bool found;                             
 
1405
                                        S3RangeRec *range_ptr = NULL, range =   {0,0};          
 
1406
                                        
 
1407
                                        if (argc == 6) {
 
1408
                                                range.startByte = atoi(argv[4]);
 
1409
                                                range.endByte = atoi(argv[5]);
 
1410
                                                range_ptr = &range;
 
1411
                                        }
 
1412
                                        
 
1413
                                        output = CSFile::newFile("prottest.out");
 
1414
                                        push_(output);
 
1415
                                        output->open(CSFile::CREATE | CSFile::TRUNCATE);
 
1416
                                        headers = prot->s3_receive(output->getOutputStream(), argv[2], argv[3], &found, range_ptr);
 
1417
                                        if (!found)
 
1418
                                                printf("%s/%s could not be found.\n", argv[2], argv[3]);
 
1419
                                                
 
1420
                                        dump_headers(headers);
 
1421
                                                
 
1422
                                        release_(output);
 
1423
                                } else
 
1424
                                        printf("Bad command: g <bucket> <object_key>\n");
 
1425
                                
 
1426
                                break;
 
1427
                                
 
1428
                        case 'h':  // Get the object header
 
1429
                                if (argc == 4) {
 
1430
                                        CSVector *headers;
 
1431
                                        bool found;     
 
1432
                                        S3RangeRec range =      {0,0};          
 
1433
                                        
 
1434
                                        headers = prot->s3_receive(NULL, argv[2], argv[3], &found);
 
1435
                                        if (!found)
 
1436
                                                printf("%s/%s could not be found.\n", argv[2], argv[3]);
 
1437
                                                
 
1438
                                        dump_headers(headers);
 
1439
                                                
 
1440
                                } else
 
1441
                                        printf("Bad command: h <bucket> <object_key>\n");
 
1442
                                
 
1443
                                break;
 
1444
                                
 
1445
                        case 'p':  // Put (Upload) the object
 
1446
                                if (argc == 5) {
 
1447
                                        CSFile *input;
 
1448
                                        Md5Digest digest;
 
1449
                                        CSVector *headers;
 
1450
                                        
 
1451
                                        input = CSFile::newFile(argv[4]);
 
1452
                                        push_(input);
 
1453
                                        input->open(CSFile::READONLY);
 
1454
                                        input->md5Digest(&digest);
 
1455
                                        headers = prot->s3_send(input->getInputStream(), argv[2], argv[3], input->myFilePath->getSize(), NULL, &digest);
 
1456
                                        dump_headers(headers);
 
1457
                                        release_(input);
 
1458
                                } else
 
1459
                                        printf("Bad command: p <bucket> <object_key> <file> \n");
 
1460
                                
 
1461
                                break;
 
1462
                                
 
1463
                        case 'c':  // Copy the object
 
1464
                                if (argc == 6) {
 
1465
                                        prot->s3_copy(NULL, argv[4], argv[5], argv[2], argv[3]);
 
1466
                                } else
 
1467
                                        printf("Bad command: c <src_bucket> <src_object_key> <dst_bucket> <dst_object_key>\n");
 
1468
                                
 
1469
                                break;
 
1470
                                
 
1471
                        case 'C':  // Copy every object in the source bucket whose key matches the prefix
 
1472
                                if (argc == 5) {
 
1473
                                        CSVector *list;
 
1474
                                        CSString *key;
 
1475
                                        
 
1476
                                        list = prot->s3_list(argv[2], argv[3]);
 
1477
                                        push_(list);
 
1478
                                        while (key = (CSString*) list->take(0)) {
 
1479
                                                printf("Copying %s\n", key->getCString());
 
1480
                                                prot->s3_copy(NULL, argv[4], key->getCString(), argv[2], key->getCString());
 
1481
                                                key->release();
 
1482
                                        }
 
1483
                                        release_(list);
 
1484
                                        
 
1485
                                } else
 
1486
                                        printf("Bad command: C <src_bucket> <object_key_prefix> <dst_bucket>\n");
 
1487
                                
 
1488
                                break;
 
1489
                        case 'l':  // List the object keys in a bucket (optional prefix and maximum list size)
 
1490
                                if ((argc == 3) || (argc == 4) || (argc == 5)) {
 
1491
                                        uint32_t max = 0;
 
1492
                                        char *prefix = NULL;
 
1493
                                        CSVector *list;
 
1494
                                        CSString *key;
 
1495
                                        
 
1496
                                        if (argc > 3) {
 
1497
                                                prefix = argv[3];
 
1498
                                                if (!strlen(prefix))
 
1499
                                                        prefix = NULL;
 
1500
                                        }
 
1501
                                        
 
1502
                                        if (argc == 5) 
 
1503
                                                max = atol(argv[4]);
 
1504
                                                
 
1505
                                        list = prot->s3_list(argv[2], prefix, max);
 
1506
                                        push_(list);
 
1507
                                        while (key = (CSString*) list->take(0)) {
 
1508
                                                printf("%s\n", key->getCString());
 
1509
                                                key->release();
 
1510
                                        }
 
1511
                                        release_(list);
 
1512
                                        
 
1513
                                } else
 
1514
                                        printf("Bad command: l <bucket> [<object_prefix> [max_list_size]] \n");
 
1515
                                
 
1516
                                break;
 
1517
                        default:
 
1518
                                printf("Unknown command.\n");
 
1519
                                show_help_info(argv[0]);
 
1520
                }
 
1521
                
 
1522
                release_(prot);
 
1523
        }
 
1524
        
 
1525
        catch_(a);              
 
1526
        self->logException();
 
1527
        
 
1528
        cont_(a);
 
1529
                
 
1530
        outer_()
 
1531
        main_thread->release();
 
1532
        cs_exit_memory();
 
1533
        CSThread::shutDown();
 
1534
        return 0;
 
1535
}
 
1536
 
 
1537
#endif
 
1538
 
 
1539