~drizzle-trunk/drizzle/development

« back to all changes in this revision

Viewing changes to plugin/pbms/src/ms_api.cc

Added the PBMS daemon plugin.

(Augen zu und durch!)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008 PrimeBase Technologies GmbH, Germany
 
2
 *
 
3
 * PrimeBase Media Stream for MySQL
 
4
 *
 
5
 * This program is free software; you can redistribute it and/or modify
 
6
 * it under the terms of the GNU General Public License as published by
 
7
 * the Free Software Foundation; either version 2 of the License, or
 
8
 * (at your option) any later version.
 
9
 *
 
10
 * This program is distributed in the hope that it will be useful,
 
11
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
 * GNU General Public License for more details.
 
14
 *
 
15
 * You should have received a copy of the GNU General Public License
 
16
 * along with this program; if not, write to the Free Software
 
17
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 
18
 *
 
19
 * Barry Leslie
 
20
 *
 
21
 * 2007-11-25
 
22
 *
 
23
 * H&G2JCtL
 
24
 *
 
25
 */
 
26
 
 
27
#include "CSConfig.h"
 
28
#include "CSGlobal.h"
 
29
#include "CSLog.h"
 
30
#include "CSStrUtil.h"
 
31
#include "CSHTTPStream.h"
 
32
#include "CSStream.h"
 
33
 
 
34
#include "Repository_ms.h"
 
35
#include "OpenTable_ms.h"
 
36
#include "Util_ms.h"
 
37
#include "ms_mysql.h"
 
38
 
 
39
//-----------------------------------------------------------------------------------------------
 
40
void PBMSGetError(void *v_bs_thread, PBMSResultPtr result)
 
41
{
 
42
        CSThread *ms_thread = (CSThread*)v_bs_thread;
 
43
        
 
44
        ASSERT(ms_thread);
 
45
        memset(result, 0, sizeof(PBMSResultRec));
 
46
        
 
47
        result->mr_code =  ms_thread->myException.getErrorCode();
 
48
        cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message,  ms_thread->myException.getMessage());
 
49
}
 
50
 
 
51
//-----------------------------------------------------------------------------------------------
 
52
void *PBMSInitBlobStreamingThread(char *thread_name, PBMSResultPtr result)
 
53
{
 
54
        CSThread *ms_thread =  new CSThread( NULL);
 
55
        
 
56
        if (!ms_thread) {
 
57
                memset(result, 0, sizeof(PBMSResultRec));
 
58
                result->mr_code = ENOMEM;
 
59
                cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "CSThread::newThread() failed.");
 
60
                return NULL; 
 
61
        }
 
62
        
 
63
        ms_thread->pbms_api_owner = true;
 
64
        if (!CSThread::attach(ms_thread)) {
 
65
                memset(result, 0, sizeof(PBMSResultRec));
 
66
                result->mr_code =  ms_thread->myException.getErrorCode();
 
67
                cs_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message,  ms_thread->myException.getMessage());
 
68
                ms_thread->release();
 
69
                ms_thread = NULL;
 
70
        } else
 
71
                ms_thread->threadName = CSString::newString(thread_name);
 
72
        
 
73
        return ms_thread;
 
74
}
 
75
 
 
76
 
 
77
//-----------------------------------------------------------------------------------------------
 
78
void PBMSDeinitBlobStreamingThread(void *v_bs_thread)
 
79
{
 
80
        CSThread *ms_thread = (CSThread*)v_bs_thread;
 
81
        
 
82
        ASSERT(ms_thread);
 
83
 
 
84
        CSThread::detach(ms_thread);
 
85
        // ms_thread->release(); Don't do this. Ownership of the thread is passed to the attach call so the thread is released when it is detached.
 
86
}
 
87
 
 
88
//-----------------------------------------------------------------------------------------------
 
89
bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size)
 
90
{
 
91
        MSOpenTable *otab = NULL;
 
92
        CSString *iTableURI =  NULL;
 
93
        CSString *CSContenttype = NULL;
 
94
        bool done_ok = true;
 
95
 
 
96
        enter_();
 
97
 
 
98
        try_(a) {
 
99
                otab = MSTableList::getOpenTableForDB(MSDatabase::getDatabaseID(database_name, false));
 
100
                
 
101
                otab->createBlob(blob_id, size, NULL, 0);
 
102
        }
 
103
        
 
104
        catch_(a) {
 
105
                done_ok = false;
 
106
        }
 
107
        cont_(a);
 
108
 
 
109
        exit:
 
110
        if (otab)
 
111
                otab->returnToPool();
 
112
        
 
113
        if (CSContenttype)
 
114
                CSContenttype->release();
 
115
                
 
116
        if (iTableURI)
 
117
                iTableURI->release();
 
118
                
 
119
        return_(done_ok);
 
120
}
 
121
 
 
122
//-----------------------------------------------------------------------------------------------
 
123
bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset)
 
124
{
 
125
        MSOpenTable *otab;
 
126
        MSRepoFile      *repo_file;
 
127
        bool done_ok = true;
 
128
 
 
129
        enter_();
 
130
 
 
131
        try_(a) {
 
132
                if (!(otab = MSTableList::getOpenTableForDB(blob_id->bi_db_id))) {
 
133
                        char buffer[CS_EXC_MESSAGE_SIZE];
 
134
                        char id_str[12];
 
135
                        
 
136
                        snprintf(id_str, 12, "%"PRIu32"", blob_id->bi_db_id);
 
137
 
 
138
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database id #  ");
 
139
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, id_str);
 
140
                        CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
 
141
                }
 
142
                frompool_(otab);
 
143
                repo_file = otab->getDB()->getRepoFileFromPool( blob_id->bi_tab_id, false);
 
144
                frompool_(repo_file);
 
145
                // It is assumed that at this point the blob is a repository blob and so the 
 
146
                // blob_id->bi_blob_id is actually the repository blob offset. 
 
147
                repo_file->writeBlobChunk(blob_id, blob_id->bi_blob_id, offset, size, data);
 
148
                backtopool_(repo_file);
 
149
                backtopool_(otab);
 
150
 
 
151
        }
 
