4 # Copyright 2011 Stef Walter
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Lesser General Public License as published
8 # by the Free Software Foundation; either version 2 of the licence or (at
9 # your option) any later version.
11 # See the included COPYING file for more information.
29 COLLECTION_PREFIX = "/org/freedesktop/secrets/collection/"
31 bus_name = 'org.freedesktop.Secret.MockService'
class NotSupported(dbus.exceptions.DBusException):
    """D-Bus exception named org.freedesktop.DBus.Error.NotSupported."""

    def __init__(self, msg):
        # msg is the error text; the error name is fixed for this class.
        super(NotSupported, self).__init__(
            msg, name="org.freedesktop.DBus.Error.NotSupported")
class InvalidArgs(dbus.exceptions.DBusException):
    """D-Bus exception named org.freedesktop.DBus.Error.InvalidArgs."""

    def __init__(self, msg):
        # msg is the error text; the error name is fixed for this class.
        super(InvalidArgs, self).__init__(
            msg, name="org.freedesktop.DBus.Error.InvalidArgs")
class IsLocked(dbus.exceptions.DBusException):
    """D-Bus exception named org.freedesktop.Secret.Error.IsLocked."""

    def __init__(self, msg):
        # msg is the error text; the error name is fixed for this class.
        super(IsLocked, self).__init__(
            msg, name="org.freedesktop.Secret.Error.IsLocked")
class NoSuchObject(dbus.exceptions.DBusException):
    """D-Bus exception named org.freedesktop.Secret.Error.NoSuchObject."""

    def __init__(self, msg):
        # msg is the error text; the error name is fixed for this class.
        super(NoSuchObject, self).__init__(
            msg, name="org.freedesktop.Secret.Error.NoSuchObject")
# Module-wide counter; next_identifier() increments it before every use.
unique_identifier = 111


def next_identifier(prefix=''):
    """Return `prefix` followed by the next value of the module counter.

    The counter is bumped first, so the first identifier issued ends
    in 112.
    """
    global unique_identifier
    issued = unique_identifier + 1
    unique_identifier = issued
    return "%s%d" % (prefix, issued)
def encode_identifier(value):
    """Escape `value` so it only contains ASCII letters, digits and '_'.

    The value is UTF-8 encoded; every byte that is an ASCII letter or
    digit is kept, every other byte becomes "_" followed by its
    two-digit lowercase hex code (so "a b" -> "a_20b").

    The check is explicitly ASCII-only: str.isalpha()/isdigit() on byte
    strings is locale-dependent on Python 2, which could let non-ASCII
    bytes through unescaped.  Going through bytearray yields integers on
    both Python 2 and 3, so the function no longer depends on how the
    interpreter iterates encoded strings.
    """
    safe = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    pieces = []
    for byte in bytearray(value.encode('utf-8')):
        char = chr(byte)
        pieces.append(char if char in safe else "_%02x" % byte)
    return "".join(pieces)
def hex_encode(string):
    """Return the lowercase hex dump of `string`.

    Each character contributes its code point in hex, zero-padded to at
    least two digits.
    """
    return "".join("%02x" % ord(char) for char in string)
66 return "/org/freedesktop/secrets/aliases/%s" % name
68 class PlainAlgorithm():
def negotiate(self, service, sender, param):
    """Open a session with no key for `sender`.

    `param` must be exactly a dbus.String, otherwise InvalidArgs is
    raised.  Returns a pair: an empty dbus.String variant and the new
    SecretSession.
    """
    if type(param) != dbus.String:
        raise InvalidArgs("invalid argument passed to OpenSession")
    plain_session = SecretSession(service, sender, self, None)
    reply = dbus.String("", variant_level=1)
    return (reply, plain_session)
