#along with this program; if not, write to the Free Software
#Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-#import registry
+import registry
-#Registry = registry.Registry(reg)
+#Registry = registry.Registry()
#registry.Registry = Registry
#del registry
from accessible import *
from application import *
from component import *
-from stateset import *
+from desktop import *
+from state import *
from relation import *
-
-from test import *
+from role import *
#from utils import *
import interfaces
from base import BaseProxy, Enum
from factory import create_accessible, add_accessible_class
-from stateset import StateSet, _marshal_state_set
+from state import StateSet, _marshal_state_set
from relation import _marshal_relation_set
+from role import Role
__all__ = [
"LOCALE_TYPE",
class BoundingBox(list):
def __new__(cls, x, y, width, height):
- list.__new__(cls, (x, y, width, height))
+ return list.__new__(cls, (x, y, width, height))
def __init__(self, x, y, width, height):
list.__init__(self, (x, y, width, height))
'children' and position in the accessible-object hierarchy,
whether or not they actually have children.
"""
+
+ def __len__(self):
+ return self.getChildCount()
+
+ def __getitem__(self, index):
+ return self.getChildAtIndex(index)
def getApplication(self):
"""
@return : a Role indicating the type of UI role played by this
object.
"""
- return self.cached_data.role
+ return Role(self.cached_data.role)
def getRoleName(self):
"""
class Enum(int):
def __str__(self):
- return self._enum_lookup(int(self))
+ return self._enum_lookup[int(self)]
#------------------------------------------------------------------------------
root = property(fget=_get_root)
+
#END---------------------------------------------------------------------------
import interfaces
from base import BaseProxy, Enum
from factory import create_accessible, add_accessible_class
+from accessible import BoundingBox
from dbus.types import Int16
visual representation.
"""
func = self.get_dbus_method("getExtents")
- return func(Int16(coord_type))
+ extents = func(Int16(coord_type))
+ return BoundingBox(*extents)
def getLayer(self, *args, **kwargs):
"""
@return the ComponentLayer in which this object resides.
"""
func = self.get_dbus_method("getLayer")
- return func(*args, **kwargs)
+ return ComponentLayer(func(*args, **kwargs))
def getMDIZOrder(self):
"""
#Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import interfaces
-from factory import create_accessible
+from base import BaseProxyMeta
+from accessible import BoundingBox
+from cache import AccessibleCache
+from state import StateSet
+from role import ROLE_UNKNOWN
+from component import LAYER_WIDGET
__all__ = [
"Desktop",
#------------------------------------------------------------------------------
-class Accessible(BaseProxy):
+class ApplicationCache(object):
+
+ def __init__(self, connection, bus_name):
+ self._connection = connection
+ self._bus_name = bus_name
+ self._accessible_cache = AccessibleCache(connection, bus_name)
+
+ def __getitem__(self, key):
+ return self._accessible_cache
+
+ def __contains__(self, key):
+ if key == self._bus_name:
+ return True
+ else:
+ return False
+
+ def get_application_at_index(self, index):
+ pass
+
+ def get_application_count(self):
+ return 1
+
+#------------------------------------------------------------------------------
+
+class DesktopComponent(object):
+ """
+ The Component interface is implemented by objects which occupy
+ on-screen space, e.g. objects which have onscreen visual representations.
+ The methods in Component allow clients to identify where the
+ objects lie in the onscreen coordinate system, their relative
+ size, stacking order, and position. It also provides a mechanism
+ whereby keyboard focus may be transferred to specific user interface
+ elements programmatically. This is a 2D API, coordinates of 3D
+ objects are projected into the 2-dimensional screen view for
+ purposes of this interface.
+ """
+
+ def contains(self, *args, **kwargs):
+ """
+ @return True if the specified point lies within the Component's
+ bounding box, False otherwise.
+ """
+ return False
+
+ def deregisterFocusHandler(self, *args, **kwargs):
+ """
+ Request that an EventListener registered via registerFocusHandler
+ no longer be notified when this object receives keyboard focus.
+ """
+ pass
+
+ def getAccessibleAtPoint(self, *args, **kwargs):
+ """
+ @return the Accessible child whose bounding box contains the
+ specified point.
+ """
+ return None
+
+ def getAlpha(self, *args, **kwargs):
+ """
+ Obtain the alpha value of the component. An alpha value of 1.0
+ or greater indicates that the object is fully opaque, and an
+ alpha value of 0.0 indicates that the object is fully transparent.
+ Negative alpha values have no defined meaning at this time.
+ """
+ return 1.0
+
+ def getExtents(self, coord_type):
+ """
+ Obtain the Component's bounding box, in pixels, relative to the
+ specified coordinate system.
+ @param coord_type
+ @return a BoundingBox which entirely contains the object's onscreen
+ visual representation.
+ """
+ #TODO This needs to return the window size
+ return BoundingBox(*(0,0,1024,768))
+
+ def getLayer(self, *args, **kwargs):
+ """
+ @return the ComponentLayer in which this object resides.
+ """
+ return LAYER_WIDGET
+
+ def getMDIZOrder(self):
+ """
+ Obtain the relative stacking order (i.e. 'Z' order) of an object.
+ Larger values indicate that an object is on "top" of the stack,
+ therefore objects with smaller MDIZOrder may be obscured by objects
+ with a larger MDIZOrder, but not vice-versa.
+ @return an integer indicating the object's place in the stacking
+ order.
+ """
+ return 0
+
+ def getPosition(self, coord_type):
+ """
+ Obtain the position of the current component in the coordinate
+ system specified by coord_type.
+ @param : coord_type
+ @param : x
+ an out parameter which will be back-filled with the returned
+ x coordinate.
+ @param : y
+ an out parameter which will be back-filled with the returned
+ y coordinate.
+ """
+ return (0,0)
+
+ def getSize(self, *args, **kwargs):
+ """
+ Obtain the size, in the coordinate system specified by coord_type,
+ of the rectangular area which fully contains the object's visual
+ representation, without accounting for viewport clipping.
+ @param : width
+ the object's horizontal extents in the specified coordinate system.
+ @param : height
+ the object's vertical extents in the specified coordinate system.
+ """
+ #TODO Need to return window size
+ return (1024, 768)
+
+ def grabFocus(self, *args, **kwargs):
+ """
+ Request that the object obtain keyboard focus.
+ @return True if keyboard focus was successfully transferred to
+ the Component.
+ """
+ return False
+
+ def registerFocusHandler(self, *args, **kwargs):
+ """
+ Register an EventListener for notification when this object receives
+ keyboard focus.
+ """
+ pass
+
+#------------------------------------------------------------------------------
+
+class Desktop(object):
"""
The base interface which is implemented by all accessible objects.
All objects support interfaces for querying their contained
'children' and position in the accessible-object hierarchy,
whether or not they actually have children.
"""
-
+
+ __metaclass__ = BaseProxyMeta
+
+ def __init__(self, cache):
+ """
+ Creates a desktop object. There should be one single desktop
+ object for the Registry object.
+
+ @param cache - The application cache.
+ @kwarf application - The application D-Bus name
+
+ If the application name is provided the Desktop is being used for
+ test and will only report the application provided as its single child.
+ """
+ self._cache = cache
+ self._app_name = '/'
+
+ def __len__(self):
+ return self.getChildCount()
+
+ def __getitem__(self, index):
+ return self.getChildAtIndex(index)
+
def getApplication(self):
"""
Get the containing Application for this object.
an in parameter indicating which child is requested (zero-indexed).
@return : the 'nth' Accessible child of this object.
"""
- path = self.cached_data.children[index]
- return create_accessible(self._cache,
- self._app_name,
- path,
- interfaces.ATSPI_ACCESSIBLE,
- connection=self._cache._connection)
+ return self._cache.get_application_at_index(index)
def getIndexInParent(self):
"""
@return : a Role indicating the type of UI role played by this
object.
"""
- return self.cached_data.role
+ return ROLE_UNKNOWN
def getRoleName(self):
"""
@return : a StateSet encapsulating the currently true states
of the object.
"""
- return []
+ return StateSet()
def isEqual(self, accessible):
"""
@return : a boolean indicating whether the two object references
point to the same object.
"""
- return (self._app_name == accessible._app_name) and \
- (self._acc_path == accessible._acc_path)
+ return self == accessible
def get_childCount(self):
- return len(self.cached_data.children)
+ return self._cache.get_application_count()
_childCountDoc = \
"""
childCount: the number of children contained by this object.
"""
parent = property(fget=get_parent, doc=_parentDoc)
+ def queryInterface(self, interface):
+ """
+ Gets a different accessible interface for this object
+ or raises a NotImplemented error if the given interface
+ is not supported.
+ """
+ if interface == interfaces.ATSPI_ACCESSIBLE:
+ return self
+ elif interface == interfaces.ATSPI_COMPONENT:
+ return DesktopComponent()
+ else:
+ raise NotImplementedError(
+ "%s not supported by accessible object at path %s"
+ % (interface, self.path))
+
#END----------------------------------------------------------------------------
#authors: Peter Parente, Mark Doffman
+#import signal
+#import time
+#import weakref
+#import Queue
+#import traceback
+#import gobject
+#import utils
+#import constants
+#import event
+
+ATSPI_DEVICE_EVENT_CONTROLLER = 'org.freedesktop.atspi.DeviceEventController'
+ATSPI_DEVICE_EVENT_LISTENER = 'org.freedesktop.atspi.DeviceEventListener'
+
import constants
+class _Observer(object):
+ """
+ Parent class for all event observers. Dispatches all received events to the
+ L{Registry} that created this L{_Observer}. Provides basic reference counting
+ functionality needed by L{Registry} to determine when an L{_Observer} can be
+ released for garbage collection.
+
+ The reference counting provided by this class is independent of the reference
+ counting used by CORBA. Keeping the counts separate makes it easier for the
+ L{Registry} to detect when an L{_Observer} can be freed in the
+ L{Registry._unregisterObserver} method.
+
+ @ivar registry: Reference to the L{Registry} that created this L{_Observer}
+ @type registry: weakref.proxy to L{Registry}
+ @ivar ref_count: Reference count on this L{_Observer}
+ @type ref_count: integer
+ """
+ def __init__(self, registry):
+ """
+ Stores a reference to the creating L{Registry}. Intializes the reference
+ count on this object to zero.
+
+ @param registry: The L{Registry} that created this observer
+ @type registry: weakref.proxy to L{Registry}
+ """
+ self.registry = weakref.proxy(registry)
+ self.ref_count = 0
+
+ def clientRef(self):
+ """
+ Increments the Python reference count on this L{_Observer} by one. This
+ method is called when a new client is registered in L{Registry} to receive
+ notification of an event type monitored by this L{_Observer}.
+ """
+ self.ref_count += 1
+
+ def clientUnref(self):
+ """
+ Decrements the pyatspi reference count on this L{_Observer} by one. This
+ method is called when a client is unregistered in L{Registry} to stop
+ receiving notifications of an event type monitored by this L{_Observer}.
+ """
+ self.ref_count -= 1
+
+ def getClientRefCount(self):
+ """
+ @return: Current Python reference count on this L{_Observer}
+ @rtype: integer
+ """
+ return self.ref_count
+
+ def ref(self):
+ """Required by CORBA. Does nothing."""
+ pass
+
+ def unref(self):
+ """Required by CORBA. Does nothing."""
+ pass
+
+class _DeviceObserver(_Observer, Accessibility__POA.DeviceEventListener):
+ """
+ Observes keyboard press and release events.
+
+ @ivar registry: The L{Registry} that created this observer
+ @type registry: L{Registry}
+ @ivar key_set: Set of keys to monitor
+ @type key_set: list of integer
+ @ivar mask: Watch for key events while these modifiers are held
+ @type mask: integer
+ @ivar kind: Kind of events to monitor
+ @type kind: integer
+ @ivar mode: Keyboard event mode
+ @type mode: Accessibility.EventListenerMode
+ """
+ def __init__(self, registry, synchronous, preemptive, global_):
+ """
+ Creates a mode object that defines when key events will be received from
+ the system. Stores all other information for later registration.
+
+ @param registry: The L{Registry} that created this observer
+ @type registry: L{Registry}
+ @param synchronous: Handle the key event synchronously?
+ @type synchronous: boolean
+ @param preemptive: Allow event to be consumed?
+ @type preemptive: boolean
+ @param global_: Watch for events on inaccessible applications too?
+ @type global_: boolean
+ """
+ _Observer.__init__(self, registry)
+ self.mode = Accessibility.EventListenerMode()
+ self.mode.preemptive = preemptive
+ self.mode.synchronous = synchronous
+ self.mode._global = global_
+
+ def register(self, dc, key_set, mask, kind):
+ """
+ Starts keyboard event monitoring.
+
+ @param dc: Reference to a device controller
+ @type dc: Accessibility.DeviceEventController
+ @param key_set: Set of keys to monitor
+ @type key_set: list of integer
+ @param mask: Integer modifier mask or an iterable over multiple masks to
+ unapply all at once
+ @type mask: integer, iterable, or None
+ @param kind: Kind of events to monitor
+ @type kind: integer
+ """
+ try:
+ # check if the mask is iterable
+ iter(mask)
+ except TypeError:
+ # register a single integer if not
+ dc.registerKeystrokeListener(self._this(), key_set, mask, kind,
+ self.mode)
+ else:
+ for m in mask:
+ dc.registerKeystrokeListener(self._this(), key_set, m, kind, self.mode)
+
+ def unregister(self, dc, key_set, mask, kind):
+ """
+ Stops keyboard event monitoring.
+
+ @param dc: Reference to a device controller
+ @type dc: Accessibility.DeviceEventController
+ @param key_set: Set of keys to monitor
+ @type key_set: list of integer
+ @param mask: Integer modifier mask or an iterable over multiple masks to
+ unapply all at once
+ @type mask: integer, iterable, or None
+ @param kind: Kind of events to monitor
+ @type kind: integer
+ """
+ try:
+ # check if the mask is iterable
+ iter(mask)
+ except TypeError:
+ # unregister a single integer if not
+ dc.deregisterKeystrokeListener(self._this(), key_set, mask, kind)
+ else:
+ for m in mask:
+ dc.deregisterKeystrokeListener(self._this(), key_set, m, kind)
+
+ def queryInterface(self, repo_id):
+ """
+ Reports that this class only implements the AT-SPI DeviceEventListener
+ interface. Required by AT-SPI.
+
+ @param repo_id: Request for an interface
+ @type repo_id: string
+ @return: The underlying CORBA object for the device event listener
+ @rtype: Accessibility.EventListener
+ """
+ if repo_id == utils.getInterfaceIID(Accessibility.DeviceEventListener):
+ return self._this()
+ else:
+ return None
+
+ def notifyEvent(self, ev):
+ """
+ Notifies the L{Registry} that an event has occurred. Wraps the raw event
+ object in our L{Event} class to support automatic ref and unref calls. An
+ observer can return True to indicate this event should not be allowed to pass
+ to other AT-SPI observers or the underlying application.
+
+ @param ev: Keyboard event
+ @type ev: Accessibility.DeviceEvent
+ @return: Should the event be consumed (True) or allowed to pass on to other
+ AT-SPI observers (False)?
+ @rtype: boolean
+ """
+ # wrap the device event
+ ev = event.DeviceEvent(ev)
+ return self.registry.handleDeviceEvent(ev, self)
+
+class _EventObserver(_Observer, Accessibility__POA.EventListener):
+ """
+ Observes all non-keyboard AT-SPI events. Can be reused across event types.
+ """
+ def register(self, reg, name):
+ """
+ Starts monitoring for the given event.
+
+ @param name: Name of the event to start monitoring
+ @type name: string
+ @param reg: Reference to the raw registry object
+ @type reg: Accessibility.Registry
+ """
+ reg.registerGlobalEventListener(self._this(), name)
+
+ def unregister(self, reg, name):
+ """
+ Stops monitoring for the given event.
+
+ @param name: Name of the event to stop monitoring
+ @type name: string
+ @param reg: Reference to the raw registry object
+ @type reg: Accessibility.Registry
+ """
+ reg.deregisterGlobalEventListener(self._this(), name)
+
+ def queryInterface(self, repo_id):
+ """
+ Reports that this class only implements the AT-SPI DeviceEventListener
+ interface. Required by AT-SPI.
+
+ @param repo_id: Request for an interface
+ @type repo_id: string
+ @return: The underlying CORBA object for the device event listener
+ @rtype: Accessibility.EventListener
+ """
+ if repo_id == utils.getInterfaceIID(Accessibility.EventListener):
+ return self._this()
+ else:
+ return None
+
+ def notifyEvent(self, ev):
+ """
+ Notifies the L{Registry} that an event has occurred. Wraps the raw event
+ object in our L{Event} class to support automatic ref and unref calls.
+ Aborts on any exception indicating the event could not be wrapped.
+
+ @param ev: AT-SPI event signal (anything but keyboard)
+ @type ev: Accessibility.Event
+ """
+ # wrap raw event so ref counts are correct before queueing
+ ev = event.Event(ev)
+ self.registry.handleEvent(ev)
+
class DeviceEvent(object):
"""
Wraps an AT-SPI device event with a more Pythonic interface. Primarily adds
for i in xrange(len(split)):
# store values of attributes in this object
setattr(self, self.format[i], split[i])
+
+class Registry(object):
+ """
+ Wraps the Accessibility.Registry to provide more Pythonic registration for
+ events.
+
+ This object should be treated as a singleton, but such treatment is not
+ enforced. You can construct another instance of this object and give it a
+ reference to the Accessibility.Registry singleton. Doing so is harmless and
+ has no point.
+
+ @ivar async: Should event dispatch to local listeners be decoupled from event
+ receiving from the registry?
+ @type async: boolean
+ @ivar reg: Reference to the real, wrapped registry object
+ @type reg: Accessibility.Registry
+ @ivar dev: Reference to the device controller
+ @type dev: Accessibility.DeviceEventController
+ @ivar queue: Queue of events awaiting local dispatch
+ @type queue: Queue.Queue
+ @ivar clients: Map of event names to client listeners
+ @type clients: dictionary
+ @ivar observers: Map of event names to AT-SPI L{_Observer} objects
+ @type observers: dictionary
+ """
+
+ _REGISTRY_NAME = 'org.freedesktop.atspi.Registry'
+
+ def __init__(self, app_name=None):
+ """
+ Stores a reference to the AT-SPI registry. Gets and stores a reference
+ to the DeviceEventController.
+
+ @param reg: Reference to the AT-SPI registry daemon
+ @type reg: Accessibility.Registry
+ """
+ self._bus = dbus.SessionBus()
+ if app_path:
+ self._app_name = app_name
+ self._cache = TestApplicationCache(self._bus, app_name)
+
+ #self.async = None
+ #self.reg = reg
+ #self.dev = self.reg.getDeviceEventController()
+ #self.queue = Queue.Queue()
+ #self.clients = {}
+ #self.observers = {}
+
+ def __call__(self):
+ """
+ @return: This instance of the registry
+ @rtype: L{Registry}
+ """
+ return self
+
+ def start(self, async=False, gil=True):
+ """
+ Enter the main loop to start receiving and dispatching events.
+
+ @param async: Should event dispatch be asynchronous (decoupled) from
+ event receiving from the AT-SPI registry?
+ @type async: boolean
+ @param gil: Add an idle callback which releases the Python GIL for a few
+ milliseconds to allow other threads to run? Necessary if other threads
+ will be used in this process.
+ Note - No Longer used.
+ @type gil: boolean
+ """
+ self._loop = gobject.MainLoop()
+ self._loop.run()
+
+ def stop(self, *args):
+ """Quits the main loop."""
+ self._loop.quit()
+ self.flushEvents()
+
+ def getDesktopCount(self):
+ """
+ Gets the number of available desktops.
+
+ @return: Number of desktops
+ @rtype: integer
+ """
+ return 1
+
+ def getDesktop(self, i):
+ """
+ Gets a reference to the i-th desktop.
+
+ @param i: Which desktop to get
+ @type i: integer
+ @return: Desktop reference
+ @rtype: Accessibility.Desktop
+ """
+ return Desktop(self._cache)
+
+ def registerEventListener(self, client, *names):
+ """
+ Registers a new client callback for the given event names. Supports
+ registration for all subevents if only partial event name is specified.
+ Do not include a trailing colon.
+
+ For example, 'object' will register for all object events,
+ 'object:property-change' will register for all property change events,
+ and 'object:property-change:accessible-parent' will register only for the
+ parent property change event.
+
+ Registered clients will not be automatically removed when the client dies.
+ To ensure the client is properly garbage collected, call
+ L{deregisterEventListener}.
+
+ @param client: Callable to be invoked when the event occurs
+ @type client: callable
+ @param names: List of full or partial event names
+ @type names: list of string
+ """
+ for name in names:
+ # store the callback for each specific event name
+ self._registerClients(client, name)
+
+ def deregisterEventListener(self, client, *names):
+ """
+ Unregisters an existing client callback for the given event names. Supports
+ unregistration for all subevents if only partial event name is specified.
+ Do not include a trailing colon.
+
+ This method must be called to ensure a client registered by
+ L{registerEventListener} is properly garbage collected.
+
+ @param client: Client callback to remove
+ @type client: callable
+ @param names: List of full or partial event names
+ @type names: list of string
+ @return: Were event names specified for which the given client was not
+ registered?
+ @rtype: boolean
+ """
+ missed = False
+ for name in names:
+ # remove the callback for each specific event name
+ missed |= self._unregisterClients(client, name)
+ return missed
+
+ def registerKeystrokeListener(self, client, key_set=[], mask=0,
+ kind=(constants.KEY_PRESSED_EVENT,
+ constants.KEY_RELEASED_EVENT),
+ synchronous=True, preemptive=True,
+ global_=False):
+ """
+ Registers a listener for key stroke events.
+
+ @param client: Callable to be invoked when the event occurs
+ @type client: callable
+ @param key_set: Set of hardware key codes to stop monitoring. Leave empty
+ to indicate all keys.
+ @type key_set: list of integer
+ @param mask: When the mask is None, the codes in the key_set will be
+ monitored only when no modifier is held. When the mask is an
+ integer, keys in the key_set will be monitored only when the modifiers in
+ the mask are held. When the mask is an iterable over more than one
+ integer, keys in the key_set will be monitored when any of the modifier
+ combinations in the set are held.
+ @type mask: integer, iterable, None
+ @param kind: Kind of events to watch, KEY_PRESSED_EVENT or
+ KEY_RELEASED_EVENT.
+ @type kind: list
+ @param synchronous: Should the callback notification be synchronous, giving
+ the client the chance to consume the event?
+ @type synchronous: boolean
+ @param preemptive: Should the callback be allowed to preempt / consume the
+ event?
+ @type preemptive: boolean
+ @param global_: Should callback occur even if an application not supporting
+ AT-SPI is in the foreground? (requires xevie)
+ @type global_: boolean
+ """
+ try:
+ # see if we already have an observer for this client
+ ob = self.clients[client]
+ except KeyError:
+ # create a new device observer for this client
+ ob = _DeviceObserver(self, synchronous, preemptive, global_)
+ # store the observer to client mapping, and the inverse
+ self.clients[ob] = client
+ self.clients[client] = ob
+ if mask is None:
+ # None means all modifier combinations
+ mask = utils.allModifiers()
+ # register for new keystrokes on the observer
+ ob.register(self.dev, key_set, mask, kind)
+
+ def deregisterKeystrokeListener(self, client, key_set=[], mask=0,
+ kind=(constants.KEY_PRESSED_EVENT,
+ constants.KEY_RELEASED_EVENT)):
+ """
+ Deregisters a listener for key stroke events.
+
+ @param client: Callable to be invoked when the event occurs
+ @type client: callable
+ @param key_set: Set of hardware key codes to stop monitoring. Leave empty
+ to indicate all keys.
+ @type key_set: list of integer
+ @param mask: When the mask is None, the codes in the key_set will be
+ monitored only when no modifier is held. When the mask is an
+ integer, keys in the key_set will be monitored only when the modifiers in
+ the mask are held. When the mask is an iterable over more than one
+ integer, keys in the key_set will be monitored when any of the modifier
+ combinations in the set are held.
+ @type mask: integer, iterable, None
+ @param kind: Kind of events to stop watching, KEY_PRESSED_EVENT or
+ KEY_RELEASED_EVENT.
+ @type kind: list
+ @raise KeyError: When the client isn't already registered for events
+ """
+ # see if we already have an observer for this client
+ ob = self.clients[client]
+ if mask is None:
+ # None means all modifier combinations
+ mask = utils.allModifiers()
+ # register for new keystrokes on the observer
+ ob.unregister(self.dev, key_set, mask, kind)
+
+ def generateKeyboardEvent(self, keycode, keysym, kind):
+ """
+ Generates a keyboard event. One of the keycode or the keysym parameters
+ should be specified and the other should be None. The kind parameter is
+ required and should be one of the KEY_PRESS, KEY_RELEASE, KEY_PRESSRELEASE,
+ KEY_SYM, or KEY_STRING.
+
+ @param keycode: Hardware keycode or None
+ @type keycode: integer
+ @param keysym: Symbolic key string or None
+ @type keysym: string
+ @param kind: Kind of event to synthesize
+ @type kind: integer
+ """
+ if keysym is None:
+ self.dev.generateKeyboardEvent(keycode, '', kind)
+ else:
+ self.dev.generateKeyboardEvent(None, keysym, kind)
+
+ def generateMouseEvent(self, x, y, name):
+ """
+ Generates a mouse event at the given absolute x and y coordinate. The kind
+ of event generated is specified by the name. For example, MOUSE_B1P
+ (button 1 press), MOUSE_REL (relative motion), MOUSE_B3D (butten 3
+ double-click).
+
+ @param x: Horizontal coordinate, usually left-hand oriented
+ @type x: integer
+ @param y: Vertical coordinate, usually left-hand oriented
+ @type y: integer
+ @param name: Name of the event to generate
+ @type name: string
+ """
+ self.dev.generateMouseEvent(x, y, name)
+
+ def handleDeviceEvent(self, event, ob):
+ """
+ Dispatches L{event.DeviceEvent}s to registered clients. Clients are called
+ in the order they were registered for the given AT-SPI event. If any
+ client returns True, callbacks cease for the event for clients of this registry
+ instance. Clients of other registry instances and clients in other processes may
+ be affected depending on the values of synchronous and preemptive used when invoking
+ L{registerKeystrokeListener}.
+
+ @note: Asynchronous dispatch of device events is not supported.
+
+ @param event: AT-SPI device event
+ @type event: L{event.DeviceEvent}
+ @param ob: Observer that received the event
+ @type ob: L{_DeviceObserver}
+
+ @return: Should the event be consumed (True) or allowed to pass on to other
+ AT-SPI observers (False)?
+ @rtype: boolean
+ """
+ try:
+ # try to get the client registered for this event type
+ client = self.clients[ob]
+ except KeyError:
+ # client may have unregistered recently, ignore event
+ return False
+ # make the call to the client
+ try:
+ return client(event) or event.consume
+ except Exception:
+ # print the exception, but don't let it stop notification
+ traceback.print_exc()
+
+ def handleEvent(self, event):
+ """
+ Handles an AT-SPI event by either queuing it for later dispatch when the
+ L{Registry.async} flag is set, or dispatching it immediately.
+
+ @param event: AT-SPI event
+ @type event: L{event.Event}
+ """
+ if self.async:
+ # queue for now
+ self.queue.put_nowait(event)
+ else:
+ # dispatch immediately
+ self._dispatchEvent(event)
+
+ def _dispatchEvent(self, event):
+ """
+ Dispatches L{event.Event}s to registered clients. Clients are called in
+ the order they were registered for the given AT-SPI event. If any client
+ returns True, callbacks cease for the event for clients of this registry
+ instance. Clients of other registry instances and clients in other processes
+ are unaffected.
+
+ @param event: AT-SPI event
+ @type event: L{event.Event}
+ """
+ et = event.type
+ try:
+ # try to get the client registered for this event type
+ clients = self.clients[et.name]
+ except KeyError:
+ try:
+ # we may not have registered for the complete subtree of events
+ # if our tree does not list all of a certain type (e.g.
+ # object:state-changed:*); try again with klass and major only
+ if et.detail is not None:
+ # Strip the 'detail' field.
+ clients = self.clients['%s:%s:%s' % (et.klass, et.major, et.minor)]
+ elif et.minor is not None:
+ # The event could possibly be object:state-changed:*.
+ clients = self.clients['%s:%s' % (et.klass, et.major)]
+ except KeyError:
+ # client may have unregistered recently, ignore event
+ return
+ # make the call to each client
+ consume = False
+ for client in clients:
+ try:
+ consume = client(event) or False
+ except Exception:
+ # print the exception, but don't let it stop notification
+ traceback.print_exc()
+ if consume or event.consume:
+ # don't allow further processing if a client returns True
+ break
+
+ def flushEvents(self):
+ """
+ Flushes the event queue by destroying it and recreating it.
+ """
+ self.queue = Queue.Queue()
+
+ def pumpQueuedEvents(self, num=-1):
+ """
+ Provides asynch processing of events in the queue by executeing them with
+ _dispatchEvent() (as is done immediately when synch processing).
+ This method would normally be called from a main loop or idle function.
+
+ @param num: Number of events to pump. If number is negative it pumps
+ the entire queue. Default is -1.
+ @type num: integer
+ @return: True if queue is not empty after events were pumped.
+ @rtype: boolean
+ """
+ if num < 0:
+ # Dequeue as many events as currently in the queue.
+ num = self.queue.qsize()
+ for i in xrange(num):
+ try:
+ # get next waiting event
+ event = self.queue.get_nowait()
+ except Queue.Empty:
+ break
+ self._dispatchEvent(event)
+
+ return not self.queue.empty()
+
+ def _registerClients(self, client, name):
+ """
+ Internal method that recursively associates a client with AT-SPI event
+ names. Allows a client to incompletely specify an event name in order to
+ register for subevents without specifying their full names manually.
+
+ @param client: Client callback to receive event notifications
+ @type client: callable
+ @param name: Partial or full event name
+ @type name: string
+ """
+ try:
+ # look for an event name in our event tree dictionary
+ events = constants.EVENT_TREE[name]
+ except KeyError:
+ # if the event name doesn't exist, it's a leaf event meaning there are
+ # no subtypes for that event
+ # add this client to the list of clients already in the dictionary
+ # using the event name as the key; if there are no clients yet for this
+ # event, insert an empty list into the dictionary before appending
+ # the client
+ et = event.EventType(name)
+ clients = self.clients.setdefault(et.name, [])
+ try:
+ # if this succeeds, this client is already registered for the given
+ # event type, so ignore the request
+ clients.index(client)
+ except ValueError:
+ # else register the client
+ clients.append(client)
+ self._registerObserver(name)
+ else:
+ # if the event name does exist in the tree, there are subevents for
+ # this event; loop through them calling this method again to get to
+ # the leaf events
+ for e in events:
+ self._registerClients(client, e)
+
+ def _unregisterClients(self, client, name):
+ """
+ Internal method that recursively unassociates a client with AT-SPI event
+ names. Allows a client to incompletely specify an event name in order to
+ unregister for subevents without specifying their full names manually.
+
+ @param client: Client callback to receive event notifications
+ @type client: callable
+ @param name: Partial or full event name
+ @type name: string
+ """
+ missed = False
+ try:
+ # look for an event name in our event tree dictionary
+ events = constants.EVENT_TREE[name]
+ except KeyError:
+ try:
+ # if the event name doesn't exist, it's a leaf event meaning there are
+ # no subtypes for that event
+ # get the list of registered clients and try to remove the one provided
+ et = event.EventType(name)
+ clients = self.clients[et.name]
+ clients.remove(client)
+ self._unregisterObserver(name)
+ except (ValueError, KeyError):
+ # ignore any exceptions indicating the client is not registered
+ missed = True
+ return missed
+ # if the event name does exist in the tree, there are subevents for this
+ # event; loop through them calling this method again to get to the leaf
+ # events
+ for e in events:
+ missed |= self._unregisterClients(client, e)
+ return missed
+
+ def _registerObserver(self, name):
+ """
+ Creates a new L{_Observer} to watch for events of the given type or
+ returns the existing observer if one is already registered. One
+ L{_Observer} is created for each leaf in the L{constants.EVENT_TREE} or
+ any event name not found in the tree.
+
+ @param name: Raw name of the event to observe
+ @type name: string
+ @return: L{_Observer} object that is monitoring the event
+ @rtype: L{_Observer}
+ """
+ et = event.EventType(name)
+ try:
+ # see if an observer already exists for this event
+ ob = self.observers[et.name]
+ except KeyError:
+ # build a new observer if one does not exist
+ ob = _EventObserver(self)
+ # we have to register for the raw name because it may be different from
+ # the parsed name determined by EventType (e.g. trailing ':' might be
+ # missing)
+ ob.register(self.reg, name)
+ self.observers[et.name] = ob
+ # increase our client ref count so we know someone new is watching for the
+ # event
+ ob.clientRef()
+ return ob
+
+ def _unregisterObserver(self, name):
+ """
+ Destroys an existing L{_Observer} for the given event type only if no
+ clients are registered for the events it is monitoring.
+
+ @param name: Name of the event to observe
+ @type name: string
+ @raise KeyError: When an observer for the given event is not regist
+ """
+ et = event.EventType(name)
+ # see if an observer already exists for this event
+ ob = self.observers[et.name]
+ ob.clientUnref()
+ if ob.getClientRefCount() == 0:
+ ob.unregister(self.reg, name)
+ del self.observers[et.name]
#authors: Peter Parente, Mark Doffman
-import signal
-import time
-import weakref
-import Queue
-import traceback
-import gobject
-import utils
-import constants
-import event
+import dbus
-ATSPI_DEVICE_EVENT_CONTROLLER = 'org.freedesktop.atspi.DeviceEventController'
-ATSPI_DEVICE_EVENT_LISTENER = 'org.freedesktop.atspi.DeviceEventListener'
+from dbus.mainloop.glib import DBusGMainLoop
+DBusGMainLoop(set_as_default=True)
-class _Observer(object):
- """
- Parent class for all event observers. Dispatches all received events to the
- L{Registry} that created this L{_Observer}. Provides basic reference counting
- functionality needed by L{Registry} to determine when an L{_Observer} can be
- released for garbage collection.
-
- The reference counting provided by this class is independent of the reference
- counting used by CORBA. Keeping the counts separate makes it easier for the
- L{Registry} to detect when an L{_Observer} can be freed in the
- L{Registry._unregisterObserver} method.
-
- @ivar registry: Reference to the L{Registry} that created this L{_Observer}
- @type registry: weakref.proxy to L{Registry}
- @ivar ref_count: Reference count on this L{_Observer}
- @type ref_count: integer
- """
- def __init__(self, registry):
- """
- Stores a reference to the creating L{Registry}. Intializes the reference
- count on this object to zero.
-
- @param registry: The L{Registry} that created this observer
- @type registry: weakref.proxy to L{Registry}
- """
- self.registry = weakref.proxy(registry)
- self.ref_count = 0
-
- def clientRef(self):
- """
- Increments the Python reference count on this L{_Observer} by one. This
- method is called when a new client is registered in L{Registry} to receive
- notification of an event type monitored by this L{_Observer}.
- """
- self.ref_count += 1
-
- def clientUnref(self):
- """
- Decrements the pyatspi reference count on this L{_Observer} by one. This
- method is called when a client is unregistered in L{Registry} to stop
- receiving notifications of an event type monitored by this L{_Observer}.
- """
- self.ref_count -= 1
-
- def getClientRefCount(self):
- """
- @return: Current Python reference count on this L{_Observer}
- @rtype: integer
- """
- return self.ref_count
-
- def ref(self):
- """Required by CORBA. Does nothing."""
- pass
-
- def unref(self):
- """Required by CORBA. Does nothing."""
- pass
-
-class _DeviceObserver(_Observer, Accessibility__POA.DeviceEventListener):
- """
- Observes keyboard press and release events.
-
- @ivar registry: The L{Registry} that created this observer
- @type registry: L{Registry}
- @ivar key_set: Set of keys to monitor
- @type key_set: list of integer
- @ivar mask: Watch for key events while these modifiers are held
- @type mask: integer
- @ivar kind: Kind of events to monitor
- @type kind: integer
- @ivar mode: Keyboard event mode
- @type mode: Accessibility.EventListenerMode
- """
- def __init__(self, registry, synchronous, preemptive, global_):
- """
- Creates a mode object that defines when key events will be received from
- the system. Stores all other information for later registration.
-
- @param registry: The L{Registry} that created this observer
- @type registry: L{Registry}
- @param synchronous: Handle the key event synchronously?
- @type synchronous: boolean
- @param preemptive: Allow event to be consumed?
- @type preemptive: boolean
- @param global_: Watch for events on inaccessible applications too?
- @type global_: boolean
- """
- _Observer.__init__(self, registry)
- self.mode = Accessibility.EventListenerMode()
- self.mode.preemptive = preemptive
- self.mode.synchronous = synchronous
- self.mode._global = global_
-
- def register(self, dc, key_set, mask, kind):
- """
- Starts keyboard event monitoring.
-
- @param dc: Reference to a device controller
- @type dc: Accessibility.DeviceEventController
- @param key_set: Set of keys to monitor
- @type key_set: list of integer
- @param mask: Integer modifier mask or an iterable over multiple masks to
- unapply all at once
- @type mask: integer, iterable, or None
- @param kind: Kind of events to monitor
- @type kind: integer
- """
- try:
- # check if the mask is iterable
- iter(mask)
- except TypeError:
- # register a single integer if not
- dc.registerKeystrokeListener(self._this(), key_set, mask, kind,
- self.mode)
- else:
- for m in mask:
- dc.registerKeystrokeListener(self._this(), key_set, m, kind, self.mode)
-
- def unregister(self, dc, key_set, mask, kind):
- """
- Stops keyboard event monitoring.
-
- @param dc: Reference to a device controller
- @type dc: Accessibility.DeviceEventController
- @param key_set: Set of keys to monitor
- @type key_set: list of integer
- @param mask: Integer modifier mask or an iterable over multiple masks to
- unapply all at once
- @type mask: integer, iterable, or None
- @param kind: Kind of events to monitor
- @type kind: integer
- """
- try:
- # check if the mask is iterable
- iter(mask)
- except TypeError:
- # unregister a single integer if not
- dc.deregisterKeystrokeListener(self._this(), key_set, mask, kind)
- else:
- for m in mask:
- dc.deregisterKeystrokeListener(self._this(), key_set, m, kind)
-
- def queryInterface(self, repo_id):
- """
- Reports that this class only implements the AT-SPI DeviceEventListener
- interface. Required by AT-SPI.
-
- @param repo_id: Request for an interface
- @type repo_id: string
- @return: The underlying CORBA object for the device event listener
- @rtype: Accessibility.EventListener
- """
- if repo_id == utils.getInterfaceIID(Accessibility.DeviceEventListener):
- return self._this()
- else:
- return None
-
- def notifyEvent(self, ev):
- """
- Notifies the L{Registry} that an event has occurred. Wraps the raw event
- object in our L{Event} class to support automatic ref and unref calls. An
- observer can return True to indicate this event should not be allowed to pass
- to other AT-SPI observers or the underlying application.
-
- @param ev: Keyboard event
- @type ev: Accessibility.DeviceEvent
- @return: Should the event be consumed (True) or allowed to pass on to other
- AT-SPI observers (False)?
- @rtype: boolean
- """
- # wrap the device event
- ev = event.DeviceEvent(ev)
- return self.registry.handleDeviceEvent(ev, self)
-
-class _EventObserver(_Observer, Accessibility__POA.EventListener):
- """
- Observes all non-keyboard AT-SPI events. Can be reused across event types.
- """
- def register(self, reg, name):
- """
- Starts monitoring for the given event.
-
- @param name: Name of the event to start monitoring
- @type name: string
- @param reg: Reference to the raw registry object
- @type reg: Accessibility.Registry
- """
- reg.registerGlobalEventListener(self._this(), name)
-
- def unregister(self, reg, name):
- """
- Stops monitoring for the given event.
-
- @param name: Name of the event to stop monitoring
- @type name: string
- @param reg: Reference to the raw registry object
- @type reg: Accessibility.Registry
- """
- reg.deregisterGlobalEventListener(self._this(), name)
-
- def queryInterface(self, repo_id):
- """
- Reports that this class only implements the AT-SPI DeviceEventListener
- interface. Required by AT-SPI.
-
- @param repo_id: Request for an interface
- @type repo_id: string
- @return: The underlying CORBA object for the device event listener
- @rtype: Accessibility.EventListener
- """
- if repo_id == utils.getInterfaceIID(Accessibility.EventListener):
- return self._this()
- else:
- return None
-
- def notifyEvent(self, ev):
- """
- Notifies the L{Registry} that an event has occurred. Wraps the raw event
- object in our L{Event} class to support automatic ref and unref calls.
- Aborts on any exception indicating the event could not be wrapped.
-
- @param ev: AT-SPI event signal (anything but keyboard)
- @type ev: Accessibility.Event
- """
- # wrap raw event so ref counts are correct before queueing
- ev = event.Event(ev)
- self.registry.handleEvent(ev)
+from test import TestApplicationCache
+from desktop import Desktop
class Registry(object):
"""
@ivar observers: Map of event names to AT-SPI L{_Observer} objects
@type observers: dictionary
"""
- def __init__(self, reg):
+
+ _REGISTRY_NAME = 'org.freedesktop.atspi.Registry'
+
+ def __init__(self, app_name=None):
"""
Stores a reference to the AT-SPI registry. Gets and stores a reference
to the DeviceEventController.
@param reg: Reference to the AT-SPI registry daemon
@type reg: Accessibility.Registry
"""
- self.async = None
- self.reg = reg
- self.dev = self.reg.getDeviceEventController()
- self.queue = Queue.Queue()
- self.clients = {}
- self.observers = {}
+ self._bus = dbus.SessionBus()
+ if app_name:
+ self._app_name = app_name
+ self._cache = TestApplicationCache(self._bus, app_name)
def __call__(self):
"""
@param gil: Add an idle callback which releases the Python GIL for a few
milliseconds to allow other threads to run? Necessary if other threads
will be used in this process.
+ Note - No Longer used.
@type gil: boolean
"""
- self.async = async
-
- if gil:
- def releaseGIL():
- try:
- time.sleep(1e-5)
- except KeyboardInterrupt, e:
- # store the exception for later
- releaseGIL.keyboard_exception = e
- self.stop()
- return True
- # make room for an exception if one occurs during the
- releaseGIL.keyboard_exception = None
- i = gobject.idle_add(releaseGIL)
-
- # enter the main loop
- try:
- bonobo.main()
- finally:
- # clear all observers
- for name, ob in self.observers.items():
- ob.unregister(self.reg, name)
- if gil:
- gobject.source_remove(i)
- if releaseGIL.keyboard_exception is not None:
- # raise an keyboard exception we may have gotten earlier
- raise releaseGIL.keyboard_exception
+ self._loop = gobject.MainLoop()
+ self._loop.run()
def stop(self, *args):
"""Quits the main loop."""
- try:
- bonobo.main_quit()
- except RuntimeError:
- # ignore errors when quitting (probably already quitting)
- pass
+ self._loop.quit()
self.flushEvents()
def getDesktopCount(self):
@return: Number of desktops
@rtype: integer
- @raise LookupError: When the count cannot be retrieved
"""
- try:
- return self.reg.getDesktopCount()
- except Exception:
- raise LookupError
+ return 1
def getDesktop(self, i):
"""
@type i: integer
@return: Desktop reference
@rtype: Accessibility.Desktop
- @raise LookupError: When the i-th desktop cannot be retrieved
- """
- try:
- return self.reg.getDesktop(i)
- except Exception, e:
- raise LookupError(e)
-
- def registerEventListener(self, client, *names):
- """
- Registers a new client callback for the given event names. Supports
- registration for all subevents if only partial event name is specified.
- Do not include a trailing colon.
-
- For example, 'object' will register for all object events,
- 'object:property-change' will register for all property change events,
- and 'object:property-change:accessible-parent' will register only for the
- parent property change event.
-
- Registered clients will not be automatically removed when the client dies.
- To ensure the client is properly garbage collected, call
- L{deregisterEventListener}.
-
- @param client: Callable to be invoked when the event occurs
- @type client: callable
- @param names: List of full or partial event names
- @type names: list of string
- """
- for name in names:
- # store the callback for each specific event name
- self._registerClients(client, name)
-
- def deregisterEventListener(self, client, *names):
- """
- Unregisters an existing client callback for the given event names. Supports
- unregistration for all subevents if only partial event name is specified.
- Do not include a trailing colon.
-
- This method must be called to ensure a client registered by
- L{registerEventListener} is properly garbage collected.
-
- @param client: Client callback to remove
- @type client: callable
- @param names: List of full or partial event names
- @type names: list of string
- @return: Were event names specified for which the given client was not
- registered?
- @rtype: boolean
- """
- missed = False
- for name in names:
- # remove the callback for each specific event name
- missed |= self._unregisterClients(client, name)
- return missed
-
- def registerKeystrokeListener(self, client, key_set=[], mask=0,
- kind=(constants.KEY_PRESSED_EVENT,
- constants.KEY_RELEASED_EVENT),
- synchronous=True, preemptive=True,
- global_=False):
- """
- Registers a listener for key stroke events.
-
- @param client: Callable to be invoked when the event occurs
- @type client: callable
- @param key_set: Set of hardware key codes to stop monitoring. Leave empty
- to indicate all keys.
- @type key_set: list of integer
- @param mask: When the mask is None, the codes in the key_set will be
- monitored only when no modifier is held. When the mask is an
- integer, keys in the key_set will be monitored only when the modifiers in
- the mask are held. When the mask is an iterable over more than one
- integer, keys in the key_set will be monitored when any of the modifier
- combinations in the set are held.
- @type mask: integer, iterable, None
- @param kind: Kind of events to watch, KEY_PRESSED_EVENT or
- KEY_RELEASED_EVENT.
- @type kind: list
- @param synchronous: Should the callback notification be synchronous, giving
- the client the chance to consume the event?
- @type synchronous: boolean
- @param preemptive: Should the callback be allowed to preempt / consume the
- event?
- @type preemptive: boolean
- @param global_: Should callback occur even if an application not supporting
- AT-SPI is in the foreground? (requires xevie)
- @type global_: boolean
- """
- try:
- # see if we already have an observer for this client
- ob = self.clients[client]
- except KeyError:
- # create a new device observer for this client
- ob = _DeviceObserver(self, synchronous, preemptive, global_)
- # store the observer to client mapping, and the inverse
- self.clients[ob] = client
- self.clients[client] = ob
- if mask is None:
- # None means all modifier combinations
- mask = utils.allModifiers()
- # register for new keystrokes on the observer
- ob.register(self.dev, key_set, mask, kind)
-
- def deregisterKeystrokeListener(self, client, key_set=[], mask=0,
- kind=(constants.KEY_PRESSED_EVENT,
- constants.KEY_RELEASED_EVENT)):
- """
- Deregisters a listener for key stroke events.
-
- @param client: Callable to be invoked when the event occurs
- @type client: callable
- @param key_set: Set of hardware key codes to stop monitoring. Leave empty
- to indicate all keys.
- @type key_set: list of integer
- @param mask: When the mask is None, the codes in the key_set will be
- monitored only when no modifier is held. When the mask is an
- integer, keys in the key_set will be monitored only when the modifiers in
- the mask are held. When the mask is an iterable over more than one
- integer, keys in the key_set will be monitored when any of the modifier
- combinations in the set are held.
- @type mask: integer, iterable, None
- @param kind: Kind of events to stop watching, KEY_PRESSED_EVENT or
- KEY_RELEASED_EVENT.
- @type kind: list
- @raise KeyError: When the client isn't already registered for events
- """
- # see if we already have an observer for this client
- ob = self.clients[client]
- if mask is None:
- # None means all modifier combinations
- mask = utils.allModifiers()
- # register for new keystrokes on the observer
- ob.unregister(self.dev, key_set, mask, kind)
-
- def generateKeyboardEvent(self, keycode, keysym, kind):
- """
- Generates a keyboard event. One of the keycode or the keysym parameters
- should be specified and the other should be None. The kind parameter is
- required and should be one of the KEY_PRESS, KEY_RELEASE, KEY_PRESSRELEASE,
- KEY_SYM, or KEY_STRING.
-
- @param keycode: Hardware keycode or None
- @type keycode: integer
- @param keysym: Symbolic key string or None
- @type keysym: string
- @param kind: Kind of event to synthesize
- @type kind: integer
- """
- if keysym is None:
- self.dev.generateKeyboardEvent(keycode, '', kind)
- else:
- self.dev.generateKeyboardEvent(None, keysym, kind)
-
- def generateMouseEvent(self, x, y, name):
- """
- Generates a mouse event at the given absolute x and y coordinate. The kind
- of event generated is specified by the name. For example, MOUSE_B1P
- (button 1 press), MOUSE_REL (relative motion), MOUSE_B3D (butten 3
- double-click).
-
- @param x: Horizontal coordinate, usually left-hand oriented
- @type x: integer
- @param y: Vertical coordinate, usually left-hand oriented
- @type y: integer
- @param name: Name of the event to generate
- @type name: string
- """
- self.dev.generateMouseEvent(x, y, name)
-
- def handleDeviceEvent(self, event, ob):
- """
- Dispatches L{event.DeviceEvent}s to registered clients. Clients are called
- in the order they were registered for the given AT-SPI event. If any
- client returns True, callbacks cease for the event for clients of this registry
- instance. Clients of other registry instances and clients in other processes may
- be affected depending on the values of synchronous and preemptive used when invoking
- L{registerKeystrokeListener}.
-
- @note: Asynchronous dispatch of device events is not supported.
-
- @param event: AT-SPI device event
- @type event: L{event.DeviceEvent}
- @param ob: Observer that received the event
- @type ob: L{_DeviceObserver}
-
- @return: Should the event be consumed (True) or allowed to pass on to other
- AT-SPI observers (False)?
- @rtype: boolean
- """
- try:
- # try to get the client registered for this event type
- client = self.clients[ob]
- except KeyError:
- # client may have unregistered recently, ignore event
- return False
- # make the call to the client
- try:
- return client(event) or event.consume
- except Exception:
- # print the exception, but don't let it stop notification
- traceback.print_exc()
-
- def handleEvent(self, event):
- """
- Handles an AT-SPI event by either queuing it for later dispatch when the
- L{Registry.async} flag is set, or dispatching it immediately.
-
- @param event: AT-SPI event
- @type event: L{event.Event}
- """
- if self.async:
- # queue for now
- self.queue.put_nowait(event)
- else:
- # dispatch immediately
- self._dispatchEvent(event)
-
- def _dispatchEvent(self, event):
- """
- Dispatches L{event.Event}s to registered clients. Clients are called in
- the order they were registered for the given AT-SPI event. If any client
- returns True, callbacks cease for the event for clients of this registry
- instance. Clients of other registry instances and clients in other processes
- are unaffected.
-
- @param event: AT-SPI event
- @type event: L{event.Event}
- """
- et = event.type
- try:
- # try to get the client registered for this event type
- clients = self.clients[et.name]
- except KeyError:
- try:
- # we may not have registered for the complete subtree of events
- # if our tree does not list all of a certain type (e.g.
- # object:state-changed:*); try again with klass and major only
- if et.detail is not None:
- # Strip the 'detail' field.
- clients = self.clients['%s:%s:%s' % (et.klass, et.major, et.minor)]
- elif et.minor is not None:
- # The event could possibly be object:state-changed:*.
- clients = self.clients['%s:%s' % (et.klass, et.major)]
- except KeyError:
- # client may have unregistered recently, ignore event
- return
- # make the call to each client
- consume = False
- for client in clients:
- try:
- consume = client(event) or False
- except Exception:
- # print the exception, but don't let it stop notification
- traceback.print_exc()
- if consume or event.consume:
- # don't allow further processing if a client returns True
- break
-
- def flushEvents(self):
- """
- Flushes the event queue by destroying it and recreating it.
- """
- self.queue = Queue.Queue()
-
- def pumpQueuedEvents(self, num=-1):
- """
- Provides asynch processing of events in the queue by executeing them with
- _dispatchEvent() (as is done immediately when synch processing).
- This method would normally be called from a main loop or idle function.
-
- @param num: Number of events to pump. If number is negative it pumps
- the entire queue. Default is -1.
- @type num: integer
- @return: True if queue is not empty after events were pumped.
- @rtype: boolean
- """
- if num < 0:
- # Dequeue as many events as currently in the queue.
- num = self.queue.qsize()
- for i in xrange(num):
- try:
- # get next waiting event
- event = self.queue.get_nowait()
- except Queue.Empty:
- break
- self._dispatchEvent(event)
-
- return not self.queue.empty()
-
- def _registerClients(self, client, name):
- """
- Internal method that recursively associates a client with AT-SPI event
- names. Allows a client to incompletely specify an event name in order to
- register for subevents without specifying their full names manually.
-
- @param client: Client callback to receive event notifications
- @type client: callable
- @param name: Partial or full event name
- @type name: string
- """
- try:
- # look for an event name in our event tree dictionary
- events = constants.EVENT_TREE[name]
- except KeyError:
- # if the event name doesn't exist, it's a leaf event meaning there are
- # no subtypes for that event
- # add this client to the list of clients already in the dictionary
- # using the event name as the key; if there are no clients yet for this
- # event, insert an empty list into the dictionary before appending
- # the client
- et = event.EventType(name)
- clients = self.clients.setdefault(et.name, [])
- try:
- # if this succeeds, this client is already registered for the given
- # event type, so ignore the request
- clients.index(client)
- except ValueError:
- # else register the client
- clients.append(client)
- self._registerObserver(name)
- else:
- # if the event name does exist in the tree, there are subevents for
- # this event; loop through them calling this method again to get to
- # the leaf events
- for e in events:
- self._registerClients(client, e)
-
- def _unregisterClients(self, client, name):
- """
- Internal method that recursively unassociates a client with AT-SPI event
- names. Allows a client to incompletely specify an event name in order to
- unregister for subevents without specifying their full names manually.
-
- @param client: Client callback to receive event notifications
- @type client: callable
- @param name: Partial or full event name
- @type name: string
- """
- missed = False
- try:
- # look for an event name in our event tree dictionary
- events = constants.EVENT_TREE[name]
- except KeyError:
- try:
- # if the event name doesn't exist, it's a leaf event meaning there are
- # no subtypes for that event
- # get the list of registered clients and try to remove the one provided
- et = event.EventType(name)
- clients = self.clients[et.name]
- clients.remove(client)
- self._unregisterObserver(name)
- except (ValueError, KeyError):
- # ignore any exceptions indicating the client is not registered
- missed = True
- return missed
- # if the event name does exist in the tree, there are subevents for this
- # event; loop through them calling this method again to get to the leaf
- # events
- for e in events:
- missed |= self._unregisterClients(client, e)
- return missed
-
- def _registerObserver(self, name):
- """
- Creates a new L{_Observer} to watch for events of the given type or
- returns the existing observer if one is already registered. One
- L{_Observer} is created for each leaf in the L{constants.EVENT_TREE} or
- any event name not found in the tree.
-
- @param name: Raw name of the event to observe
- @type name: string
- @return: L{_Observer} object that is monitoring the event
- @rtype: L{_Observer}
- """
- et = event.EventType(name)
- try:
- # see if an observer already exists for this event
- ob = self.observers[et.name]
- except KeyError:
- # build a new observer if one does not exist
- ob = _EventObserver(self)
- # we have to register for the raw name because it may be different from
- # the parsed name determined by EventType (e.g. trailing ':' might be
- # missing)
- ob.register(self.reg, name)
- self.observers[et.name] = ob
- # increase our client ref count so we know someone new is watching for the
- # event
- ob.clientRef()
- return ob
-
- def _unregisterObserver(self, name):
- """
- Destroys an existing L{_Observer} for the given event type only if no
- clients are registered for the events it is monitoring.
-
- @param name: Name of the event to observe
- @type name: string
- @raise KeyError: When an observer for the given event is not regist
"""
- et = event.EventType(name)
- # see if an observer already exists for this event
- ob = self.observers[et.name]
- ob.clientUnref()
- if ob.getClientRefCount() == 0:
- ob.unregister(self.reg, name)
- del self.observers[et.name]
+ return Desktop(self._cache)
else:
return False
- def get_root(self):
+ def get_application_at_index(self, index):
return create_accessible(self,
self._bus_name,
self._accessible_cache.root,
interfaces.ATSPI_ACCESSIBLE,
connection=self._connection)
- root = property(fget=get_root)
+ def get_application_count(self):
+ return 1
#END----------------------------------------------------------------------------
self._path = path
def setup(self, test):
- self._cache = pyatspi.TestApplicationCache(self._bus, self._path)
+ self._registry = pyatspi.registry.Registry(self._path)
+ self._desktop = self._registry.getDesktop(0)
def test_name(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
test.assertEqual(root.name, "main", "Expected name - \"main\". Recieved - \"%s\"" % (root.name,))
def test_getChildAtIndex(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
a = root.getChildAtIndex(0)
test.assertEqual(a.name, "gnome-settings-daemon",
"Expected name - \"gnome-settings-daemon\". Recieved - \"%s\"" % (a.name,))
"Expected name - \"nautilus\". Recieved - \"%s\"" % (c.name,))
def test_isEqual(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
a = root.getChildAtIndex(1)
if not a.isEqual(a):
test.fail("Different accessibles found equal")
def test_getApplication(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
application = root.getApplication()
if not root.isEqual(application):
test.fail("Root accessible does not provide itself as its Application")
def test_getAttributes(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
#TODO The AttributeSet test needs expanding. Check attributes are passed correctly.
attr = root.getAttributes()
def test_parent(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
a = root.getChildAtIndex(1)
pa = a.parent
test.fail("Child does not correctly report its parent")
def test_getIndexInParent(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
for i in range(0, root.childCount):
child = root.getChildAtIndex(i)
test.assertEqual(i, child.getIndexInParent(), "Childs index in parent reported incorrectly")
def test_getLocalizedRoleName(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
ans = "window"
res = root.getLocalizedRoleName()
"Expected LocalizedRoleName - \"%s\". Recieved - \"%s\"" % (ans, res,))
def test_getRelationSet(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
# Complete test of Relation interface is separate
rset = root.getRelationSet()
def test_getRole(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
test.assertEqual(root.getRole(), 69,
"Expected role - \"69\". Recieved - \"%d\"" % (root.getRole(),))
def test_getRoleName(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
ans = "window"
res = root.getRoleName()
def test_getState(self, test):
# Complete test of StateSet interface is separate
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
state = root.getState()
list = state.getStates()
def test_childCount(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
test.assertEqual(root.childCount, 11,
"Expected role - \"11\". Recieved - \"%d\"" % (root.childCount,))
def test_description(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
description = "The main accessible object, root of the accessible tree"
test.assertEqual(root.description, description,
"Expected description - \"%s\". Recieved - \"%s\"" % (description, root.description,))
It checks a tree of these values is correctly
passed from Application to AT.
"""
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
doc = minidom.Document()
_createNode(root, doc)
import pyatspi
from pyatspi import Accessible
+from pyatspi import BoundingBox
ATSPI_LAYER_WIDGET = 3
ATSPI_LAYER_MDI = 4
self._path = path
def setup(self, test):
- self._cache = pyatspi.TestApplicationCache(self._bus, self._path)
+ self._registry = pyatspi.registry.Registry(self._path)
+ self._desktop = self._registry.getDesktop(0)
def test_contains(self, test):
pass
pass
def test_getExtents(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
one = root.getChildAtIndex(0)
two = root.getChildAtIndex(1)
for expected, comp in zip(extents_expected, comps):
extents = comp.getExtents(0)
- test.assertEqual(extents, expected,
+ test.assertEqual(extents, BoundingBox(*expected),
"Extents not correct. Expected (%d, %d, %d, %d), Recieved (%d, %d, %d, %d)"
% (expected[0], expected[1], expected[2], expected[3],
extents[0], extents[1], extents[2], extents[3]))
def test_getPosition(self, test):
pass
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
one = root.getChildAtIndex(0)
two = root.getChildAtIndex(1)
% (expected[0], expected[1], position[0], position[1]))
def test_getSize(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
one = root.getChildAtIndex(0)
two = root.getChildAtIndex(1)
% (expected[0], expected[1], size[0], size[1]))
def test_getLayer(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
one = root.getChildAtIndex(0)
two = root.getChildAtIndex(1)
% (layer, expected))
def test_getMDIZOrder(self, test):
- root = self._cache.root
+ root = self._desktop.getChildAtIndex(0)
one = root.getChildAtIndex(0)
two = root.getChildAtIndex(1)
--- /dev/null
+import dbus
+import gobject
+import os.path
+
+from xml.dom import minidom
+import os
+
+from pasytest import PasyTest as _PasyTest
+
+import pyatspi
+
+class DesktopTest(_PasyTest):
+
+ __tests__ = ["setup",
+ "test_name",
+ "test_getChildAtIndex",
+ "test_isEqual",
+ "test_getApplication",
+ "test_getAttributes",
+ "test_parent",
+ "test_getIndexInParent",
+ "test_getLocalizedRoleName",
+ "test_getRelationSet",
+ "test_getRole",
+ "test_getRoleName",
+ "test_getState",
+ "test_childCount",
+ "test_description",
+ "test_contains",
+ "test_getAccessibleAtPoint",
+ "test_getExtents",
+ "test_getPosition",
+ "test_getSize",
+ "test_getLayer",
+ "test_getMDIZOrder",
+ "test_grabFocus",
+ "test_registerFocusHandler",
+ "test_deregisterFocusHandler",
+ "test_getAlpha",
+ "teardown",
+ ]
+
+ def __init__(self, bus, path):
+ _PasyTest.__init__(self, "Accessible", False)
+ self._bus = bus
+ self._path = path
+
+ def setup(self, test):
+ self._registry = pyatspi.registry.Registry(self._path)
+ self._desktop = self._registry.getDesktop(0)
+
+ def test_name(self, test):
+ name = self._desktop.name
+ test.assertEqual(name, "main", "Expected name - \"main\". Recieved - \"%s\"" % (name,))
+
+ def test_getChildAtIndex(self, test):
+ root = self._desktop.getChildAtIndex(0)
+ test.assertEqual(root.name, "main",
+ "Expected name - \"main\". Recieved - \"%s\"" % (root.name,))
+
+ def test_isEqual(self, test):
+ if not self._desktop.isEqual(self._desktop):
+ test.fail("Same accessible found unequal to self")
+
+ root = self._desktop.getChildAtIndex(0)
+ if root.isEqual(self._desktop):
+ test.fail("Different accessibles found equal")
+ if self._desktop.isEqual(root):
+ test.fail("Different accessibles found equal")
+
+ def test_getApplication(self, test):
+ test.assertEqual(self._desktop.getApplication(), None,
+ "Expected application - \"None\". Recieved - \"%s\"" % (self._desktop.getApplication(),))
+
+ def test_getAttributes(self, test):
+ test.assertEqual(self._desktop.getAttributes(), [],
+ "Expected attributes - \"[]\". Recieved - \"%s\"" % (self._desktop.getAttributes(),))
+
+ def test_parent(self, test):
+ test.assertEqual(self._desktop.parent, None,
+ "Expected parent - \"None\". Recieved - \"%s\"" % (self._desktop.parent,))
+
+ def test_getIndexInParent(self, test):
+ test.assertEqual(self._desktop.getIndexInParent(), -1,
+ "Expected index - \"-1\". Recieved - \"%d\"" % (self._desktop.getIndexInParent(),))
+
+ def test_getLocalizedRoleName(self, test):
+ ans = "unknown"
+ res = self._desktop.getLocalizedRoleName()
+ test.assertEqual(ans, res,
+ "Expected LocalizedRoleName - \"%s\". Recieved - \"%s\"" % (ans, res,))
+
+ def test_getRelationSet(self, test):
+ rset = self._desktop.getRelationSet()
+ test.assertEqual(rset, [],
+ "Expected relation set - \"[]\". Recieved - \"%s\"" % (rset,))
+
+ def test_getRole(self, test):
+ test.assertEqual(self._desktop.getRole(), pyatspi.ROLE_UNKNOWN,
+ "Expected role - \"ROLE_UNKNOWN\". Recieved - \"%s\"" % (self._desktop.getRole(),))
+
+ def test_getRoleName(self, test):
+ ans = "unknown"
+ res = self._desktop.getRoleName()
+ test.assertEqual(ans, res,
+ "Expected RoleName - \"%s\". Recieved - \"%s\"" % (ans, res,))
+
+ def test_getState(self, test):
+ state = self._desktop.getState()
+ res = pyatspi.StateSet()
+ if not state.equals(res):
+ test.fail("Desktop state set is not empty")
+
+ def test_childCount(self, test):
+ test.assertEqual(self._desktop.childCount, 1,
+ "Expected childCount - \"1\". Recieved - \"%d\"" % (self._desktop.childCount,))
+
+ def test_description(self, test):
+ description = ""
+ test.assertEqual(self._desktop.description, description,
+ "Expected description - \"%s\". Recieved - \"%s\"" % (description, self._desktop.description,))
+
+ def test_contains(self, test):
+ pass
+
+ def test_getAccessibleAtPoint(self, test):
+ pass
+
+ def test_getExtents(self, test):
+ comp = self._desktop.queryComponent()
+
+ extents = comp.getExtents(0)
+ expected = pyatspi.BoundingBox(*(0,0,1024, 768))
+ test.assertEqual(extents, expected,
+ "Extents not correct. Expected (%d, %d, %d, %d), Recieved (%d, %d, %d, %d)"
+ % (expected[0], expected[1], expected[2], expected[3],
+ extents[0], extents[1], extents[2], extents[3]))
+
+ def test_getPosition(self, test):
+ comp = self._desktop.queryComponent()
+
+ position = comp.getPosition(0)
+ test.assertEqual(position, (0,0),
+ "Position not correct. Expected (%d, %d) Recieved (%d, %d)"
+ % (0, 0, position[0], position[1]))
+
+ def test_getSize(self, test):
+ comp = self._desktop.queryComponent()
+
+ size = comp.getSize()
+ test.assertEqual(size, (1024, 768),
+ "Size not correct. Expected (%d, %d) Recieved (%d, %d)"
+ % (1024, 768, size[0], size[1]))
+
+ def test_getLayer(self, test):
+ comp = self._desktop.queryComponent()
+
+ layer = comp.getLayer()
+ test.assertEqual(layer, pyatspi.LAYER_WIDGET,
+ "Layer not correct. Expected %d, Recieved %d"
+ % (layer, pyatspi.LAYER_WIDGET))
+
+ def test_getMDIZOrder(self, test):
+ comp = self._desktop.queryComponent()
+
+ mdizo = comp.getMDIZOrder()
+ test.assertEqual(mdizo, 0,
+ "ZOrder not correct. Expected %d, Recieved %d"
+ % (0, mdizo))
+
+ def test_grabFocus(self, test):
+ pass
+
+ def test_registerFocusHandler(self, test):
+ pass
+
+ def test_deregisterFocusHandler(self, test):
+ pass
+
+ def test_getAlpha(self, test):
+ pass
+
+ def teardown(self, test):
+ pass