152
        catch_(a) {
 
153
                done_ok = false;
 
154
        }
 
155
        
 
156
        cont_(a);
 
157
                
 
158
        return_(done_ok);
 
159
}
 
160
 
 
161
//-----------------------------------------------------------------------------------------------
 
162
bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset)
 
163
{
 
164
        MSOpenTable *otab;
 
165
        MSRepoFile      *repo_file;
 
166
        bool done_ok = true, is_repository_blob;
 
167
 
 
168
        enter_();
 
169
 
 
170
        is_repository_blob = (blob_id->bi_blob_type == MS_URL_TYPE_REPO);
 
171
        try_(a) {
 
172
                if (!(otab = MSTableList::getOpenTableByID(blob_id->bi_db_id, blob_id->bi_tab_id))) {
 
173
                        char buffer[CS_EXC_MESSAGE_SIZE];
 
174
                        char id_str[12];
 
175
                        
 
176
        
 
177
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database: ID # ");
 
178
                        snprintf(id_str, 12, "%"PRIu32"", blob_id->bi_db_id);
 
179
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, id_str);
 
180
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, " or table: ID #");
 
181
                        snprintf(id_str, 12, "%"PRIu32"", blob_id->bi_tab_id);
 
182
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, id_str);
 
183
                        CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
 
184
                }
 
185
                uint32_t repo_id;
 
186
                uint64_t rep_offset;
 
187
 
 
188
                
 
189
                frompool_(otab);
 
190
                if (is_repository_blob) {
 
191
                        repo_id = blob_id->bi_tab_id;
 
192
                        rep_offset = blob_id->bi_blob_id;
 
193
                } else {
 
194
                        uint64_t blob_size;
 
195
                        uint16_t header_size; 
 
196
                        otab->getDBTable()->readBlobHandle(otab, blob_id->bi_blob_id, &(blob_id->bi_auth_code), &repo_id, &rep_offset, &blob_size, &header_size, true);
 
197
                }
 
198
                
 
199
                repo_file = otab->getDB()->getRepoFileFromPool( repo_id, false);
 
200
                frompool_(repo_file);
 
201
                *size = repo_file->readBlobChunk(blob_id, rep_offset, offset, *size, buffer);
 
202
                backtopool_(repo_file);
 
203
                backtopool_(otab);
 
204
 
 
205
        }
 
206
        catch_(a) {
 
207
                done_ok = false;
 
208
        }
 
209
        
 
210
        cont_(a);
 
211
                
 
212
        return_(done_ok);
 
213
}
 
214
 
 
215
//-----------------------------------------------------------------------------------------------
 
216
bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url)
 
217
{       
 
218
        MSBlobURL ms_blob;
 
219
 
 
220
        ms_blob.bu_db_id = blob_id->bi_db_id;
 
221
        ms_blob.bu_blob_id = blob_id->bi_blob_id;
 
222
        ms_blob.bu_blob_ref_id = blob_id->bi_blob_ref_id;
 
223
        ms_blob.bu_tab_id = blob_id->bi_tab_id;
 
224
        ms_blob.bu_auth_code = blob_id->bi_auth_code;
 
225
        ms_blob.bu_type = blob_id->bi_blob_type;
 
226
        ms_blob.bu_blob_size = blob_id->bi_blob_size; 
 
227
        ms_blob.bu_server_id = ms_my_get_server_id();
 
228
        
 
229
        ms_build_blob_url(&ms_blob, url);
 
230
        return true;
 
231
}
 
232
 
 
233
//-----------------------------------------------------------------------------------------------
 
234
bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id)
 
235
{       
 
236
        MSBlobURL ms_blob;
 
237
        bool done_ok = true;
 
238
        enter_();
 
239
 
 
240
        try_(a) {
 
241
        
 
242
                if (!ms_parse_blob_url(&ms_blob, url)){
 
243
                        char buffer[CS_EXC_MESSAGE_SIZE];
 
244
 
 
245
                        cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Incorrect URL: ");
 
246
                        cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, url);
 
247
                        CSException::throwException(CS_CONTEXT, MS_ERR_INCORRECT_URL, buffer);
 
248
                }
 
249
                
 
250
                blob_id->bi_db_id = ms_blob.bu_db_id;
 
251
                blob_id->bi_blob_id = ms_blob.bu_blob_id;
 
252
                blob_id->bi_blob_ref_id = ms_blob.bu_blob_ref_id;
 
253
                blob_id->bi_tab_id = ms_blob.bu_tab_id;
 
254
                blob_id->bi_auth_code = ms_blob.bu_auth_code;
 
255
                blob_id->bi_blob_type = ms_blob.bu_type;
 
256
                blob_id->bi_blob_size = ms_blob.bu_blob_size; 
 
257
                
 
258
        }
 
259
        catch_(a) {
 
260
                done_ok = false;
 
261
        }
 
262
        
 
263
        cont_(a);
 
264
        
 
265
        return_(done_ok);
 
266
}
 
267
 
 
268
 
 
269