75 def encrypt(self, key, data):
78 def decrypt(self, param, data):
80 raise InvalidArgs("invalid secret plain parameter")
def negotiate(self, service, sender, param):
    """Run the key exchange for a new session with `sender`.

    `param` must be exactly a dbus.ByteArray holding the peer's public
    value, otherwise InvalidArgs is raised.  A fresh key pair is
    generated, the shared secret is derived from our private value and
    the peer's number, and hkdf reduces it to the session key
    (length argument 16 -- presumably 16 bytes for AES-128; confirm
    against the hkdf helper).

    Returns a pair: our public value as a dbus.ByteArray variant and the
    new SecretSession holding the derived key.
    """
    if type(param) != dbus.ByteArray:
        raise InvalidArgs("invalid argument passed to OpenSession")
    private_value, public_value = dh.generate_pair()
    peer_value = dh.bytes_to_number(param)
    shared_secret = dh.derive_key(private_value, peer_value)
    session_key = hkdf.hkdf(shared_secret, 16)
    # Create the session before building the reply, as the session
    # registers itself on construction.
    session = SecretSession(service, sender, self, session_key)
    reply = dbus.ByteArray(dh.number_to_bytes(public_value), variant_level=1)
    return (reply, session)
99 def encrypt(self, key, data):
101 data = aes.append_PKCS7_padding(data)
103 iv = [ord(i) for i in os.urandom(16)]
104 mode = aes.AESModeOfOperation.modeOfOperation["CBC"]
105 moo = aes.AESModeOfOperation()
106 (mode, length, ciph) = moo.encrypt(data, mode, key, keysize, iv)
107 return ("".join([chr(i) for i in iv]),
108 "".join([chr(i) for i in ciph]))
110 def decrypt(self, key, param, data):
113 iv = map(ord, param[:16])
114 data = map(ord, data)
115 moo = aes.AESModeOfOperation()
116 mode = aes.AESModeOfOperation.modeOfOperation["CBC"]
117 decr = moo.decrypt(data, None, mode, key, keysize, iv)
118 return aes.strip_PKCS7_padding(decr)
121 class SecretPrompt(dbus.service.Object):
122 def __init__(self, service, sender, prompt_name=None, delay=0,
123 dismiss=False, action=None):
125 self.service = service
128 self.result = dbus.String("", variant_level=1)
130 self.completed = False
132 self.path = "/org/freedesktop/secrets/prompts/%s" % prompt_name
134 self.path = "/org/freedesktop/secrets/prompts/%s" % next_identifier('p')
135 dbus.service.Object.__init__(self, service.bus_name, self.path)
136 service.add_prompt(self)
137 assert self.path not in objects
138 objects[self.path] = self
143 self.completed = True
144 self.Completed(self.dismiss, self.result)
145 self.remove_from_connection()
147 @dbus.service.method('org.freedesktop.Secret.Prompt')
148 def Prompt(self, window_id):
150 self.result = self.action()
151 gobject.timeout_add(self.delay * 1000, self._complete)
153 @dbus.service.method('org.freedesktop.Secret.Prompt')
157 @dbus.service.signal(dbus_interface='org.freedesktop.Secret.Prompt', signature='bv')
158 def Completed(self, dismiss, result):
162 class SecretSession(dbus.service.Object):
163 def __init__(self, service, sender, algorithm, key):
165 self.service = service
166 self.algorithm = algorithm
168 self.path = "/org/freedesktop/secrets/sessions/%s" % next_identifier('s')
169 dbus.service.Object.__init__(self, service.bus_name, self.path)
170 service.add_session(self)
171 objects[self.path] = self
173 def encode_secret(self, secret, content_type):
174 (params, data) = self.algorithm.encrypt(self.key, secret)
175 # print " mock iv: ", hex_encode(params)
176 # print " mock ciph: ", hex_encode(data)
177 return dbus.Struct((dbus.ObjectPath(self.path), dbus.ByteArray(params),
178 dbus.ByteArray(data), dbus.String(content_type)),
def decode_secret(self, value):
    """Decrypt a secret struct with this session's algorithm and key.

    `value` is indexed as (session path, parameters, data, content
    type); elements 1 and 2 are handed to the algorithm's decrypt().
    Returns a (plain secret, content type) tuple.
    """
    parameters = value[1]
    payload = value[2]
    decrypted = self.algorithm.decrypt(self.key, parameters, payload)
    return (decrypted, value[3])
185 @dbus.service.method('org.freedesktop.Secret.Session')
187 self.remove_from_connection()
188 self.service.remove_session(self)
191 class SecretItem(dbus.service.Object):
192 SUPPORTS_MULTIPLE_OBJECT_PATHS = True
194 def __init__(self, collection, identifier=None, label="Item", attributes={ },
195 secret="", confirm=False, content_type="text/plain", type=None):
196 if identifier is None:
197 identifier = next_identifier()
198 identifier = encode_identifier(identifier)
199 self.collection = collection
200 self.identifier = identifier
201 self.label = label or "Unnamed item"
203 self.type = type or "org.freedesktop.Secret.Generic"
204 self.attributes = attributes
205 self.content_type = content_type
206 self.path = "%s/%s" % (collection.path, identifier)
207 self.confirm = confirm
208 self.created = self.modified = time.time()
209 dbus.service.Object.__init__(self, collection.service.bus_name, self.path)
210 self.collection.add_item(self)
211 objects[self.path] = self
213 def add_alias(self, name):
214 path = "%s/%s" % (alias_path(name), self.identifier)
216 self.add_to_connection(self.connection, path)
218 def remove_alias(self, name):
219 path = "%s/%s" % (alias_path(name), self.identifier)
221 self.remove_from_connection(self.connection, path)
223 def match_attributes(self, attributes):
224 for (key, value) in attributes.items():
225 if not self.attributes.get(key) == value:
def get_locked(self):
    """Return the lock state of the owning collection.

    An item has no lock flag of its own here; it reports its
    collection's `locked` attribute.
    """
    owner = self.collection
    return owner.locked
def perform_xlock(self, lock):
    """Lock or unlock by delegating to the owning collection.

    Returns whatever the collection's perform_xlock() returns.
    """
    owner = self.collection
    return owner.perform_xlock(lock)
def perform_delete(self):
    """Remove this item from its collection, the object table and the bus.

    Raises KeyError if the item's path is not in `objects`.
    """
    owner = self.collection
    owner.remove_item(self)
    objects.pop(self.path)
    self.remove_from_connection()
@dbus.service.method('org.freedesktop.Secret.Item', sender_keyword='sender')
def GetSecret(self, session_path, sender=None):
    """Return this item's secret encoded for the caller's session.

    Raises InvalidArgs when `session_path` is not a registered object
    or the session belongs to a different sender, and IsLocked when the
    item reports itself locked.
    """
    caller_session = objects.get(session_path)
    if not caller_session or caller_session.sender != sender:
        raise InvalidArgs("session invalid: %s" % session_path)
    if self.get_locked():
        raise IsLocked("secret is locked: %s" % self.path)
    encoded = caller_session.encode_secret(self.secret, self.content_type)
    return encoded
@dbus.service.method('org.freedesktop.Secret.Item', sender_keyword='sender', byte_arrays=True)
def SetSecret(self, secret, sender=None):
    """Replace this item's secret and content type.

    `secret[0]` is the path of the session used to decode the value.
    Raises InvalidArgs when that session is not registered or belongs
    to a different sender, and IsLocked when the item is locked.
    """
    session_path = secret[0]
    caller_session = objects.get(session_path)
    if not caller_session or caller_session.sender != sender:
        raise InvalidArgs("session invalid: %s" % session_path)
    if self.get_locked():
        raise IsLocked("secret is locked: %s" % self.path)
    plain, content_type = caller_session.decode_secret(secret)
    self.secret = plain
    self.content_type = content_type
258 @dbus.service.method('org.freedesktop.Secret.Item', sender_keyword='sender')
259 def Delete(self, sender=None):
261 def prompt_callback():
262 item.perform_delete()
263 return dbus.String("", variant_level=1)
265 prompt = SecretPrompt(self.collection.service, sender,
266 dismiss=False, action=prompt_callback)
267 return dbus.ObjectPath(prompt.path)
269 self.perform_delete()
270 return dbus.ObjectPath("/")
@dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v')
def Get(self, interface_name, property_name):
    """Return one property by looking it up in GetAll()'s result.

    An unknown property name surfaces as the KeyError of that lookup.
    """
    properties = self.GetAll(interface_name)
    return properties[property_name]
276 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}')
277 def GetAll(self, interface_name):
278 if interface_name == 'org.freedesktop.Secret.Item':
280 'Locked': self.get_locked(),
281 'Attributes': dbus.Dictionary(self.attributes, signature='ss', variant_level=1),
283 'Created': dbus.UInt64(self.created),
284 'Modified': dbus.UInt64(self.modified),
286 # For compatibility with libgnome-keyring, not part of spec
290 raise InvalidArgs('Unknown %s interface' % interface_name)
292 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ssv')
293 def Set(self, interface_name, property_name, new_value):
294 if interface_name != 'org.freedesktop.Secret.Item':
295 raise InvalidArgs('Unknown %s interface' % interface_name)
296 if property_name == "Label":
297 self.label = str(new_value)
298 elif property_name == "Attributes":
299 self.attributes = dict(new_value)
300 # For compatibility with libgnome-keyring, not part of spec
301 elif property_name == "Type":
302 self.type = str(new_value)
304 raise InvalidArgs('Not writable %s property' % property_name)
305 self.PropertiesChanged(interface_name, { property_name: new_value }, [])
@dbus.service.signal(dbus.PROPERTIES_IFACE, signature='sa{sv}as')
def PropertiesChanged(self, interface_name, changed_properties, invalidated_properties):
    """Signal declared on the properties interface.

    The body itself only refreshes the item's modification timestamp.
    """
    now = time.time()
    self.modified = now
