1 # SPDX-License-Identifier: BSD-3-Clause
9 # To be loaded dynamically as needed
class SpecElement:
    """Netlink spec element.

    Abstract element of the Netlink spec. Implements the dictionary interface
    for access to the raw spec. Supports iterative resolution of dependencies
    across elements and class inheritance levels. The elements of the spec
    may refer to each other, and although loops should be very rare, having
    to maintain correct ordering of instantiation is painful, so the resolve()
    method should be used to perform parts of init which require access to
    other parts of the spec.

    Attributes:
        yaml        raw spec as loaded from the spec file
        family      back reference to the full family

        name        name of the entity as listed in the spec (optional)
        ident_name  name which can be safely used as identifier in code (optional)
    """
    def __init__(self, family, yaml):
        """Store the raw spec and queue the element for later resolution.

        Args:
            family: object providing add_unresolved(); kept as a back reference
            yaml:   raw spec of this element (dictionary-like)
        """
        self.yaml = yaml
        self.family = family

        # Elements without a 'name' key do not get the name attributes at all.
        if 'name' in self.yaml:
            self.name = self.yaml['name']
            self.ident_name = self.name.replace('-', '_')

        self._super_resolved = False
        family.add_unresolved(self)

    def __getitem__(self, key):
        """Return the raw spec entry for key; raises KeyError if absent."""
        return self.yaml[key]

    def __contains__(self, key):
        """Report whether the raw spec has an entry for key."""
        return key in self.yaml

    def get(self, key, default=None):
        """Return the raw spec entry for key, or default if absent."""
        return self.yaml.get(key, default)

    def resolve_up(self, up):
        """Run the parent level's resolve() at most once.

        resolve() of an element may be called repeatedly (until it stops
        raising), the flag makes sure the parent's part is not repeated
        once it has succeeded.

        Args:
            up: object whose resolve() performs the parent's resolution,
                subclasses in this file pass super()
        """
        if not self._super_resolved:
            up.resolve()
            # Only reached when up.resolve() did not raise.
            self._super_resolved = True

    def resolve(self):
        """Perform init steps which need other parts of the spec.

        No-op at this level, subclasses override it.
        """
        pass
class SpecAttr(SpecElement):
    """Single Netlink attribute type.

    Represents a single attribute type within an attr space.

    Attributes:
        value      numerical ID when serialized
        attr_set   Attribute Set containing this attr
        is_multi   value of the 'multi-attr' property of the raw spec,
                   False when the property is absent
    """
    def __init__(self, family, attr_set, yaml, value):
        """Create an attribute.

        Args:
            family:   family the attribute belongs to
            attr_set: Attribute Set containing this attr
            yaml:     raw spec of the attribute
            value:    numerical ID of the attribute
        """
        super().__init__(family, yaml)

        # The containing set indexes attributes by this value.
        self.value = value
        self.attr_set = attr_set
        self.is_multi = yaml.get('multi-attr', False)
class SpecAttrSet(SpecElement):
    """Netlink Attribute Set class.

    Represents a ID space of attributes within Netlink.

    Note that unlike other elements, which expose contents of the raw spec
    via the dictionary interface Attribute Set exposes attributes by name.

    Attributes:
        attrs         ordered dict of all attributes (indexed by name)
        attrs_by_val  ordered dict of all attributes (indexed by value)
        subset_of     'subset-of' property of the raw spec (the parent set)
                      if this is a subset, otherwise None
    """
    def __init__(self, family, yaml):
        """Create the set and instantiate all of its attributes.

        Args:
            family: family the set belongs to
            yaml:   raw spec of the set, must contain 'attributes'
        """
        super().__init__(family, yaml)

        self.subset_of = self.yaml.get('subset-of', None)

        self.attrs = collections.OrderedDict()
        self.attrs_by_val = collections.OrderedDict()

        # IDs are allocated sequentially, an explicit 'value' in the spec
        # moves the counter and numbering continues from there.
        val = 0
        for elem in self.yaml['attributes']:
            if 'value' in elem:
                val = elem['value']

            attr = self.new_attr(elem, val)
            self.attrs[attr.name] = attr
            self.attrs_by_val[attr.value] = attr
            val += 1

    def new_attr(self, elem, value):
        """Instantiate one attribute; separate method so that subclasses
        can substitute their own attribute class."""
        return SpecAttr(self.family, self, elem, value)

    def __getitem__(self, key):
        """Return the attribute called key; raises KeyError if absent."""
        return self.attrs[key]

    def __contains__(self, key):
        """Report whether the set has an attribute called key."""
        return key in self.attrs

    def __iter__(self):
        """Iterate over the names of the attributes, in spec order."""
        yield from self.attrs

    def items(self):
        """Return (name, attribute) pairs, in spec order."""
        return self.attrs.items()
