tools: ynl: move the enum classes to shared code
[platform/kernel/linux-starfive.git] / tools / net / ynl / lib / nlspec.py
1 # SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2
3 import collections
4 import importlib
5 import os
6 import yaml
7
8
9 # To be loaded dynamically as needed
10 jsonschema = None
11
12
13 class SpecElement:
14     """Netlink spec element.
15
16     Abstract element of the Netlink spec. Implements the dictionary interface
17     for access to the raw spec. Supports iterative resolution of dependencies
18     across elements and class inheritance levels. The elements of the spec
19     may refer to each other, and although loops should be very rare, having
20     to maintain correct ordering of instantiation is painful, so the resolve()
21     method should be used to perform parts of init which require access to
22     other parts of the spec.
23
24     Attributes:
25         yaml        raw spec as loaded from the spec file
26         family      back reference to the full family
27
28         name        name of the entity as listed in the spec (optional)
29         ident_name  name which can be safely used as identifier in code (optional)
30     """
31     def __init__(self, family, yaml):
32         self.yaml = yaml
33         self.family = family
34
35         if 'name' in self.yaml:
36             self.name = self.yaml['name']
37             self.ident_name = self.name.replace('-', '_')
38
39         self._super_resolved = False
40         family.add_unresolved(self)
41
42     def __getitem__(self, key):
43         return self.yaml[key]
44
45     def __contains__(self, key):
46         return key in self.yaml
47
48     def get(self, key, default=None):
49         return self.yaml.get(key, default)
50
51     def resolve_up(self, up):
52         if not self._super_resolved:
53             up.resolve()
54             self._super_resolved = True
55
56     def resolve(self):
57         pass
58
59
60 class SpecEnumEntry(SpecElement):
61     """ Entry within an enum declared in the Netlink spec.
62
63     Attributes:
64         doc         documentation string
65         enum_set    back reference to the enum
66         value       numerical value of this enum (use accessors in most situations!)
67
68     Methods:
69         raw_value   raw value, i.e. the id in the enum, unlike user value which is a mask for flags
70         user_value   user value, same as raw value for enums, for flags it's the mask
71     """
72     def __init__(self, enum_set, yaml, prev, value_start):
73         if isinstance(yaml, str):
74             yaml = {'name': yaml}
75         super().__init__(enum_set.family, yaml)
76
77         self.doc = yaml.get('doc', '')
78         self.enum_set = enum_set
79
80         if 'value' in yaml:
81             self.value = yaml['value']
82         elif prev:
83             self.value = prev.value + 1
84         else:
85             self.value = value_start
86
87     def has_doc(self):
88         return bool(self.doc)
89
90     def raw_value(self):
91         return self.value
92
93     def user_value(self):
94         if self.enum_set['type'] == 'flags':
95             return 1 << self.value
96         else:
97             return self.value
98
99
100 class SpecEnumSet(SpecElement):
101     """ Enum type
102
103     Represents an enumeration (list of numerical constants)
104     as declared in the "definitions" section of the spec.
105
106     Attributes:
107         type          enum or flags
108         entries       entries by name
109     Methods:
110         get_mask      for flags compute the mask of all defined values
111     """
112     def __init__(self, family, yaml):
113         super().__init__(family, yaml)
114
115         self.type = yaml['type']
116
117         prev_entry = None
118         value_start = self.yaml.get('value-start', 0)
119         self.entries = dict()
120         for entry in self.yaml['entries']:
121             e = self.new_entry(entry, prev_entry, value_start)
122             self.entries[e.name] = e
123             prev_entry = e
124
125     def new_entry(self, entry, prev_entry, value_start):
126         return SpecEnumEntry(self, entry, prev_entry, value_start)
127
128     def has_doc(self):
129         if 'doc' in self.yaml:
130             return True
131         for entry in self.entries.values():
132             if entry.has_doc():
133                 return True
134         return False
135
136     def get_mask(self):
137         mask = 0
138         idx = self.yaml.get('value-start', 0)
139         for _ in self.entries.values():
140             mask |= 1 << idx
141             idx += 1
142         return mask
143
144
145 class SpecAttr(SpecElement):
146     """ Single Netlink atttribute type
147
148     Represents a single attribute type within an attr space.
149
150     Attributes:
151         value      numerical ID when serialized
152         attr_set   Attribute Set containing this attr
153     """
154     def __init__(self, family, attr_set, yaml, value):
155         super().__init__(family, yaml)
156
157         self.value = value
158         self.attr_set = attr_set
159         self.is_multi = yaml.get('multi-attr', False)
160
161
162 class SpecAttrSet(SpecElement):
163     """ Netlink Attribute Set class.
164
165     Represents a ID space of attributes within Netlink.
166
167     Note that unlike other elements, which expose contents of the raw spec
168     via the dictionary interface Attribute Set exposes attributes by name.
169
170     Attributes:
171         attrs      ordered dict of all attributes (indexed by name)
172         attrs_by_val  ordered dict of all attributes (indexed by value)
173         subset_of  parent set if this is a subset, otherwise None
174     """
175     def __init__(self, family, yaml):
176         super().__init__(family, yaml)
177
178         self.subset_of = self.yaml.get('subset-of', None)
179
180         self.attrs = collections.OrderedDict()
181         self.attrs_by_val = collections.OrderedDict()
182
183         if self.subset_of is None:
184             val = 1
185             for elem in self.yaml['attributes']:
186                 if 'value' in elem:
187                     val = elem['value']
188
189                 attr = self.new_attr(elem, val)
190                 self.attrs[attr.name] = attr
191                 self.attrs_by_val[attr.value] = attr
192                 val += 1
193         else:
194             real_set = family.attr_sets[self.subset_of]
195             for elem in self.yaml['attributes']:
196                 attr = real_set[elem['name']]
197                 self.attrs[attr.name] = attr
198                 self.attrs_by_val[attr.value] = attr
199
200     def new_attr(self, elem, value):
201         return SpecAttr(self.family, self, elem, value)
202
203     def __getitem__(self, key):
204         return self.attrs[key]
205
206     def __contains__(self, key):
207         return key in self.attrs
208
209     def __iter__(self):
210         yield from self.attrs
211
212     def items(self):
213         return self.attrs.items()
214
215
216 class SpecOperation(SpecElement):
217     """Netlink Operation
218
219     Information about a single Netlink operation.
220
221     Attributes:
222         value       numerical ID when serialized, None if req/rsp values differ
223
224         req_value   numerical ID when serialized, user -> kernel
225         rsp_value   numerical ID when serialized, user <- kernel
226         is_call     bool, whether the operation is a call
227         is_async    bool, whether the operation is a notification
228         is_resv     bool, whether the operation does not exist (it's just a reserved ID)
229         attr_set    attribute set name
230
231         yaml        raw spec as loaded from the spec file
232     """
233     def __init__(self, family, yaml, req_value, rsp_value):
234         super().__init__(family, yaml)
235
236         self.value = req_value if req_value == rsp_value else None
237         self.req_value = req_value
238         self.rsp_value = rsp_value
239
240         self.is_call = 'do' in yaml or 'dump' in yaml
241         self.is_async = 'notify' in yaml or 'event' in yaml
242         self.is_resv = not self.is_async and not self.is_call
243
244         # Added by resolve:
245         self.attr_set = None
246         delattr(self, "attr_set")
247
248     def resolve(self):
249         self.resolve_up(super())
250
251         if 'attribute-set' in self.yaml:
252             attr_set_name = self.yaml['attribute-set']
253         elif 'notify' in self.yaml:
254             msg = self.family.msgs[self.yaml['notify']]
255             attr_set_name = msg['attribute-set']
256         elif self.is_resv:
257             attr_set_name = ''
258         else:
259             raise Exception(f"Can't resolve attribute set for op '{self.name}'")
260         if attr_set_name:
261             self.attr_set = self.family.attr_sets[attr_set_name]
262
263
264 class SpecFamily(SpecElement):
265     """ Netlink Family Spec class.
266
267     Netlink family information loaded from a spec (e.g. in YAML).
268     Takes care of unfolding implicit information which can be skipped
269     in the spec itself for brevity.
270
271     The class can be used like a dictionary to access the raw spec
272     elements but that's usually a bad idea.
273
274     Attributes:
275         proto     protocol type (e.g. genetlink)
276
277         attr_sets  dict of attribute sets
278         msgs       dict of all messages (index by name)
279         msgs_by_value  dict of all messages (indexed by name)
280         ops        dict of all valid requests / responses
281         consts     dict of all constants/enums
282     """
283     def __init__(self, spec_path, schema_path=None):
284         with open(spec_path, "r") as stream:
285             spec = yaml.safe_load(stream)
286
287         self._resolution_list = []
288
289         super().__init__(self, spec)
290
291         self.proto = self.yaml.get('protocol', 'genetlink')
292
293         if schema_path is None:
294             schema_path = os.path.dirname(os.path.dirname(spec_path)) + f'/{self.proto}.yaml'
295         if schema_path:
296             global jsonschema
297
298             with open(schema_path, "r") as stream:
299                 schema = yaml.safe_load(stream)
300
301             if jsonschema is None:
302                 jsonschema = importlib.import_module("jsonschema")
303
304             jsonschema.validate(self.yaml, schema)
305
306         self.attr_sets = collections.OrderedDict()
307         self.msgs = collections.OrderedDict()
308         self.req_by_value = collections.OrderedDict()
309         self.rsp_by_value = collections.OrderedDict()
310         self.ops = collections.OrderedDict()
311         self.consts = collections.OrderedDict()
312
313         last_exception = None
314         while len(self._resolution_list) > 0:
315             resolved = []
316             unresolved = self._resolution_list
317             self._resolution_list = []
318
319             for elem in unresolved:
320                 try:
321                     elem.resolve()
322                 except (KeyError, AttributeError) as e:
323                     self._resolution_list.append(elem)
324                     last_exception = e
325                     continue
326
327                 resolved.append(elem)
328
329             if len(resolved) == 0:
330                 raise last_exception
331
332     def new_enum(self, elem):
333         return SpecEnumSet(self, elem)
334
335     def new_attr_set(self, elem):
336         return SpecAttrSet(self, elem)
337
338     def new_operation(self, elem, req_val, rsp_val):
339         return SpecOperation(self, elem, req_val, rsp_val)
340
341     def add_unresolved(self, elem):
342         self._resolution_list.append(elem)
343
344     def _dictify_ops_unified(self):
345         val = 1
346         for elem in self.yaml['operations']['list']:
347             if 'value' in elem:
348                 val = elem['value']
349
350             op = self.new_operation(elem, val, val)
351             val += 1
352
353             self.msgs[op.name] = op
354
355     def _dictify_ops_directional(self):
356         req_val = rsp_val = 1
357         for elem in self.yaml['operations']['list']:
358             if 'notify' in elem:
359                 if 'value' in elem:
360                     rsp_val = elem['value']
361                 req_val_next = req_val
362                 rsp_val_next = rsp_val + 1
363                 req_val = None
364             elif 'do' in elem or 'dump' in elem:
365                 mode = elem['do'] if 'do' in elem else elem['dump']
366
367                 v = mode.get('request', {}).get('value', None)
368                 if v:
369                     req_val = v
370                 v = mode.get('reply', {}).get('value', None)
371                 if v:
372                     rsp_val = v
373
374                 rsp_inc = 1 if 'reply' in mode else 0
375                 req_val_next = req_val + 1
376                 rsp_val_next = rsp_val + rsp_inc
377             else:
378                 raise Exception("Can't parse directional ops")
379
380             op = self.new_operation(elem, req_val, rsp_val)
381             req_val = req_val_next
382             rsp_val = rsp_val_next
383
384             self.msgs[op.name] = op
385
386     def resolve(self):
387         self.resolve_up(super())
388
389         for elem in self.yaml['definitions']:
390             if elem['type'] == 'enum' or elem['type'] == 'flags':
391                 self.consts[elem['name']] = self.new_enum(elem)
392             else:
393                 self.consts[elem['name']] = elem
394
395         for elem in self.yaml['attribute-sets']:
396             attr_set = self.new_attr_set(elem)
397             self.attr_sets[elem['name']] = attr_set
398
399         msg_id_model = self.yaml['operations'].get('enum-model', 'unified')
400         if msg_id_model == 'unified':
401             self._dictify_ops_unified()
402         elif msg_id_model == 'directional':
403             self._dictify_ops_directional()
404
405         for op in self.msgs.values():
406             if op.req_value is not None:
407                 self.req_by_value[op.req_value] = op
408             if op.rsp_value is not None:
409                 self.rsp_by_value[op.rsp_value] = op
410             if not op.is_async and 'attribute-set' in op:
411                 self.ops[op.name] = op