312 class SecretCollection(dbus.service.Object):
313 SUPPORTS_MULTIPLE_OBJECT_PATHS = True
315 def __init__(self, service, identifier=None, label="Collection", locked=False,
316 confirm=False, master=None):
317 if identifier is None:
319 identifier = encode_identifier(identifier)
320 self.service = service
321 self.identifier = identifier
322 self.label = label or "Unnamed collection"
325 self.confirm = confirm
327 self.created = self.modified = time.time()
329 self.path = "%s%s" % (COLLECTION_PREFIX, identifier)
330 dbus.service.Object.__init__(self, service.bus_name, self.path)
331 self.service.add_collection(self)
332 objects[self.path] = self
def add_item(self, item):
    """Register `item` under its path and mirror it on every alias.

    Each alias name currently held by the collection is also added to
    the item.
    """
    item_path = item.path
    self.items[item_path] = item
    for alias_name in self.aliased:
        item.add_alias(alias_name)
def remove_item(self, item):
    """Drop `item` from every alias, then from the collection.

    Raises KeyError if the item's path is not registered here.
    """
    for alias_name in self.aliased:
        item.remove_alias(alias_name)
    self.items.pop(item.path)
344 def add_alias(self, name):
345 if name in self.aliased:
347 self.aliased.add(name)
348 for item in self.items.values():
350 path = alias_path(name)
352 self.add_to_connection(self.connection, path)
354 def remove_alias(self, name):
355 if name not in self.aliased:
357 path = alias_path(name)
358 self.aliased.remove(name)
360 self.remove_from_connection(self.connection, path)
361 for item in self.items.values():
362 item.remove_alias(name)
364 def search_items(self, attributes):
366 for item in self.items.values():
367 if item.match_attributes(attributes):
371 def get_locked(self):
374 def perform_xlock(self, lock):
376 for item in self.items.values():
377 self.PropertiesChanged('org.freedesktop.Secret.Item', { "Locked" : lock }, [])
378 self.PropertiesChanged('org.freedesktop.Secret.Collection', { "Locked" : lock }, [])
def perform_delete(self):
    """Delete the collection together with all of its items.

    Order: delete every item, drop the collection from the object
    table, unregister it from the service, remove any aliases that are
    still attached, and finally take the object off the bus.
    """
    # Iterate over a snapshot: each item.perform_delete() calls back
    # into remove_item(), which deletes from self.items.  Without the
    # copy this only works where dict.values() returns a list.
    for item in list(self.items.values()):
        item.perform_delete()
    del objects[self.path]
    self.service.remove_collection(self)
    # remove_alias() mutates self.aliased, hence the snapshot here too.
    for alias in list(self.aliased):
        self.remove_alias(alias)
    self.remove_from_connection()