class SpecOperation(SpecElement):
    """Netlink Operation

    Information about a single Netlink operation.

    Attributes:
        value       numerical ID when serialized, None if req/rsp values differ

        req_value   numerical ID when serialized, user -> kernel
        rsp_value   numerical ID when serialized, user <- kernel
        is_call     bool, whether the operation is a call
        is_async    bool, whether the operation is a notification
        is_resv     bool, whether the operation does not exist (it's just a reserved ID)
        attr_set    attribute set of the operation (object from the family's
                    attr_sets), only present once resolve() found one

        yaml        raw spec as loaded from the spec file
    """
    def __init__(self, family, yaml, req_value, rsp_value):
        """Create an operation.

        Args:
            family:    family the operation belongs to
            yaml:      raw spec of the operation
            req_value: numerical ID of the request
            rsp_value: numerical ID of the response
        """
        super().__init__(family, yaml)

        self.value = req_value if req_value == rsp_value else None
        self.req_value = req_value
        self.rsp_value = rsp_value

        self.is_call = 'do' in yaml or 'dump' in yaml
        self.is_async = 'notify' in yaml or 'event' in yaml
        self.is_resv = not self.is_async and not self.is_call

        # Added by resolve:
        # (declared here for the benefit of readers and tools, then removed
        # again so that early access raises AttributeError)
        self.attr_set = None
        delattr(self, "attr_set")

    def resolve(self):
        """Look up the attribute set of the operation.

        Raises:
            KeyError: a referenced message or attribute set is not (yet)
                known to the family
            Exception: the spec gives no way to find the attribute set
        """
        self.resolve_up(super())

        if 'attribute-set' in self.yaml:
            attr_set_name = self.yaml['attribute-set']
        elif 'notify' in self.yaml:
            # Notifications use the attribute set of the message they name.
            msg = self.family.msgs[self.yaml['notify']]
            attr_set_name = msg['attribute-set']
        elif self.is_resv:
            # Reserved IDs carry no attributes.
            attr_set_name = ''
        else:
            raise Exception(f"Can't resolve attribute set for op '{self.name}'")
        if attr_set_name:
            self.attr_set = self.family.attr_sets[attr_set_name]
