1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
# Copyright 2009 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
# pylint: disable-msg=E0611,W0212
__metaclass__ = type
__all__ = ['GPGKey', 'GPGKeySet']
from sqlobject import (
BoolCol,
ForeignKey,
IntCol,
SQLObjectNotFound,
StringCol,
)
from zope.component import getUtility
from zope.interface import implements
from lp.registry.interfaces.gpg import (
GPGKeyAlgorithm,
IGPGKey,
IGPGKeySet,
)
from lp.services.database.enumcol import EnumCol
from lp.services.database.sqlbase import (
SQLBase,
sqlvalues,
)
from lp.services.gpg.interfaces import IGPGHandler
class GPGKey(SQLBase):
implements(IGPGKey)
_table = 'GPGKey'
_defaultOrder = ['owner', 'keyid']
owner = ForeignKey(dbName='owner', foreignKey='Person', notNull=True)
keyid = StringCol(dbName='keyid', notNull=True)
fingerprint = StringCol(dbName='fingerprint', notNull=True)
keysize = IntCol(dbName='keysize', notNull=True)
algorithm = EnumCol(dbName='algorithm', notNull=True,
enum=GPGKeyAlgorithm)
active = BoolCol(dbName='active', notNull=True)
can_encrypt = BoolCol(dbName='can_encrypt', notNull=False)
@property
def keyserverURL(self):
return getUtility(
IGPGHandler).getURLForKeyInServer(self.fingerprint, public=True)
@property
def displayname(self):
return '%s%s/%s' % (self.keysize, self.algorithm.title, self.keyid)
class GPGKeySet:
implements(IGPGKeySet)
def new(self, ownerID, keyid, fingerprint, keysize,
algorithm, active=True, can_encrypt=False):
"""See `IGPGKeySet`"""
return GPGKey(owner=ownerID, keyid=keyid,
fingerprint=fingerprint, keysize=keysize,
algorithm=algorithm, active=active,
can_encrypt=can_encrypt)
def activate(self, requester, key, can_encrypt):
"""See `IGPGKeySet`."""
fingerprint = key.fingerprint
lp_key = self.getByFingerprint(fingerprint)
if lp_key:
# Then the key already exists, so let's reactivate it.
lp_key.active = True
lp_key.can_encrypt = can_encrypt
return lp_key, False
ownerID = requester.id
keyid = key.keyid
keysize = key.keysize
algorithm = GPGKeyAlgorithm.items[key.algorithm]
lp_key = self.new(
ownerID, keyid, fingerprint, keysize, algorithm,
can_encrypt=can_encrypt)
return lp_key, True
def get(self, key_id, default=None):
"""See `IGPGKeySet`"""
try:
return GPGKey.get(key_id)
except SQLObjectNotFound:
return default
def getByFingerprint(self, fingerprint, default=None):
"""See `IGPGKeySet`"""
result = GPGKey.selectOneBy(fingerprint=fingerprint)
if result is None:
return default
return result
def getGPGKeysForPeople(self, people):
"""See `IGPGKeySet`"""
return GPGKey.select("""
GPGKey.owner IN %s AND
GPGKey.active = True
""" % sqlvalues([person.id for person in people]))
def getGPGKeys(self, ownerid=None, active=True):
"""See `IGPGKeySet`"""
if active is False:
query = """
active = false
AND fingerprint NOT IN
(SELECT fingerprint FROM LoginToken
WHERE fingerprint IS NOT NULL
AND requester = %s
AND date_consumed is NULL
)
""" % sqlvalues(ownerid)
else:
query = 'active=true'
if ownerid:
query += ' AND owner=%s' % sqlvalues(ownerid)
return GPGKey.select(query, orderBy='id')
|