1 # Copyright (c) 2003-2016 CORE Security Technologies
3 # This software is provided under under a slightly modified version
4 # of the Apache Software License. See the accompanying LICENSE file
5 # for more information.
7 # Author: Alberto Solino (beto@coresecurity.com)
10 # SPNEGO functions used by SMB, SMB2/3 and DCERPC
13 from struct import pack, unpack, calcsize
15 ############### GSS Stuff ################
# OID of the SPNEGO pseudo-mechanism (1.3.6.1.5.5.2), BER content octets only.
GSS_API_SPNEGO_UUID = '\x2b\x06\x01\x05\x05\x02'

# ASN.1 / BER identifier octets used while building and parsing the tokens.
ASN1_SEQUENCE = 0x30        # universal, constructed SEQUENCE
ASN1_AID = 0x60             # application tag opening the GSSAPI header
ASN1_OID = 0x06             # universal OBJECT IDENTIFIER
ASN1_OCTET_STRING = 0x04
ASN1_MECH_TYPE = 0xa0       # context tag [0]
ASN1_MECH_TOKEN = 0xa2      # context tag [2]
ASN1_SUPPORTED_MECH = 0xa1  # context tag [1]
ASN1_RESPONSE_TOKEN = 0xa2  # context tag [2]
ASN1_ENUMERATED = 0x0a

# Encoded mechanism OID -> human readable description.
MechTypes = {
    '+\x06\x01\x04\x01\x827\x02\x02\x1e': 'SNMPv2-SMI::enterprises.311.2.2.30',
    '+\x06\x01\x04\x01\x827\x02\x02\n': 'NTLMSSP - Microsoft NTLM Security Support Provider',
    '*\x86H\x82\xf7\x12\x01\x02\x02': 'MS KRB5 - Microsoft Kerberos 5',
    '*\x86H\x86\xf7\x12\x01\x02\x02': 'KRB5 - Kerberos 5',
    '*\x86H\x86\xf7\x12\x01\x02\x02\x03': 'KRB5 - Kerberos 5 - User to User',
}

# Reverse lookup: description -> encoded mechanism OID.
TypesMech = dict((v, k) for k, v in MechTypes.items())
def asn1encode(data = ''):
    """Prefix ``data`` with its BER definite-length encoding.

    Lengths up to 0x7F use the single-octet short form; longer payloads use
    the long form with a 0x81..0x84 marker followed by 1 to 4 big-endian
    length octets.

    Returns the length octets followed by ``data`` itself (no tag octet).
    Raises ``Exception`` when ``data`` is longer than 0xFFFFFFFF bytes.
    """
    length = len(data)
    if length <= 0x7F:
        header = pack('B', length)
    elif length <= 0xFF:
        header = pack('BB', 0x81, length)
    elif length <= 0xFFFF:
        header = pack('!BH', 0x82, length)
    elif length <= 0xFFFFFF:
        # Three length octets: the high byte, then the low 16 bits.
        header = pack('!BBH', 0x83, length >> 16, length & 0xFFFF)
    elif length <= 0xFFFFFFFF:
        header = pack('!BL', 0x84, length)
    else:
        raise Exception('Error in asn1encode')
    return header + data
def asn1decode(data = ''):
    """Strip a BER length prefix from ``data`` and return the content.

    ``data`` must start at the length octet (the tag has already been
    consumed by the caller).  A first octet of 0x81, 0x82, 0x83 or 0x84
    announces 1, 2, 3 or 4 big-endian length octets; any other value is
    taken as the length itself (short form).

    Returns a tuple ``(content, consumed)`` where ``consumed`` is
    ``len(content)`` plus the number of length octets.  When ``data`` is
    shorter than the announced length, ``content`` is whatever is available.
    Raises ``struct.error`` if the length octets themselves are missing.
    """
    len1 = unpack('B', data[:1])[0]
    data = data[1:]
    if len1 == 0x81:
        pad = calcsize('B')
        len2 = unpack('B', data[:pad])[0]
        data = data[pad:]
        ans = data[:len2]
    elif len1 == 0x82:
        pad = calcsize('!H')
        len2 = unpack('!H', data[:pad])[0]
        data = data[pad:]
        ans = data[:len2]
    elif len1 == 0x83:
        pad = calcsize('B') + calcsize('!H')
        len2, len3 = unpack('!BH', data[:pad])
        data = data[pad:]
        # Parenthesised on purpose: '+' binds tighter than '<<', so the
        # unparenthesised form shifts by (16 + len3) and yields a bogus length.
        ans = data[:(len2 << 16) + len3]
    elif len1 == 0x84:
        pad = calcsize('!L')
        len2 = unpack('!L', data[:pad])[0]
        data = data[pad:]
        ans = data[:len2]
    else:
        # 1 byte length, string <= 0x7F
        pad = 0
        ans = data[:len1]
    return ans, len(ans) + pad + 1