389 @dbus.service.method('org.freedesktop.Secret.Collection', byte_arrays=True, sender_keyword='sender')
390 def CreateItem(self, properties, value, replace, sender=None):
391 session_path = value[0]
392 session = objects.get(session_path, None)
393 if not session or session.sender != sender:
394 raise InvalidArgs("session invalid: %s" % session_path)
396 attributes = properties.get("org.freedesktop.Secret.Item.Attributes", { })
397 label = properties.get("org.freedesktop.Secret.Item.Label", None)
398 (secret, content_type) = session.decode_secret(value)
401 # This is done for compatibility with libgnome-keyring, not part of spec
402 type = properties.get("org.freedesktop.Secret.Item.Type", None)
405 items = self.search_items(attributes)
409 item = SecretItem(self, next_identifier(), label, attributes, type=type,
410 secret=secret, confirm=False, content_type=content_type)
415 item.attributes = attributes
416 item.content_type = content_type
417 return (dbus.ObjectPath(item.path), dbus.ObjectPath("/"))
@dbus.service.method('org.freedesktop.Secret.Collection')
def SearchItems(self, attributes):
    """Return the paths of the items matching `attributes`.

    Matching is done by search_items(); the result is a dbus.Array of
    object paths.
    """
    matching_paths = [match.path for match in self.search_items(attributes)]
    return dbus.Array(matching_paths, "o")
