1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
import logging
import time
from gi.repository import Atspi
from gi.repository import GLib
Atspi.set_timeout(-1, -1)
def get_root():
return Node(Atspi.get_desktop(0))
def _retry_find(func):
def wrapped(*args, **kwargs):
result = None
n_retries = 1
while n_retries <= 50:
logging.info("Try %d, name=%s role_name=%s" %
(n_retries,
kwargs.get("name", None),
kwargs.get("role_name", None)))
try:
result = func(*args, **kwargs)
except GLib.GError, e:
# The application is not responding, try again
if e.code == Atspi.Error.IPC:
continue
logging.error("GError code %d", e.code)
raise
expect_none = kwargs.get("expect_none", False)
if (not expect_none and result) or \
(expect_none and not result):
return result
time.sleep(1)
n_retries = n_retries + 1
return result
return wrapped
class Node:
def __init__(self, accessible):
self._accessible = accessible
def dump(self):
lines = []
self._crawl_accessible(self, 0, lines)
return "\n".join(lines)
def do_action(self, name):
for i in range(self._accessible.get_n_actions()):
if Atspi.Action.get_name(self._accessible, i) == name:
self._accessible.do_action(i)
def click(self, button=1):
point = self._accessible.get_position(Atspi.CoordType.SCREEN)
Atspi.generate_mouse_event(point.x, point.y, "b%sc" % button)
@property
def name(self):
return self._accessible.get_name()
@property
def role_name(self):
return self._accessible.get_role_name()
@property
def text(self):
return Atspi.Text.get_text(self._accessible, 0, -1)
def get_children(self):
children = []
for i in range(self._accessible.get_child_count()):
child = self._accessible.get_child_at_index(i)
# We sometimes get none children from atspi
if child is not None:
children.append(Node(child))
return children
@_retry_find
def find_children(self, name=None, role_name=None):
def predicate(node):
return self._predicate(node, name, role_name)
descendants = []
self._find_all_descendants(self, predicate, descendants)
if not descendants:
return []
return descendants
@_retry_find
def find_child(self, name=None, role_name=None, expect_none=False):
def predicate(node):
return self._predicate(node, name, role_name)
node = self._find_descendant(self, predicate)
if node is None:
return None
return node
def __str__(self):
return "[%s | %s]" % (self.name, self.role_name)
def _predicate(self, node, name, role_name):
if name is not None and name != node.name:
return False
if role_name is not None and role_name != node.role_name:
return False
return True
def _find_descendant(self, node, predicate):
if predicate(node):
return node
for child in node.get_children():
descendant = self._find_descendant(child, predicate)
if descendant is not None:
return descendant
return None
def _find_all_descendants(self, node, predicate, matches):
if predicate(node):
matches.append(node)
for child in node.get_children():
self._find_all_descendants(child, predicate, matches)
def _crawl_accessible(self, node, depth, lines):
lines.append(" " * depth + str(node))
for child in node.get_children():
self._crawl_accessible(child, depth + 1, lines)
|