~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/open_table_ms.cc

  • Committer: Lee Bieber
  • Date: 2010-10-22 16:47:38 UTC
  • mfrom: (1841.1.7 drizzle_pbms)
  • Revision ID: kalebral@gmail.com-20101022164738-vv8w22b8towpb307
Merge Barry - fix bug 657830: PBMS build failure in GCC 4.5

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
 * Media Stream Tables.
27
27
 *
28
28
 */
29
 
 
30
 
#include "config.h"
 
29
#include "cslib/CSConfig.h"
 
30
 
 
31
#include "defs_ms.h"
 
32
 
 
33
#include "cslib/CSGlobal.h"
 
34
#include "cslib/CSLog.h"
 
35
#include "cslib/CSStrUtil.h"
 
36
#include "cslib/CSPath.h"
31
37
 
32
38
#include "open_table_ms.h"
33
39
#include "table_ms.h"
36
42
#include "transaction_ms.h"
37
43
#include "parameters_ms.h"
38
44
 
39
 
#include "cslib/CSConfig.h"
40
 
 
41
 
#include "cslib/CSGlobal.h"
42
 
#include "cslib/CSLog.h"
43
 
#include "cslib/CSStrUtil.h"
44
 
#include "cslib/CSPath.h"
45
 
 
46
 
 
47
45
/*
48
46
 * ---------------------------------------------------------------
49
47
 * OPEN TABLES
101
99
        MSTableList::releaseTable(this);
102
100
}
103
101
 
 
102
// This cleanup class is used to reset the 
 
103
// repository size if something goes wrong.
 
104
class CreateBlobCleanUp : public CSRefObject {
 
105
        bool do_cleanup;
 
106
        uint64_t old_size;
 
107
        MSOpenTable *ot;
 
108
        MSRepository *repo;
 
109
 
 
110
        public:
 
111
        
 
112
        CreateBlobCleanUp(): CSRefObject(),
 
113
                do_cleanup(false){}
 
114
                
 
115
        ~CreateBlobCleanUp() 
 
116
        {
 
117
                if (do_cleanup) {
 
118
                        repo->setRepoFileSize(ot, old_size);
 
119
 
 
120
                }
 
121
        }
 
122
        
 
123
        void setCleanUp(MSOpenTable *ot_arg, MSRepository *repo_arg, uint64_t size)
 
124
        {
 
125
                old_size = size;
 
126
                repo = repo_arg;
 
127
                ot = ot_arg;
 
128
                do_cleanup = true;
 
129
        }
 
130
        
 
131
        void cancelCleanUp()
 
132
        {
 
133
                do_cleanup = false;
 
134
        }
 
135
        
 
136
};
 
137
 
104
138
void MSOpenTable::createBlob(PBMSBlobURLPtr bh, uint64_t blob_size, char *metadata, uint16_t metadata_size, CSInputStream *stream, CloudKeyPtr cloud_key, Md5Digest *checksum)
105
139
{
106
140
        uint64_t repo_offset;
114
148
        uint64_t repo_id;
115
149
        Md5Digest my_checksum;
116
150
        CloudKeyRec cloud_key_rec;
 
151
        CreateBlobCleanUp *cleanup;
117
152
        enter_();
118
153
        
119
 
        CLOBBER_PROTECT(cloud_key);
120
 
        CLOBBER_PROTECT(checksum);
121
 
 
 
154
        new_(cleanup, CreateBlobCleanUp());
 
155
        push_(cleanup);
 
156
        
122
157
        if (!checksum)
123
158
                checksum = &my_checksum;
124
159
                
129
164
        repo_size = myWriteRepo->getRepoFileSize();
130
165
        temp_time =     myWriteRepo->myLastTempTime;
131
166
 
132
 
        try_(a) {
133
 
                head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
134
 
                if (getDB()->myBlobType == MS_STANDARD_STORAGE) {
 
167
        // If an exception occurs the cleanup operation will be called.
 
168
        cleanup->setCleanUp(this, myWriteRepo, repo_size);
 
169
 
 
170
        head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
 
171
        if (getDB()->myBlobType == MS_STANDARD_STORAGE) {
 
172
                pop_(stream);
 
173
                repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size, checksum, stream);
 
174
        } else {
 
175
                ASSERT(getDB()->myBlobType == MS_CLOUD_STORAGE);
 
176
                CloudDB *cloud = getDB()->myBlobCloud;
 
177
                
 
178
                if (!cloud)
 
179
                        CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Creating cloud BLOB without cloud.");
 
180
        
 
181
                repo_offset = repo_size + head_size;
 
182
                memset(checksum, 0, sizeof(Md5Digest)); // The checksum is only for local storage.
 
183
                
 
184
                // If there is a stream then the data has not been sent to the cloud yet.
 
185
                if (stream) { 
 
186
                        cloud_key = &cloud_key_rec;
 
187
                        cloud->cl_getNewKey(cloud_key);
135
188
                        pop_(stream);
136
 
                        repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size, checksum, stream);
137
 
                } else {
138
 
                        ASSERT(getDB()->myBlobType == MS_CLOUD_STORAGE);
139
 
                        CloudDB *cloud = getDB()->myBlobCloud;
140
 
                        
141
 
                        if (!cloud)
142
 
                                CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Creating cloud BLOB without cloud.");
143
 
                
144
 
                        repo_offset = repo_size + head_size;
145
 
                        memset(checksum, 0, sizeof(Md5Digest)); // The checksum is only for local storage.
146
 
                        
147
 
                        // If there is a stream then the data has not been sent to the cloud yet.
148
 
                        if (stream) { 
149
 
                                cloud_key = &cloud_key_rec;
150
 
                                cloud->cl_getNewKey(cloud_key);
151
 
                                pop_(stream);
152
 
                                cloud->cl_putData(cloud_key, stream, blob_size);
153
 
                        }
154
 
                        
155
 
                }
156
 
                
157
 
                repo_id = myWriteRepo->myRepoID;
158
 
                if (isNotATable) {      
159
 
                        getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
160
 
                        formatRepoURL(bh, repo_id, repo_offset, auth_code, blob_size);
161
 
                }
162
 
                else {
163
 
                        blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
164
 
                        getDB()->queueForDeletion(this, MS_TL_BLOB_REF, getDBTable()->myTableID, blob_id, auth_code, &log_id, &log_offset, &temp_time);
165
 
                        formatBlobURL(bh, blob_id, auth_code, blob_size, 0);
166
 
                }
167
 
                
168
 
                myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, checksum, metadata, metadata_size, blob_id, auth_code, log_id, log_offset, getDB()->myBlobType, cloud_key);
169
 
        }
170
 
        
171
 
        catch_(a);
172
 
        // In the event of an error reset the repository size to its original size.
173
 
        myWriteRepo->setRepoFileSize(this, repo_size);
174
 
        throw_();
175
 
        
176
 
        cont_(a);
 
189
                        cloud->cl_putData(cloud_key, stream, blob_size);
 
190
                }
 
191
                
 
192
        }
 
193
        
 
194
        repo_id = myWriteRepo->myRepoID;
 
195
        if (isNotATable) {      
 
196
                getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
 
197
                formatRepoURL(bh, repo_id, repo_offset, auth_code, blob_size);
 
198
        }
 
199
        else {
 
200
                blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
 
201
                getDB()->queueForDeletion(this, MS_TL_BLOB_REF, getDBTable()->myTableID, blob_id, auth_code, &log_id, &log_offset, &temp_time);
 
202
                formatBlobURL(bh, blob_id, auth_code, blob_size, 0);
 
203
        }
 
204
        
 
205
        myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, checksum, metadata, metadata_size, blob_id, auth_code, log_id, log_offset, getDB()->myBlobType, cloud_key);
 
206
        
 
207
        cleanup->cancelCleanUp();
 
208
        release_(cleanup);
 
209
        
177
210
        exit_();
178
211
}
179
212
 
188
221
        uint32_t        log_id;
189
222
        uint32_t log_offset;
190
223
        uint32_t temp_time;
 
224
        CreateBlobCleanUp *cleanup;
191
225
        enter_();
 
226
        
 
227
        new_(cleanup, CreateBlobCleanUp());
 
228
        push_(cleanup);
192
229
 
193
230
        openForWriting();
194
231
        ASSERT(myWriteRepo);
195
232
        auth_code = random();
196
233
        
197
234
        repo_size = myWriteRepo->getRepoFileSize();
198
 
        try_(a) {
199
 
                head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
200
 
 
201
 
                repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size);
202
 
                repo_id = myWriteRepo->myRepoID;
203
 
                temp_time = myWriteRepo->myLastTempTime;
204
 
                getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
205
 
                myWriteRepo->myLastTempTime = temp_time;
206
 
                myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, NULL, metadata, metadata_size, 0, auth_code, log_id, log_offset, MS_STANDARD_STORAGE, NULL);
207
 
                // myWriteRepo->setRepoFileSize(this, repo_offset + head_size + blob_size);This is now set by writeBlobHead()
208
 
                
209
 
                blob_id->bi_db_id = getDB()->myDatabaseID;
210
 
                blob_id->bi_blob_id = repo_offset;
211
 
                blob_id->bi_tab_id = repo_id;
212
 
                blob_id->bi_auth_code = auth_code;
213
 
                blob_id->bi_blob_size = blob_size;
214
 
                blob_id->bi_blob_type = MS_URL_TYPE_REPO;
215
 
                blob_id->bi_blob_ref_id = 0;
216
 
        
217
 
        }
218
 
        
219
 
        catch_(a);
220
 
        // BLOBs created with this method are always created as standard local BLOBs. (No cloud storage)
221
 
        myWriteRepo->setRepoFileSize(this, repo_size);
222
 
        throw_();
223
 
        
224
 
        cont_(a);
 
235
        
 
236
        // If an exception occurs the cleanup operation will be called.
 
237
        cleanup->setCleanUp(this, myWriteRepo, repo_size);
 
238
 
 
239
        head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
 
240
 
 
241
        repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size);
 
242
        repo_id = myWriteRepo->myRepoID;
 
243
        temp_time = myWriteRepo->myLastTempTime;
 
244
        getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
 
245
        myWriteRepo->myLastTempTime = temp_time;
 
246
        myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, NULL, metadata, metadata_size, 0, auth_code, log_id, log_offset, MS_STANDARD_STORAGE, NULL);
 
247
        // myWriteRepo->setRepoFileSize(this, repo_offset + head_size + blob_size);This is now set by writeBlobHead()
 
248
        
 
249
        blob_id->bi_db_id = getDB()->myDatabaseID;
 
250
        blob_id->bi_blob_id = repo_offset;
 
251
        blob_id->bi_tab_id = repo_id;
 
252
        blob_id->bi_auth_code = auth_code;
 
253
        blob_id->bi_blob_size = blob_size;
 
254
        blob_id->bi_blob_type = MS_URL_TYPE_REPO;
 
255
        blob_id->bi_blob_ref_id = 0;
 
256
        
 
257
        cleanup->cancelCleanUp();
 
258
        release_(cleanup);
 
259
 
225
260
        exit_();
226
261
}
227
262