424 @dbus.service.method('org.freedesktop.Secret.Collection', sender_keyword='sender')
425 def Delete(self, sender=None):
427 def prompt_callback():
428 collection.perform_delete()
429 return dbus.String("", variant_level=1)
431 prompt = SecretPrompt(self.service, sender, dismiss=False,
432 action=prompt_callback)
433 return dbus.ObjectPath(prompt.path)
435 self.perform_delete()
436 return dbus.ObjectPath("/")
@dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v')
def Get(self, interface_name, property_name):
    """Return one property by looking it up in GetAll()'s result.

    An unknown property name surfaces as the KeyError of that lookup.
    """
    properties = self.GetAll(interface_name)
    return properties[property_name]
442 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}')
443 def GetAll(self, interface_name):
444 if interface_name == 'org.freedesktop.Secret.Collection':
446 'Locked': self.get_locked(),
448 'Created': dbus.UInt64(self.created),
449 'Modified': dbus.UInt64(self.modified),
450 'Items': dbus.Array([dbus.ObjectPath(i.path) for i in self.items.values()], signature='o', variant_level=1)
453 raise InvalidArgs('Unknown %s interface' % interface_name)
455 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ssv')
456 def Set(self, interface_name, property_name, new_value):
457 if interface_name != 'org.freedesktop.Secret.Collection':
458 raise InvalidArgs('Unknown %s interface' % interface_name)
459 if property_name == "Label":
460 self.label = str(new_value)
462 raise InvalidArgs('Not a writable property %s' % property_name)
463 self.PropertiesChanged(interface_name, { property_name: new_value }, [])
@dbus.service.signal(dbus.PROPERTIES_IFACE, signature='sa{sv}as')
def PropertiesChanged(self, interface_name, changed_properties, invalidated_properties):
    """Signal declared on the properties interface.

    The body itself only refreshes the collection's modification
    timestamp.
    """
    now = time.time()
    self.modified = now
