Imported Upstream version 7.59.0
[platform/upstream/curl.git] / tests / python_dependencies / impacket / spnego.py
1 # Copyright (c) 2003-2016 CORE Security Technologies
2 #
3 # This software is provided under under a slightly modified version
4 # of the Apache Software License. See the accompanying LICENSE file
5 # for more information.
6 #
7 # Author: Alberto Solino (beto@coresecurity.com)
8 #
9 # Description:
10 #   SPNEGO functions used by SMB, SMB2/3 and DCERPC
11 #
12
13 from struct import pack, unpack, calcsize
14
15 ############### GSS Stuff ################
16 GSS_API_SPNEGO_UUID              = '\x2b\x06\x01\x05\x05\x02' 
17 ASN1_SEQUENCE                    = 0x30
18 ASN1_AID                         = 0x60
19 ASN1_OID                         = 0x06
20 ASN1_OCTET_STRING                = 0x04
21 ASN1_MECH_TYPE                   = 0xa0
22 ASN1_MECH_TOKEN                  = 0xa2
23 ASN1_SUPPORTED_MECH              = 0xa1
24 ASN1_RESPONSE_TOKEN              = 0xa2
25 ASN1_ENUMERATED                  = 0x0a
26 MechTypes = {
27 '+\x06\x01\x04\x01\x827\x02\x02\x1e': 'SNMPv2-SMI::enterprises.311.2.2.30',
28 '+\x06\x01\x04\x01\x827\x02\x02\n': 'NTLMSSP - Microsoft NTLM Security Support Provider',
29 '*\x86H\x82\xf7\x12\x01\x02\x02': 'MS KRB5 - Microsoft Kerberos 5',
30 '*\x86H\x86\xf7\x12\x01\x02\x02': 'KRB5 - Kerberos 5',
31 '*\x86H\x86\xf7\x12\x01\x02\x02\x03': 'KRB5 - Kerberos 5 - User to User'
32 }
33 TypesMech = dict((v,k) for k, v in MechTypes.iteritems())
34
35 def asn1encode(data = ''):
36         #res = asn1.SEQUENCE(str).encode()
37         #import binascii
38         #print '\nalex asn1encode str: %s\n' % binascii.hexlify(str)
39         if 0 <= len(data) <= 0x7F:
40             res = pack('B', len(data)) + data
41         elif 0x80 <= len(data) <= 0xFF:
42             res = pack('BB', 0x81, len(data)) + data
43         elif 0x100 <= len(data) <= 0xFFFF:
44             res = pack('!BH', 0x82, len(data)) + data
45         elif 0x10000 <= len(data) <= 0xffffff:
46             res = pack('!BBH', 0x83, len(data) >> 16, len(data) & 0xFFFF) + data
47         elif 0x1000000 <= len(data) <= 0xffffffff:
48             res = pack('!BL', 0x84, len(data)) + data
49         else:
50             raise Exception('Error in asn1encode')
51         return str(res)
52
53 def asn1decode(data = ''):
54         len1 = unpack('B', data[:1])[0]
55         data = data[1:]
56         if len1 == 0x81:
57             pad = calcsize('B')
58             len2 = unpack('B',data[:pad])[0]
59             data = data[pad:]
60             ans = data[:len2]
61         elif len1 == 0x82:
62             pad = calcsize('H')
63             len2 = unpack('!H', data[:pad])[0]
64             data = data[pad:]
65             ans = data[:len2]
66         elif len1 == 0x83:
67             pad = calcsize('B') + calcsize('!H')
68             len2, len3 = unpack('!BH', data[:pad])
69             data = data[pad:]
70             ans = data[:len2 << 16 + len3]
71         elif len1 == 0x84:
72             pad = calcsize('!L')
73             len2 = unpack('!L', data[:pad])[0]
74             data = data[pad:]
75             ans = data[:len2]
76         # 1 byte length, string <= 0x7F
77         else:
78             pad = 0
79             ans = data[:len1]
80         return ans, len(ans)+pad+1
81
82 class GSSAPI:
83 # Generic GSSAPI Header Format 
84     def __init__(self, data = None):
85         self.fields = {}
86         self['UUID'] = GSS_API_SPNEGO_UUID
87         if data:
88              self.fromString(data)
89         pass
90
91     def __setitem__(self,key,value):
92         self.fields[key] = value
93
94     def __getitem__(self, key):
95         return self.fields[key]
96
97     def __delitem__(self, key):
98         del self.fields[key]
99
100     def __len__(self):
101         return len(self.getData())
102
103     def __str__(self):
104         return len(self.getData())
105
106     def fromString(self, data = None):
107         # Manual parse of the GSSAPI Header Format
108         # It should be something like
109         # AID = 0x60 TAG, BER Length
110         # OID = 0x06 TAG
111         # GSSAPI OID
112         # UUID data (BER Encoded)
113         # Payload
114         next_byte = unpack('B',data[:1])[0]
115         if next_byte != ASN1_AID:
116             raise Exception('Unknown AID=%x' % next_byte)
117         data = data[1:]
118         decode_data, total_bytes = asn1decode(data) 
119         # Now we should have a OID tag
120         next_byte = unpack('B',decode_data[:1])[0]
121         if next_byte !=  ASN1_OID:
122             raise Exception('OID tag not found %x' % next_byte)
123         decode_data = decode_data[1:]
124         # Now the OID contents, should be SPNEGO UUID
125         uuid, total_bytes = asn1decode(decode_data)                
126         self['OID'] = uuid
127         # the rest should be the data
128         self['Payload'] = decode_data[total_bytes:]
129         #pass
130         
131     def dump(self):
132         for i in self.fields.keys():
133             print "%s: {%r}" % (i,self[i])
134
135     def getData(self):
136         ans = pack('B',ASN1_AID)
137         ans += asn1encode(
138                pack('B',ASN1_OID) + 
139                asn1encode(self['UUID']) +
140                self['Payload'] )
141         return ans
142
143 class SPNEGO_NegTokenResp:
144     # http://tools.ietf.org/html/rfc4178#page-9
145     # NegTokenResp ::= SEQUENCE {
146     #     negState       [0] ENUMERATED {
147     #         accept-completed    (0),
148     #         accept-incomplete   (1),
149     #         reject              (2),
150     #         request-mic         (3)
151     #     }                                 OPTIONAL,
152     #       -- REQUIRED in the first reply from the target
153     #     supportedMech   [1] MechType      OPTIONAL,
154     #       -- present only in the first reply from the target
155     #     responseToken   [2] OCTET STRING  OPTIONAL,
156     #     mechListMIC     [3] OCTET STRING  OPTIONAL,
157     #     ...
158     # }
159     # This structure is not prepended by a GSS generic header!
160     SPNEGO_NEG_TOKEN_RESP = 0xa1
161     SPNEGO_NEG_TOKEN_TARG = 0xa0
162
163     def __init__(self, data = None):
164         self.fields = {}
165         if data:
166              self.fromString(data)
167         pass
168
169     def __setitem__(self,key,value):
170         self.fields[key] = value
171
172     def __getitem__(self, key):
173         return self.fields[key]
174
175     def __delitem__(self, key):
176         del self.fields[key]
177
178     def __len__(self):
179         return len(self.getData())
180
181     def __str__(self):
182         return len(self.getData())
183
184     def fromString(self, data = 0):
185         payload = data
186         next_byte = unpack('B', payload[:1])[0]
187         if next_byte != SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP:
188             raise Exception('NegTokenResp not found %x' % next_byte)
189         payload = payload[1:]
190         decode_data, total_bytes = asn1decode(payload)
191         next_byte = unpack('B', decode_data[:1])[0]
192         if next_byte != ASN1_SEQUENCE:
193             raise Exception('SEQUENCE tag not found %x' % next_byte)
194         decode_data = decode_data[1:]
195         decode_data, total_bytes = asn1decode(decode_data)
196         next_byte = unpack('B',decode_data[:1])[0]
197
198         if next_byte != ASN1_MECH_TYPE:
199             # MechType not found, could be an AUTH answer
200             if next_byte != ASN1_RESPONSE_TOKEN:
201                raise Exception('MechType/ResponseToken tag not found %x' % next_byte)
202         else:
203             decode_data2 = decode_data[1:]
204             decode_data2, total_bytes = asn1decode(decode_data2)
205             next_byte = unpack('B', decode_data2[:1])[0]
206             if next_byte != ASN1_ENUMERATED:
207                 raise Exception('Enumerated tag not found %x' % next_byte)
208             item, total_bytes2 = asn1decode(decode_data)
209             self['NegResult'] = item
210             decode_data = decode_data[1:]
211             decode_data = decode_data[total_bytes:]
212
213             # Do we have more data?
214             if len(decode_data) == 0:
215                 return
216
217             next_byte = unpack('B', decode_data[:1])[0]
218             if next_byte != ASN1_SUPPORTED_MECH:
219                 if next_byte != ASN1_RESPONSE_TOKEN:
220                     raise Exception('Supported Mech/ResponseToken tag not found %x' % next_byte)
221             else:
222                 decode_data2 = decode_data[1:]
223                 decode_data2, total_bytes = asn1decode(decode_data2)
224                 next_byte = unpack('B', decode_data2[:1])[0]
225                 if next_byte != ASN1_OID:
226                     raise Exception('OID tag not found %x' % next_byte)
227                 decode_data2 = decode_data2[1:]
228                 item, total_bytes2 = asn1decode(decode_data2)
229                 self['SupportedMech'] = item
230
231                 decode_data = decode_data[1:]
232                 decode_data = decode_data[total_bytes:]
233                 next_byte = unpack('B', decode_data[:1])[0]
234                 if next_byte != ASN1_RESPONSE_TOKEN:
235                     raise Exception('Response token tag not found %x' % next_byte)
236
237         decode_data = decode_data[1:]
238         decode_data, total_bytes = asn1decode(decode_data)
239         next_byte = unpack('B', decode_data[:1])[0]
240         if next_byte != ASN1_OCTET_STRING:
241             raise Exception('Octet string token tag not found %x' % next_byte)
242         decode_data = decode_data[1:]
243         decode_data, total_bytes = asn1decode(decode_data)
244         self['ResponseToken'] = decode_data
245
246     def dump(self):
247         for i in self.fields.keys():
248             print "%s: {%r}" % (i,self[i])
249         
250     def getData(self):
251         ans = pack('B',SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP)
252         if self.fields.has_key('NegResult') and self.fields.has_key('SupportedMech'):
253             # Server resp
254             ans += asn1encode(
255                pack('B', ASN1_SEQUENCE) +
256                asn1encode(
257                pack('B',SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
258                asn1encode(
259                pack('B',ASN1_ENUMERATED) + 
260                asn1encode( self['NegResult'] )) +
261                pack('B',ASN1_SUPPORTED_MECH) +
262                asn1encode( 
263                pack('B',ASN1_OID) +
264                asn1encode(self['SupportedMech'])) +
265                pack('B',ASN1_RESPONSE_TOKEN ) +
266                asn1encode(
267                pack('B', ASN1_OCTET_STRING) + asn1encode(self['ResponseToken']))))
268         elif self.fields.has_key('NegResult'):
269             # Server resp
270             ans += asn1encode(
271                pack('B', ASN1_SEQUENCE) + 
272                asn1encode(
273                pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
274                asn1encode(
275                pack('B',ASN1_ENUMERATED) +
276                asn1encode( self['NegResult'] ))))
277         else:
278             # Client resp
279             ans += asn1encode(
280                pack('B', ASN1_SEQUENCE) +
281                asn1encode(
282                pack('B', ASN1_RESPONSE_TOKEN) +
283                asn1encode(
284                pack('B', ASN1_OCTET_STRING) + asn1encode(self['ResponseToken']))))
285         return ans
286
287 class SPNEGO_NegTokenInit(GSSAPI):
288     # http://tools.ietf.org/html/rfc4178#page-8 
289     # NegTokeInit :: = SEQUENCE {
290     #   mechTypes       [0] MechTypeList,
291     #   reqFlags        [1] ContextFlags OPTIONAL,
292     #   mechToken       [2] OCTET STRING OPTIONAL,      
293     #   mechListMIC     [3] OCTET STRING OPTIONAL,
294     # }
295     SPNEGO_NEG_TOKEN_INIT = 0xa0
296     def fromString(self, data = 0):
297         GSSAPI.fromString(self, data)
298         payload = self['Payload']
299         next_byte = unpack('B', payload[:1])[0] 
300         if next_byte != SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT:
301             raise Exception('NegTokenInit not found %x' % next_byte)
302         payload = payload[1:]
303         decode_data, total_bytes = asn1decode(payload)
304         # Now we should have a SEQUENCE Tag
305         next_byte = unpack('B', decode_data[:1])[0]
306         if next_byte != ASN1_SEQUENCE:
307             raise Exception('SEQUENCE tag not found %x' % next_byte)
308         decode_data = decode_data[1:]
309         decode_data, total_bytes2 = asn1decode(decode_data)
310         next_byte = unpack('B',decode_data[:1])[0]
311         if next_byte != ASN1_MECH_TYPE:
312             raise Exception('MechType tag not found %x' % next_byte)
313         decode_data = decode_data[1:]
314         remaining_data = decode_data
315         decode_data, total_bytes3 = asn1decode(decode_data)
316         next_byte = unpack('B', decode_data[:1])[0]
317         if next_byte != ASN1_SEQUENCE:
318             raise Exception('SEQUENCE tag not found %x' % next_byte)
319         decode_data = decode_data[1:]
320         decode_data, total_bytes4 = asn1decode(decode_data)
321         # And finally we should have the MechTypes
322         self['MechTypes'] = []
323         while decode_data:
324            next_byte = unpack('B', decode_data[:1])[0]
325            if next_byte != ASN1_OID:    
326              # Not a valid OID, there must be something else we won't unpack
327              break
328            decode_data = decode_data[1:]
329            item, total_bytes = asn1decode(decode_data)
330            self['MechTypes'].append(item)
331            decode_data = decode_data[total_bytes:]
332
333         # Do we have MechTokens as well?
334         decode_data = remaining_data[total_bytes3:]
335         if len(decode_data) > 0:
336             next_byte = unpack('B', decode_data[:1])[0]
337             if next_byte == ASN1_MECH_TOKEN:
338                 # We have tokens in here!
339                 decode_data = decode_data[1:]
340                 decode_data, total_bytes = asn1decode(decode_data)
341                 next_byte = unpack('B', decode_data[:1])[0]
342                 if next_byte ==  ASN1_OCTET_STRING:
343                     decode_data = decode_data[1:]
344                     decode_data, total_bytes = asn1decode(decode_data)
345                     self['MechToken'] =  decode_data
346
347     def getData(self):
348         mechTypes = ''
349         for i in self['MechTypes']:
350             mechTypes += pack('B', ASN1_OID)
351             mechTypes += asn1encode(i)
352
353         mechToken = ''
354         # Do we have tokens to send?
355         if self.fields.has_key('MechToken'):
356             mechToken = pack('B', ASN1_MECH_TOKEN) + asn1encode(
357                 pack('B', ASN1_OCTET_STRING) + asn1encode(
358                     self['MechToken']))
359
360         ans = pack('B',SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT)
361         ans += asn1encode(
362                pack('B', ASN1_SEQUENCE) +
363                asn1encode(
364                pack('B', ASN1_MECH_TYPE) +
365                asn1encode(
366                pack('B', ASN1_SEQUENCE) + 
367                asn1encode(mechTypes)) + mechToken ))
368
369
370         self['Payload'] = ans
371         return GSSAPI.getData(self)
372