Accessibility.Accessible base class to make it more Pythonic.
Based on public domain code originally posted at
-http://wwwx.cs.unc.edu/~parente/cgi-bin/RuntimeClassMixins.
-
-@todo: PP: implement caching for basic attributes
+U{http://wwwx.cs.unc.edu/~parente/cgi-bin/RuntimeClassMixins}.
+@var _ACCESSIBLE_CACHE: Pairs hash values for accessible objects to
+ L{_PropertyCache} bags. We do not store actual accessibles in the dictionary
+ because that would +1 their ref counts and cause __del__ to never be called
+ which is the method we rely on to properly invalidate cache entries.
+@type _ACCESSIBLE_CACHE: dictionary
@var _CACHE_LEVEL: Current level of caching enabled. Checked dynamically by
L{_AccessibleMixin}
@type _CACHE_LEVEL: integer
import Accessibility
import constants
import utils
+import registry
+_ACCESSIBLE_CACHE = {}
_CACHE_LEVEL = None
+class _PropertyCache(object):
+ '''Fixed-size bag class for holding cached values.'''
+ __slots__ = ('name', 'description', 'rolename')
+
def getCacheLevel():
'''
Gets the current level of caching.
def setCacheLevel(val):
'''
Sets the desired level of caching for all accessible objects created after
- this function is invoked.
+ this function is invoked. Immediately clears the current accessible cache.
@param val: None indicating no caching is in effect.
L{constants.CACHE_INTERFACES} indicating all interface query results are
cached. L{constants.CACHE_PROPERTIES} indicating all basic accessible
- properties are cached.
+ properties are cached plus all interfaces.
@type val: integer
'''
global _CACHE_LEVEL
+ if _CACHE_LEVEL != val:
+ # empty our accessible cache
+ _ACCESSIBLE_CACHE.clear()
+ # need to register/unregister for listeners depending on caching level
+ if val == constants.CACHE_PROPERTIES:
+ r = registry.Registry()
+ r.registerEventListener(_updateCache, *constants.CACHE_EVENTS)
+ else:
+ r = registry.Registry()
+ r.deregisterEventListener(_updateCache, *constants.CACHE_EVENTS)
_CACHE_LEVEL = val
+
+def clearCache():
+ '''Forces a clear of the entire cache.'''
+ _ACCESSIBLE_CACHE.clear()
+
+def printCache(template='%s'):
+ '''
+ Prints the contents of the cache.
+
+ @param template: Format string to use when printing
+ @type template: string
+ '''
+ print template % _ACCESSIBLE_CACHE
+
+def _updateCache(event):
+ '''
+ Invalidates an entry in the cache when the hash value of a source of an event
+ matches an entry in the cache.
+
+ @param event: One of the L{constants.CACHE_EVENTS} event types
+ @type event: L{event.Event}
+ '''
+ try:
+ del _ACCESSIBLE_CACHE[hash(event.source)]
+ except KeyError:
+ return
def _makeQuery(iid):
'''
Builds a function querying to a specific interface and returns it.
- @ivar iid: Interface identifier to use when querying
+ @param iid: Interface identifier to use when querying
@type iid: string
@return: Function querying to the given interface
@rtype: function
@raise NotImplementedError: When the desired interface is not supported
'''
try:
- return self._cache[iid]
+ i = self._icache[iid]
except KeyError:
# interface not cached
caching = True
- except AttributeError:
- # not caching at present
- caching = False
-
+ except TypeError:
+ # determine if we're caching
+ caching = _CACHE_LEVEL is not None
+ if caching:
+ # initialize the cache
+ self._icache = {}
+ else:
+ # check if our cached result was an interface, or an indicator that the
+ # interface is not supported
+ if i is None:
+ raise NotImplementedError
+ else:
+ return i
+
try:
+ # do the query remotely
i = self.queryInterface(iid)
except Exception, e:
raise LookupError(e)
if i is None:
+ # cache that the interface is not supported
+ if caching:
+ self._icache[iid] = None
raise NotImplementedError
# not needed according to ORBit2 spec, but makes Java queries work
i._narrow(i.__class__)
if caching:
# cache the narrow'ed result, but only if we're caching for this object
- self._cache[iid] = i
+ self._icache[iid] = i
return i
return _inner
elif isinstance(obj, property):
# wrap the getters and setters
if obj.fget:
- getter = _makeExceptionHandler(getattr(cls, obj.fget.__name__))
+ func = getattr(cls, obj.fget.__name__)
+ getter = _makeExceptionHandler(func)
else:
getter = None
if obj.fset:
- setter = _makeExceptionHandler(getattr(cls, obj.fset.__name__))
+ func = getattr(cls, obj.fset.__name__)
+ setter = _makeExceptionHandler(func)
else:
setter = None
setattr(cls, name, property(getter, setter))
def _mixClass(cls, new_cls):
- '''
+ '''
Adds the methods in new_cls to cls. After mixing, all instances of cls will
have the new methods. If there is a method name clash, the method already in
cls will be prefixed with '_mix_' before the new method of the same name is
'''
# loop over all names in the new class
for name, func in new_cls.__dict__.items():
- # get only functions from the new_class
- if (isinstance(func, types.FunctionType)):
+ if name in ['_get_name', '_get_description']:
+ continue
+ if isinstance(func, types.FunctionType):
# build a new function that is a clone of the one from new_cls
method = new.function(func.func_code, func.func_globals, name,
func.func_defaults, func.func_closure)
# rename the old method so we can still call it if need be
setattr(cls, '_mix_'+name, old_method)
setattr(cls, name, func)
+ elif isinstance(func, property):
+ try:
+ # check if a method of the same name already exists in the target
+ old_prop = getattr(cls, name)
+ except AttributeError:
+ pass
+ else:
+ # IMPORTANT: We save the old property before overwriting it, even
+ # though we never end up calling the old prop from our mixin class.
+ # If we don't save the old one, we seem to introduce a Python ref count
+ # problem where the property get/set methods disappear before we can
+ # use them at a later time. This is a minor waste of memory because
+ # a property is a class object and we only overwrite a few of them.
+ setattr(cls, '_mix_'+name, old_prop)
+ setattr(cls, name, func)
class _AccessibleMixin(object):
'''
@type SLOTS: tuple
'''
SLOTTED_CLASSES = {}
- SLOTS = ('_cache', '_cache_level')
+ SLOTS = ('_icache',)
def __new__(cls):
'''
'__slots__' : _AccessibleMixin.SLOTS})
_AccessibleMixin.SLOTTED_CLASSES[cls] = new_cls
obj = cls._mix___new__(new_cls)
- obj._cache_level = _CACHE_LEVEL
- if obj._cache_level is not None:
- # be sure to create the cache dictionary, if we're caching
- obj._cache = {}
+ # don't create the interface cache until we need it
+ obj._icache = None
return obj
def __del__(self):
- '''
+ '''
Decrements the reference count on the accessible object when there are no
- Python references to this object. This provides automatic reference
- counting for AT-SPI objects.
+ Python references to this object. This provides automatic reference
+ counting for AT-SPI objects. Also removes this object from the cache if
+ we're caching properties.
'''
try:
+ del _ACCESSIBLE_CACHE[hash(self)]
+ except KeyError:
+ pass
+ try:
self.unref()
except Exception:
pass
@return: Name of the accessible
@rtype: string
'''
- if self._cache_level != constants.CACHE_PROPERTIES:
- return self._mix__get_name()
-
+ if _CACHE_LEVEL != constants.CACHE_PROPERTIES:
+ return self._get_name()
+
+ cache = _ACCESSIBLE_CACHE
+ h = hash(self)
try:
- return self._cache['name']
+ return cache[h].name
except KeyError:
- name = self._mix__get_name()
- self._cache['name'] = name
+ # no cached info for this object yet
+ name = self._get_name()
+ pc = _PropertyCache()
+ pc.name = name
+ cache[h] = pc
+ return name
+ except AttributeError:
+ # no cached name for this object yet
+ name = self._get_name()
+ cache[h].name = name
return name
+
+ name = property(_get_name, Accessibility.Accessible._set_name)
def getRoleName(self):
'''
@return: Role name of the accessible
@rtype: string
'''
- if self._cache_level != constants.CACHE_PROPERTIES:
+ if _CACHE_LEVEL != constants.CACHE_PROPERTIES:
return self._mix_getRoleName()
+ cache = _ACCESSIBLE_CACHE
+ h = hash(self)
try:
- return self._cache['rolename']
- except KeyError:
- name = self._mix_getRoleName()
- self._cache['rolename'] = name
- return name
+ return cache[h].rolename
+ except KeyError, e:
+ # no cached info for this object yet
+ rolename = self._mix_getRoleName()
+ pc = _PropertyCache()
+ pc.rolename = rolename
+ cache[h] = pc
+ return rolename
+ except AttributeError, e:
+ # no cached name for this object yet
+ rolename = self._mix_getRoleName()
+ cache[h].rolename = rolename
+ return rolename
def _get_description(self):
'''
@return: Description of the accessible
@rtype: string
'''
- if self._cache_level != constants.CACHE_PROPERTIES:
- return self._mix__get_description()
+ if _CACHE_LEVEL != constants.CACHE_PROPERTIES:
+ return self._get_description()
+ cache = _ACCESSIBLE_CACHE
+ h = hash(self)
try:
- return self._cache['description']
+ return cache[h].description
except KeyError:
- name = self._mix__get_description()
- self._cache['description'] = name
- return name
+ # no cached info for this object yet
+ description = self._get_description()
+ pc = _PropertyCache()
+ pc.description = description
+ cache[h] = pc
+ return description
+ except AttributeError:
+ # no cached name for this object yet
+ description = self._get_description()
+ cache[h].description = description
+ return description
+
+ description = property(_get_description,
+ Accessibility.Accessible._set_description)
def getIndexInParent(self):
'''
# return None if the application isn't reachable for any reason
return None
-# mix the new functions
-_mixClass(Accessibility.Accessible, _AccessibleMixin)
-# mix queryInterface convenience methods
-_mixInterfaces(Accessibility.Accessible, constants.ALL_INTERFACES)
-# mix the exception handlers into all queryable interfaces
+# 1. mix the exception handlers into all queryable interfaces
map(_mixExceptions, constants.ALL_INTERFACES)
-# mix the exception handlers into other Accessibility objects
+# 2. mix the exception handlers into other Accessibility objects
map(_mixExceptions, [Accessibility.StateSet])
+# 3. mix the new functions
+_mixClass(Accessibility.Accessible, _AccessibleMixin)
+# 4. mix queryInterface convenience methods
+_mixInterfaces(Accessibility.Accessible, constants.ALL_INTERFACES)
class _Observer(object):
'''
Parent class for all event observers. Dispatches all received events to the
- L{Registry} that created this L{Observer}. Provides basic reference counting
- functionality needed by L{Registry} to determine when an L{Observer} can be
+ L{Registry} that created this L{_Observer}. Provides basic reference counting
+ functionality needed by L{Registry} to determine when an L{_Observer} can be
released for garbage collection.
The reference counting provided by this class is independent of the reference
counting used by CORBA. Keeping the counts separate makes it easier for the
- L{Registry} to detect when an L{Observer} can be freed in the
+ L{Registry} to detect when an L{_Observer} can be freed in the
L{Registry._unregisterObserver} method.
- @ivar registry: Reference to the L{Registry} that created this L{Observer}
+ @ivar registry: Reference to the L{Registry} that created this L{_Observer}
@type registry: weakref.proxy to L{Registry}
- @ivar ref_count: Reference count on this L{Observer}
+ @ivar ref_count: Reference count on this L{_Observer}
@type ref_count: integer
'''
def __init__(self, registry):
def clientUnref(self):
'''
- Decrements the L{pyLinAcc} reference count on this L{_Observer} by one.
- This method is called when a client is unregistered in L{Registry} to stop
+ Decrements the pyatspi reference count on this L{_Observer} by one. This
+ method is called when a client is unregistered in L{Registry} to stop
receiving notifications of an event type monitored by this L{_Observer}.
'''
self.ref_count -= 1
'''
Starts keyboard event monitoring.
- @param reg: Reference to the raw registry object
- @type reg: Accessibility.Registry
+ @param dc: Reference to a device controller
+ @type dc: Accessibility.DeviceEventController
@param key_set: Set of keys to monitor
@type key_set: list of integer
@param mask: Integer modifier mask or an iterable over multiple masks to
'''
Stops keyboard event monitoring.
- @param reg: Reference to the raw registry object
- @type reg: Accessibility.Registry
+ @param dc: Reference to a device controller
+ @type dc: Accessibility.DeviceEventController
@param key_set: Set of keys to monitor
@type key_set: list of integer
@param mask: Integer modifier mask or an iterable over multiple masks to
reference to the Accessibility.Registry singleton. Doing so is harmless and
has no point.
- @ivar async
+ @ivar async: Should event dispatch to local listeners be decoupled from event
+ receiving from the registry?
@type async: boolean
- @ivar reg:
+ @ivar reg: Reference to the real, wrapped registry object
@type reg: Accessibility.Registry
- @ivar dev:
+ @ivar dev: Reference to the device controller
@type dev: Accessibility.DeviceEventController
- @ivar queue:
+ @ivar queue: Queue of events awaiting local dispatch
@type queue: Queue.Queue
- @ivar clients:
+ @ivar clients: Map of event names to client listeners
@type clients: dictionary
- @ivar observers:
+ @ivar observers: Map of event names to AT-SPI L{_Observer} objects
@type observers: dictionary
'''
def __init__(self, reg):
Registered clients will not be automatically removed when the client dies.
To ensure the client is properly garbage collected, call
- L{Manager.removeClient}.
+ L{deregisterEventListener}.
@param client: Callable to be invoked when the event occurs
@type client: callable
unregistration for all subevents if only partial event name is specified.
Do not include a trailing colon.
- This method must be called to ensure a client registered by
- L{Manager.addClient} is properly garbage collected.
+ This method must be called to ensure a client registered by
+ L{registerEventListener} is properly garbage collected.
@param client: Client callback to remove
@type client: callable
def handleEvent(self, event):
'''
Handles an AT-SPI event by either queuing it for later dispatch when the
- L{async} flag is set, or dispatching it immediately.
+ L{Registry.async} flag is set, or dispatching it immediately.
@param event: AT-SPI event
@type event: L{event.Event}
return ob
def _unregisterObserver(self, name):
- '''
- Destroys an existing L{Observer} for the given event type only if no clients
- are registered for the events it is monitoring.
+ '''
+ Destroys an existing L{_Observer} for the given event type only if no
+ clients are registered for the events it is monitoring.
@param name: Name of the event to observe
@type name: string
IBM Corporation under the BSD license, available at
U{http://www.opensource.org/licenses/bsd-license.php}
'''
+import Accessibility__POA
+
def getInterfaceIID(cls):
'''
Gets the ID of an interface class in string format for use in queryInterface.
- All alpha characters in the suffix are mapped to their uppercase.
The resulting name is used with getattr to look up a constant with that name
- in the L{pyLinAcc.Constants} module. If such a constant does not exist, the
- string suffix is returned instead.
+ in the L{constants} module. If such a constant does not exist, the string
+ suffix is returned instead.
This method allows strings to be used to refer to roles, relations, etc.
without direct access to the constants. It also supports the future expansion
def stateToString(value):
'''
Converts a state value to a string based on the name of the state constant in
- the L{Constants} module that has the given value.
+ the L{constants} module that has the given value.
@param value: An AT-SPI state
@type value: Accessibility.StateType
def relationToString(value):
'''
Converts a relation value to a string based on the name of the state constant
- in the L{Constants} module that has the given value.
+ in the L{constants} module that has the given value.
@param value: An AT-SPI relation
@type value: Accessibility.RelationType
def allModifiers():
'''
Generates all possible keyboard modifiers for use with
- L{Registry.Registry.registerKeystrokeListener}.
+ L{registry.Registry.registerKeystrokeListener}.
'''
mask = 0
while mask <= (1 << constants.MODIFIER_NUMLOCK):
if ret is not None: return ret
def _findDescendantBreadth(acc, pred):
- '''
- Internal function for locating one descendant. Called by
- L{AccessibleMixin.findDescendant} to start the search.
+ '''
+ Internal function for locating one descendant. Called by L{findDescendant} to
+ start the search.
@param acc: Root accessible of the search
@type acc: Accessibility.Accessible
def _findDescendantDepth(acc, pred):
'''
- Internal function for locating one descendant. Called by
- L{AccessibleMixin.findDescendant} to start the search.
+ Internal function for locating one descendant. Called by L{findDescendant} to
+ start the search.
@param acc: Root accessible of the search
@type acc: Accessibility.Accessible
except Exception:
raise LookupError
acc = acc.parent
+
+class StateSet(Accessibility__POA.StateSet):
+ '''
+ Convenience implementation of AT-SPI StateSet, for future use with Collection
+ interface.
+
+ @param states: Set of states
+ @type states: set
+ '''
+ def __init__(self, *states):
+ '''Initializes the state set with the given states.'''
+ self.states = set(states)
+
+ def contains(self, state):
+ '''
+ Checks if this L{StateSet} contains the given state.
+
+ @param state: State to check
+ @type state: Accessibility.StateType
+ @return: True if the set contains the given state
+ @rtype: boolean
+ '''
+ return state in self.states
+
+ def add(self, *state):
+ '''
+ Adds one or more states to this set.
+
+ @param state: State(s) to add
+ @type state: Accessibility.StateType
+ '''
+ self.states.add(state)
+
+ def remove(self, *state):
+ '''
+ Removes one or more states from this set.
+
+ @param state: State(s) to remove
+ @type state: Accessibility.StateType
+ '''
+ self.states.remove(state)
+
+ def equals(self, state_set):
+ '''
+ Checks if this L{StateSet} contains exactly the same members as the given
+ L{StateSet}.
+
+ @param state_set: Another set
+ @type state_set: L{StateSet}
+ @return: Are the sets equivalent in terms of their contents?
+ @rtype: boolean
+ '''
+ return self.state_set == self.states
+
+ def compare(self, state_set):
+ '''
+ Computes the symmetric differences of this L{StateSet} and the given
+ L{StateSet}.
+
+ @param state_set: Another set
+ @type state_set: L{StateSet}
+ @return: Elements in only one of the two sets
+ @rtype: L{StateSet}
+ '''
+ diff = self.states.symmetric_difference(state_set.states)
+ return StateSet(*diff)
+
+ def isEmpty(self):
+ '''
+ Checks if this L{StateSet} is empty.
+
+ @return: Is it empty?
+ @rtype: boolean
+ '''
+ return len(self.states) == 0
+
+ def getStates(self):
+ '''
+ Gets the sequence of all states in this set.
+
+ @return: List of states
+ @rtype: list
+ '''
+ return list(self.states)
\ No newline at end of file