470 class SecretService(dbus.service.Object):
473 'plain': PlainAlgorithm(),
474 "dh-ietf1024-sha256-aes128-cbc-pkcs7": AesAlgorithm(),
477 def __init__(self, name=None):
480 bus = dbus.SessionBus()
481 self.bus_name = dbus.service.BusName(name, allow_replacement=True, replace_existing=True)
482 dbus.service.Object.__init__(self, self.bus_name, '/org/freedesktop/secrets')
485 self.collections = { }
489 def on_name_owner_changed(owned, old_owner, new_owner):
491 for session in list(self.sessions.get(old_owner, [])):
494 bus.add_signal_receiver(on_name_owner_changed,
496 'org.freedesktop.DBus')
498 def add_standard_objects(self):
499 collection = SecretCollection(self, "english", label="Collection One", locked=False)
500 SecretItem(collection, "1", label="Item One", secret="111",
501 attributes={ "number": "1", "string": "one", "even": "false", "xdg:schema": "org.mock.Schema" })
502 SecretItem(collection, "2", label="Item Two", secret="222",
503 attributes={ "number": "2", "string": "two", "even": "true", "xdg:schema": "org.mock.Schema" })
504 SecretItem(collection, "3", label="Item Three", secret="333",
505 attributes={ "number": "3", "string": "three", "even": "false", "xdg:schema": "org.mock.Schema" })
507 self.set_alias('default', collection)
509 collection = SecretCollection(self, "spanish", locked=True)
510 SecretItem(collection, "10", secret="111",
511 attributes={ "number": "1", "string": "uno", "even": "false", "xdg:schema": "org.mock.Schema" })
512 SecretItem(collection, "20", secret="222",
513 attributes={ "number": "2", "string": "dos", "even": "true", "xdg:schema": "org.mock.Schema" })
514 SecretItem(collection, "30", secret="3333",
515 attributes={ "number": "3", "string": "tres", "even": "false", "xdg:schema": "org.mock.Schema" })
517 collection = SecretCollection(self, "german", locked=True)
518 SecretItem(collection, "300", secret="333",
519 attributes={ "number": "3", "string": "drei", "prime": "true", "xdg:schema": "org.mock.Primes" })
520 SecretItem(collection, "400", secret="444",
521 attributes={ "number": "4", "string": "vier", "prime": "false", "xdg:schema": "org.mock.Primes" })
522 SecretItem(collection, "500", secret="555",
523 attributes={ "number": "5", "string": "fuenf", "prime": "true", "xdg:schema": "org.mock.Primes" })
524 SecretItem(collection, "600", secret="666",
525 attributes={ "number": "6", "string": "sechs", "prime": "false", "xdg:schema": "org.mock.Primes" })
527 collection = SecretCollection(self, "empty", locked=False)
528 collection = SecretCollection(self, "session", label="Session Keyring", locked=False)
530 self.set_alias('session', collection)
534 loop = gobject.MainLoop()
536 os.write(ready_pipe, "GO")
def add_session(self, session):
    """Record `session` in the list kept for its sender.

    The per-sender list is created on first use.
    """
    owned = self.sessions.setdefault(session.sender, [])
    owned.append(session)
def remove_session(self, session):
    """Forget `session`.

    Raises KeyError if its sender has no session list and ValueError if
    the session is not in that list.
    """
    owned = self.sessions[session.sender]
    owned.remove(session)
def add_collection(self, collection):
    """Register `collection` under its object path."""
    collection_path = collection.path
    self.collections[collection_path] = collection
def remove_collection(self, collection):
    """Drop every alias of `collection`, then unregister it.

    Raises KeyError if the collection's path is not registered.
    """
    # Work on a copy: removing an alias changes collection.aliased.
    alias_names = list(collection.aliased)
    for alias_name in alias_names:
        self.remove_alias(alias_name)
    del self.collections[collection.path]
557 def set_alias(self, name, collection):
558 self.remove_alias(name)
560 collection.add_alias(name)
561 self.aliases[name] = collection
562 elif name in self.aliases:
563 del self.aliases[name]
def remove_alias(self, name):
    """Detach alias `name` from its collection; no-op if it is unknown."""
    if name not in self.aliases:
        return
    aliased_collection = self.aliases[name]
    aliased_collection.remove_alias(name)
    del self.aliases[name]
def add_prompt(self, prompt):
    """Record `prompt` in the list kept for its sender.

    The per-sender list is created on first use.
    """
    pending = self.prompts.setdefault(prompt.sender, [])
    pending.append(prompt)
def remove_prompt(self, prompt):
    """Forget `prompt`.

    Raises KeyError if its sender has no prompt list and ValueError if
    the prompt is not in that list.
    """
    pending = self.prompts[prompt.sender]
    pending.remove(prompt)
579 def find_item(self, object):
580 parts = object.rsplit("/", 1)
581 if len(parts) == 2 and parts[0] in self.collections:
582 return self.collections[parts[0]].get(parts[1], None)
585 @dbus.service.method('org.freedesktop.Secret.Service', sender_keyword='sender')
586 def Lock(self, paths, lock=True, sender=None):
590 if path not in objects:
592 object = objects[path]
593 if object.get_locked() == lock:
595 elif not object.confirm:
596 object.perform_xlock(lock)
599 prompts.append(object)
600 def prompt_callback():
601 for object in prompts:
602 object.perform_xlock(lock)
603 return dbus.Array([o.path for o in prompts], signature='o')
604 locked = dbus.Array(locked, signature='o')
606 prompt = SecretPrompt(self, sender, dismiss=False, action=prompt_callback)
607 return (locked, dbus.ObjectPath(prompt.path))
609 return (locked, dbus.ObjectPath("/"))
@dbus.service.method('org.freedesktop.Secret.Service', sender_keyword='sender')
def Unlock(self, paths, sender=None):
    """Unlock `paths` by delegating to Lock() with lock=False.

    Returns exactly what Lock() returns.
    """
    outcome = self.Lock(paths, lock=False, sender=sender)
    return outcome