class SpecFamily(SpecElement):
    """Netlink Family Spec class.

    Netlink family information loaded from a spec (e.g. in YAML).
    Takes care of unfolding implicit information which can be skipped
    in the spec itself for brevity.

    The class can be used like a dictionary to access the raw spec
    elements but that's usually a bad idea.

    Attributes:
        proto         protocol type (e.g. genetlink)

        attr_sets     dict of attribute sets (indexed by name)
        msgs          dict of all messages (indexed by name)
        req_by_value  dict of messages which have a request value (indexed by it)
        rsp_by_value  dict of messages which have a response value (indexed by it)
        ops           dict of all valid requests / responses
    """
    def __init__(self, spec_path, schema_path=None):
        """Load, optionally validate, and resolve a family spec.

        Args:
            spec_path:   path of the YAML spec file
            schema_path: path of the schema to validate against; None selects
                         '<protocol>.yaml' two directory levels above the
                         spec, an empty value skips validation

        Raises:
            KeyError, AttributeError: elements of the spec could not be
                resolved (the last resolution error is re-raised)
        """
        # The validator module is cached at module level once loaded.
        global jsonschema

        with open(spec_path, "r") as stream:
            spec = yaml.safe_load(stream)

        # Must exist before the parent's __init__(), which registers this
        # object through add_unresolved().
        self._resolution_list = []

        super().__init__(self, spec)

        self.proto = self.yaml.get('protocol', 'genetlink')

        if schema_path is None:
            # os.path.join() keeps the path relative when the spec path is
            # relative and only one directory deep.
            schema_path = os.path.join(os.path.dirname(os.path.dirname(spec_path)),
                                       f'{self.proto}.yaml')
        if schema_path:
            with open(schema_path, "r") as stream:
                schema = yaml.safe_load(stream)

            if jsonschema is None:
                jsonschema = importlib.import_module("jsonschema")

            jsonschema.validate(self.yaml, schema)

        self.attr_sets = collections.OrderedDict()
        self.msgs = collections.OrderedDict()
        self.req_by_value = collections.OrderedDict()
        self.rsp_by_value = collections.OrderedDict()
        self.ops = collections.OrderedDict()

        # Keep retrying the elements which failed, resolving one element
        # may create new ones or unblock others. Give up when a full pass
        # makes no progress.
        last_exception = None
        while self._resolution_list:
            resolved = []
            unresolved = self._resolution_list
            self._resolution_list = []

            for elem in unresolved:
                try:
                    elem.resolve()
                except (KeyError, AttributeError) as e:
                    self._resolution_list.append(elem)
                    last_exception = e
                    continue

                resolved.append(elem)

            if not resolved:
                raise last_exception

    def new_attr_set(self, elem):
        """Instantiate one attribute set; separate method so that subclasses
        can substitute their own class."""
        return SpecAttrSet(self, elem)

    def new_operation(self, elem, req_val, rsp_val):
        """Instantiate one operation; separate method so that subclasses
        can substitute their own class."""
        return SpecOperation(self, elem, req_val, rsp_val)

    def add_unresolved(self, elem):
        """Queue elem for a resolve() call."""
        self._resolution_list.append(elem)

    def _dictify_ops_unified(self):
        """Fill msgs for specs where requests and responses share IDs."""
        val = 0
        for elem in self.yaml['operations']['list']:
            # An explicit 'value' moves the counter.
            if 'value' in elem:
                val = elem['value']

            op = self.new_operation(elem, val, val)
            val += 1

            self.msgs[op.name] = op

    def _dictify_ops_directional(self):
        """Fill msgs for specs where requests and responses are numbered
        independently.

        Raises:
            Exception: an operation is neither a notification / event
                nor a do / dump
        """
        req_val = rsp_val = 0
        for elem in self.yaml['operations']['list']:
            if 'notify' in elem or 'event' in elem:
                if 'value' in elem:
                    rsp_val = elem['value']
                req_val_next = req_val
                rsp_val_next = rsp_val + 1
                # Kernel -> user only, there is no request ID.
                req_val = None
            elif 'do' in elem or 'dump' in elem:
                mode = elem['do'] if 'do' in elem else elem['dump']

                v = mode.get('request', {}).get('value', None)
                if v is not None:
                    req_val = v
                v = mode.get('reply', {}).get('value', None)
                if v is not None:
                    rsp_val = v

                # A response ID is only consumed if there is a reply.
                rsp_inc = 1 if 'reply' in mode else 0
                req_val_next = req_val + 1
                rsp_val_next = rsp_val + rsp_inc
            else:
                raise Exception("Can't parse directional ops")

            op = self.new_operation(elem, req_val, rsp_val)
            req_val = req_val_next
            rsp_val = rsp_val_next

            self.msgs[op.name] = op

    def resolve(self):
        """Instantiate attribute sets and operations, build the indexes."""
        self.resolve_up(super())

        for elem in self.yaml['attribute-sets']:
            attr_set = self.new_attr_set(elem)
            self.attr_sets[elem['name']] = attr_set

        msg_id_model = self.yaml['operations'].get('enum-model', 'unified')
        if msg_id_model == 'unified':
            self._dictify_ops_unified()
        elif msg_id_model == 'directional':
            self._dictify_ops_directional()

        for op in self.msgs.values():
            if op.req_value is not None:
                self.req_by_value[op.req_value] = op
            if op.rsp_value is not None:
                self.rsp_by_value[op.rsp_value] = op
            # Only non-async messages which name their attribute set
            # are listed as ops.
            if not op.is_async and 'attribute-set' in op:
                self.ops[op.name] = op
308 self.ops[op.name] = op