class GSSAPI:
    """Generic GSSAPI header format.

    Parsed and to-be-sent values live in the ``fields`` dict and are exposed
    through item access (``obj['Payload']``).  ``getData()`` serialises the
    header followed by ``self['Payload']``.
    """

    def __init__(self, data = None):
        """Start with the SPNEGO OID and, when ``data`` is given, parse it."""
        self.fields = {}
        self['UUID'] = GSS_API_SPNEGO_UUID
        if data:
            self.fromString(data)

    def __setitem__(self, key, value):
        self.fields[key] = value

    def __getitem__(self, key):
        return self.fields[key]

    def __delitem__(self, key):
        del self.fields[key]

    def __len__(self):
        """Length in bytes of the serialised token."""
        return len(self.getData())

    def __str__(self):
        # NOTE(review): restored as __str__; the listed body returned
        # len(...), which makes str(obj) raise TypeError. Confirm the
        # intended method name against the upstream file.
        return str(self.getData())

    def fromString(self, data = None):
        """Parse a GSSAPI token and fill ``self['OID']`` / ``self['Payload']``.

        Expected layout:
            AID tag (0x60), BER length
            OID tag (0x06), BER length, OID bytes
            payload (everything after the OID)

        Raises ``Exception`` when the AID or OID tag is not where expected.
        """
        next_byte = unpack('B', data[:1])[0]
        if next_byte != ASN1_AID:
            raise Exception('Unknown AID=%x' % next_byte)
        # Drop the tag so asn1decode() starts at the length octet.
        data = data[1:]
        decode_data, total_bytes = asn1decode(data)
        # Now we should have a OID tag
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_OID:
            raise Exception('OID tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        # Now the OID contents, should be SPNEGO UUID
        uuid, total_bytes = asn1decode(decode_data)
        # NOTE(review): the parsed OID is kept under 'OID'; the listing
        # assigns `uuid` but never uses it. Confirm the key name.
        self['OID'] = uuid
        # the rest should be the data
        self['Payload'] = decode_data[total_bytes:]

    def dump(self):
        """Print every stored field as ``name: {repr}``."""
        for name in self.fields.keys():
            print("%s: {%r}" % (name, self[name]))

    def getData(self):
        """Serialise the AID tag, BER length, encoded OID and the payload."""
        ans = pack('B', ASN1_AID)
        ans += asn1encode(
            pack('B', ASN1_OID) +
            asn1encode(self['UUID']) +
            self['Payload'])
        return ans
class SPNEGO_NegTokenResp:
    # http://tools.ietf.org/html/rfc4178#page-9
    # NegTokenResp ::= SEQUENCE {
    #     negState       [0] ENUMERATED {
    #         accept-completed    (0),
    #         accept-incomplete   (1),
    #         reject              (2),
    #         request-mic         (3)
    #     }                                 OPTIONAL,
    #       -- REQUIRED in the first reply from the target
    #     supportedMech   [1] MechType      OPTIONAL,
    #       -- present only in the first reply from the target
    #     responseToken   [2] OCTET STRING  OPTIONAL,
    #     mechListMIC     [3] OCTET STRING  OPTIONAL,
    #     ...
    # }
    # This structure is not prepended by a GSS generic header!
    SPNEGO_NEG_TOKEN_RESP = 0xa1
    SPNEGO_NEG_TOKEN_TARG = 0xa0

    def __init__(self, data = None):
        """Create an empty token or, when ``data`` is given, parse it."""
        self.fields = {}
        if data:
            self.fromString(data)

    def __setitem__(self, key, value):
        self.fields[key] = value

    def __getitem__(self, key):
        return self.fields[key]

    def __delitem__(self, key):
        del self.fields[key]

    def __len__(self):
        """Length in bytes of the serialised token."""
        return len(self.getData())

    def __str__(self):
        # NOTE(review): restored as __str__; the listed body returned
        # len(...), which makes str(obj) raise TypeError. Confirm the
        # intended method name against the upstream file.
        return str(self.getData())

    def fromString(self, data = 0):
        """Parse a NegTokenResp and store the elements that are present.

        Fills ``self['NegResult']``, ``self['SupportedMech']`` and
        ``self['ResponseToken']`` as found.  Returns early when the token
        carries only the negotiation result.  Raises ``Exception`` when an
        expected tag is missing.
        """
        payload = data
        next_byte = unpack('B', payload[:1])[0]
        if next_byte != SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP:
            raise Exception('NegTokenResp not found %x' % next_byte)
        payload = payload[1:]
        decode_data, total_bytes = asn1decode(payload)
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_SEQUENCE:
            raise Exception('SEQUENCE tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        decode_data, total_bytes = asn1decode(decode_data)
        next_byte = unpack('B', decode_data[:1])[0]

        if next_byte != ASN1_MECH_TYPE:
            # MechType not found, could be an AUTH answer
            if next_byte != ASN1_RESPONSE_TOKEN:
                raise Exception('MechType/ResponseToken tag not found %x' % next_byte)
        else:
            decode_data2 = decode_data[1:]
            decode_data2, total_bytes = asn1decode(decode_data2)
            next_byte = unpack('B', decode_data2[:1])[0]
            if next_byte != ASN1_ENUMERATED:
                raise Exception('Enumerated tag not found %x' % next_byte)
            # Decode the ENUMERATED content itself (skip its tag), the same
            # way the SupportedMech OID is decoded below.  Decoding
            # `decode_data` here would misread the [0] tag as a length.
            item, total_bytes2 = asn1decode(decode_data2[1:])
            self['NegResult'] = item
            # Skip the [0] tag plus its length/content octets.
            decode_data = decode_data[1:]
            decode_data = decode_data[total_bytes:]

            # Do we have more data?
            if len(decode_data) == 0:
                return

            next_byte = unpack('B', decode_data[:1])[0]
            if next_byte != ASN1_SUPPORTED_MECH:
                if next_byte != ASN1_RESPONSE_TOKEN:
                    raise Exception('Supported Mech/ResponseToken tag not found %x' % next_byte)
            else:
                decode_data2 = decode_data[1:]
                decode_data2, total_bytes = asn1decode(decode_data2)
                next_byte = unpack('B', decode_data2[:1])[0]
                if next_byte != ASN1_OID:
                    raise Exception('OID tag not found %x' % next_byte)
                decode_data2 = decode_data2[1:]
                item, total_bytes2 = asn1decode(decode_data2)
                self['SupportedMech'] = item

                decode_data = decode_data[1:]
                decode_data = decode_data[total_bytes:]
                next_byte = unpack('B', decode_data[:1])[0]
                if next_byte != ASN1_RESPONSE_TOKEN:
                    raise Exception('Response token tag not found %x' % next_byte)

        # Whichever path led here, decode_data now starts at the
        # responseToken [2] tag.
        decode_data = decode_data[1:]
        decode_data, total_bytes = asn1decode(decode_data)
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_OCTET_STRING:
            raise Exception('Octet string token tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        decode_data, total_bytes = asn1decode(decode_data)
        self['ResponseToken'] = decode_data

    def dump(self):
        """Print every stored field as ``name: {repr}``."""
        for name in self.fields.keys():
            print("%s: {%r}" % (name, self[name]))

    def getData(self):
        """Serialise the token from the fields that are set.

        Three shapes are produced: NegResult + SupportedMech + ResponseToken,
        NegResult alone, or ResponseToken alone.  A ``KeyError`` propagates
        if a field required by the selected shape is missing.
        """
        ans = pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP)
        if 'NegResult' in self.fields and 'SupportedMech' in self.fields:
            # Server resp
            ans += asn1encode(
                pack('B', ASN1_SEQUENCE) +
                asn1encode(
                    pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
                    asn1encode(
                        pack('B', ASN1_ENUMERATED) +
                        asn1encode(self['NegResult'])) +
                    pack('B', ASN1_SUPPORTED_MECH) +
                    asn1encode(
                        pack('B', ASN1_OID) +
                        asn1encode(self['SupportedMech'])) +
                    pack('B', ASN1_RESPONSE_TOKEN) +
                    asn1encode(
                        pack('B', ASN1_OCTET_STRING) +
                        asn1encode(self['ResponseToken']))))
        elif 'NegResult' in self.fields:
            # Server resp
            ans += asn1encode(
                pack('B', ASN1_SEQUENCE) +
                asn1encode(
                    pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
                    asn1encode(
                        pack('B', ASN1_ENUMERATED) +
                        asn1encode(self['NegResult']))))
        else:
            # Client resp
            ans += asn1encode(
                pack('B', ASN1_SEQUENCE) +
                asn1encode(
                    pack('B', ASN1_RESPONSE_TOKEN) +
                    asn1encode(
                        pack('B', ASN1_OCTET_STRING) +
                        asn1encode(self['ResponseToken']))))
        return ans
class SPNEGO_NegTokenInit(GSSAPI):
    # http://tools.ietf.org/html/rfc4178#page-8
    # NegTokenInit ::= SEQUENCE {
    #   mechTypes       [0] MechTypeList,
    #   reqFlags        [1] ContextFlags OPTIONAL,
    #   mechToken       [2] OCTET STRING OPTIONAL,
    #   mechListMIC     [3] OCTET STRING OPTIONAL,
    # }
    SPNEGO_NEG_TOKEN_INIT = 0xa0

    def fromString(self, data = 0):
        """Parse a GSSAPI-wrapped NegTokenInit.

        Fills ``self['MechTypes']`` with the encoded mechanism OIDs in order
        and, when a mechToken element follows the list, ``self['MechToken']``.
        Raises ``Exception`` when an expected tag is missing.
        """
        GSSAPI.fromString(self, data)
        payload = self['Payload']
        next_byte = unpack('B', payload[:1])[0]
        if next_byte != SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT:
            raise Exception('NegTokenInit not found %x' % next_byte)
        payload = payload[1:]
        decode_data, total_bytes = asn1decode(payload)
        # Now we should have a SEQUENCE Tag
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_SEQUENCE:
            raise Exception('SEQUENCE tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        decode_data, total_bytes2 = asn1decode(decode_data)
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_MECH_TYPE:
            raise Exception('MechType tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        # Keep the bytes from the mechTypes length onwards; total_bytes3
        # later tells where the element following the list starts.
        remaining_data = decode_data
        decode_data, total_bytes3 = asn1decode(decode_data)
        next_byte = unpack('B', decode_data[:1])[0]
        if next_byte != ASN1_SEQUENCE:
            raise Exception('SEQUENCE tag not found %x' % next_byte)
        decode_data = decode_data[1:]
        decode_data, total_bytes4 = asn1decode(decode_data)
        # And finally we should have the MechTypes
        self['MechTypes'] = []
        while decode_data:
            next_byte = unpack('B', decode_data[:1])[0]
            if next_byte != ASN1_OID:
                # Not a valid OID, there must be something else we won't unpack
                break
            decode_data = decode_data[1:]
            item, total_bytes = asn1decode(decode_data)
            self['MechTypes'].append(item)
            decode_data = decode_data[total_bytes:]

        # Do we have MechTokens as well?
        decode_data = remaining_data[total_bytes3:]
        if len(decode_data) > 0:
            next_byte = unpack('B', decode_data[:1])[0]
            if next_byte == ASN1_MECH_TOKEN:
                # We have tokens in here!
                decode_data = decode_data[1:]
                decode_data, total_bytes = asn1decode(decode_data)
                next_byte = unpack('B', decode_data[:1])[0]
                if next_byte == ASN1_OCTET_STRING:
                    decode_data = decode_data[1:]
                    decode_data, total_bytes = asn1decode(decode_data)
                    self['MechToken'] = decode_data

    def getData(self):
        """Serialise the NegTokenInit and wrap it in the GSSAPI header.

        Encodes every entry of ``self['MechTypes']`` as an OID, appends
        ``self['MechToken']`` when set, stores the result in
        ``self['Payload']`` and returns ``GSSAPI.getData(self)``.
        """
        mechTypes = b''.join(
            pack('B', ASN1_OID) + asn1encode(mech) for mech in self['MechTypes'])

        mechToken = b''
        # Do we have tokens to send?
        if 'MechToken' in self.fields:
            mechToken = pack('B', ASN1_MECH_TOKEN) + asn1encode(
                pack('B', ASN1_OCTET_STRING) + asn1encode(
                    self['MechToken']))

        ans = pack('B', SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT)
        ans += asn1encode(
            pack('B', ASN1_SEQUENCE) +
            asn1encode(
                pack('B', ASN1_MECH_TYPE) +
                asn1encode(
                    pack('B', ASN1_SEQUENCE) +
                    asn1encode(mechTypes)) + mechToken))

        self['Payload'] = ans
        return GSSAPI.getData(self)