615 @dbus.service.method('org.freedesktop.Secret.Service', byte_arrays=True, sender_keyword='sender')
616 def OpenSession(self, algorithm, param, sender=None):
617 assert type(algorithm) == dbus.String
619 if algorithm not in self.algorithms:
620 raise NotSupported("algorithm %s is not supported" % algorithm)
622 return self.algorithms[algorithm].negotiate(self, sender, param)
624 @dbus.service.method('org.freedesktop.Secret.Service', sender_keyword='sender')
625 def CreateCollection(self, properties, alias, sender=None):
626 label = properties.get("org.freedesktop.Secret.Collection.Label", None)
628 def prompt_callback():
629 collection = SecretCollection(service, None, label, locked=False, confirm=True)
630 return dbus.ObjectPath(collection.path, variant_level=1)
631 prompt = SecretPrompt(self, sender, dismiss=False, action=prompt_callback)
632 return (dbus.ObjectPath("/"), dbus.ObjectPath(prompt.path))
634 @dbus.service.method('org.freedesktop.Secret.Service')
635 def SearchItems(self, attributes):
639 for collection in self.collections.values():
640 items = collection.search_items(attributes)
641 if collection.get_locked():
642 locked.extend([item.path for item in items])
644 unlocked.extend([item.path for item in items])
645 return (dbus.Array(unlocked, "o"), dbus.Array(locked, "o"))
647 @dbus.service.method('org.freedesktop.Secret.Service', sender_keyword='sender')
648 def GetSecrets(self, item_paths, session_path, sender=None):
649 session = objects.get(session_path, None)
650 if not session or session.sender != sender:
651 raise InvalidArgs("session invalid: %s" % session_path)
652 results = dbus.Dictionary(signature="o(oayays)")
653 for item_path in item_paths:
654 item = objects.get(item_path, None)
655 if item and not item.get_locked():
656 results[item_path] = item.GetSecret(session_path, sender)
@dbus.service.method('org.freedesktop.Secret.Service')
def ReadAlias(self, name):
    """Return the path of the collection aliased as `name`.

    An unknown alias yields the object path "/".
    """
    if name in self.aliases:
        target = self.aliases[name].path
    else:
        target = "/"
    return dbus.ObjectPath(target)
665 @dbus.service.method('org.freedesktop.Secret.Service')
666 def SetAlias(self, name, collection):
667 if collection == dbus.ObjectPath("/"):
668 self.set_alias(name, None)
670 if collection not in self.collections:
671 raise NoSuchObject("no such Collection")
672 self.set_alias(name, self.collections[collection])
@dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v')
def Get(self, interface_name, property_name):
    """Return one property by looking it up in GetAll()'s result.

    An unknown property name surfaces as the KeyError of that lookup.
    """
    properties = self.GetAll(interface_name)
    return properties[property_name]
678 @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}')
679 def GetAll(self, interface_name):
680 if interface_name == 'org.freedesktop.Secret.Service':
682 'Collections': dbus.Array([dbus.ObjectPath(c.path) for c in self.collections.values()], signature='o', variant_level=1)
685 raise InvalidArgs('Unknown %s interface' % interface_name)
@dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ssv')
def Set(self, interface_name, property_name, new_value):
    """Reject every property write on the service object.

    The service has no writable properties, so this always raises
    InvalidArgs: "Unknown ... interface" for any interface other than
    org.freedesktop.Secret.Service, "Not a writable property ..."
    otherwise.
    """
    # Must name the Service interface (the same one GetAll answers
    # for), not the Collection interface.
    if interface_name != 'org.freedesktop.Secret.Service':
        raise InvalidArgs('Unknown %s interface' % interface_name)
    raise InvalidArgs('Not a writable property %s' % property_name)
694 def parse_options(args):
695 global bus_name, ready_pipe
697 opts, args = getopt.getopt(args, "nr", ["name=", "ready="])
698 except getopt.GetoptError, err:
702 if o in ("-n", "--name"):
704 elif o in ("-r", "--ready"):
707 assert False, "unhandled option"
710 parse_options(sys.argv[1:])