11 import SimpleHTTPServer
15 from cryptoIDlib.api import *
16 cryptoIDlibLoaded = True
18 cryptoIDlibLoaded = False
# This file is a command-line tool; refuse to be imported as a module.
if __name__ != "__main__":
    # A bare string is not a valid exception object (string exceptions were
    # removed in Python 2.6), so raising one produces an unrelated TypeError
    # instead of this message.  Raise a real exception carrying the text.
    raise RuntimeError("This must be run as a command, not used as a module!")
24 #from tlslite.constants import AlertDescription, Fault
26 #from tlslite.utils.jython_compat import formatExceptionTrace
27 #from tlslite.X509 import X509, X509CertChain
29 from tlslite.api import *
def parsePrivateKey(s):
    """Parse a private key from the string s, trying PEM first and then XML.

    Returns whatever parsePEMKey(s, private=True) returns; if that call
    raises, the result of parseXMLKey(s, private=True) is returned instead.
    Any exception raised by parseXMLKey propagates to the caller.
    """
    try:
        return parsePEMKey(s, private=True)
    except Exception:
        # NOTE(review): the exception type parsePEMKey raises for non-PEM
        # input is not visible in this file, hence the broad catch -- narrow
        # it once confirmed.
        return parseXMLKey(s, private=True)
# clientTest(address, dir): client side of the numbered self-tests below.
# `address` is a "host[:port]" string and `dir` is the directory the key,
# certificate and HTML fixtures are read from.
# NOTE(review): this listing carries no indentation and its embedded line
# numbers skip values, so several statements (presumably `try:`/`else:` lines
# and similar) are not visible here -- confirm against the complete file.
39 def clientTest(address, dir):
41 #Split address into hostname/port tuple
42 address = address.split(":")
# "4443" is the port appended here; the condition guarding the append is not
# visible in this listing.
44 address.append("4443")
45 address = ( address[0], int(address[1]) )
# Build a TCP socket and wrap it in a TLSConnection.  The tests below obtain
# their connections by calling connect(); its `def` line is not visible here.
48 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
49 if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
52 c = TLSConnection(sock)
# Test 1: shared-key handshake with user "shared" and key "key".
59 print "Test 1 - good shared key"
60 connection = connect()
61 connection.handshakeClientSharedKey("shared", "key")
63 connection.sock.close()
# Test 2: set each shared-key and generic fault on a fresh connection before
# handshaking; a TLSFaultError is reported as a BAD FAULT.
65 print "Test 2 - shared key faults"
66 for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
67 connection = connect()
68 connection.fault = fault
70 connection.handshakeClientSharedKey("shared", "key")
71 print " Good Fault %s" % (Fault.faultNames[fault])
72 except TLSFaultError, e:
73 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
75 connection.sock.close()
# Test 3: SRP handshake as "test"/"password".
77 print "Test 3 - good SRP"
78 connection = connect()
79 connection.handshakeClientSRP("test", "password")
# Test 4: same fault loop as Test 2, using the SRP fault list.
82 print "Test 4 - SRP faults"
83 for fault in Fault.clientSrpFaults + Fault.genericFaults:
84 connection = connect()
85 connection.fault = fault
87 connection.handshakeClientSRP("test", "password")
88 print " Good Fault %s" % (Fault.faultNames[fault])
89 except TLSFaultError, e:
90 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
92 connection.sock.close()
# Test 5: handshakeClientUnknown is given srpCallback, which returns the
# ("test", "password") pair; the callback's `def` line is not visible here.
94 print "Test 5 - good SRP: unknown_srp_username idiom"
96 return ("test", "password")
97 connection = connect()
98 connection.handshakeClientUnknown(srpCallback=srpCallback)
100 connection.sock.close()
# Test 6: SRP handshake; the server's chain must be an X509CertChain.
102 print "Test 6 - good SRP: with X.509 certificate"
103 connection = connect()
104 connection.handshakeClientSRP("test", "password")
105 assert(isinstance(connection.session.serverCertChain, X509CertChain))
107 connection.sock.close()
# Test 7: SRP and generic faults again, under the X.509 + SRP test title.
109 print "Test 7 - X.509 with SRP faults"
110 for fault in Fault.clientSrpFaults + Fault.genericFaults:
111 connection = connect()
112 connection.fault = fault
114 connection.handshakeClientSRP("test", "password")
115 print " Good Fault %s" % (Fault.faultNames[fault])
116 except TLSFaultError, e:
117 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
119 connection.sock.close()
# Tests 8-9 follow a cryptoIDlibLoaded check; how far that `if` extends cannot
# be seen without indentation.
121 if cryptoIDlibLoaded:
# Test 8: SRP handshake; the server chain must be a cryptoID CertChain.  When
# validate() fails, the problem list is printed.
122 print "Test 8 - good SRP: with cryptoID certificate chain"
123 connection = connect()
124 connection.handshakeClientSRP("test", "password")
125 assert(isinstance(connection.session.serverCertChain, CertChain))
126 if not (connection.session.serverCertChain.validate()):
127 print connection.session.serverCertChain.validate(listProblems=True)
130 connection.sock.close()
# Test 9: SRP fault loop under the cryptoID test title.
132 print "Test 9 - CryptoID with SRP faults"
133 for fault in Fault.clientSrpFaults + Fault.genericFaults:
134 connection = connect()
135 connection.fault = fault
137 connection.handshakeClientSRP("test", "password")
138 print " Good Fault %s" % (Fault.faultNames[fault])
139 except TLSFaultError, e:
140 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
142 connection.sock.close()
# Test 10: certificate handshake with no client credentials; the server chain
# must be an X509CertChain.
144 print "Test 10 - good X509"
145 connection = connect()
146 connection.handshakeClientCert()
147 assert(isinstance(connection.session.serverCertChain, X509CertChain))
149 connection.sock.close()
# Test 10.a: as Test 10, with both minVersion and maxVersion pinned to (3,0).
151 print "Test 10.a - good X509, SSLv3"
152 connection = connect()
153 settings = HandshakeSettings()
154 settings.minVersion = (3,0)
155 settings.maxVersion = (3,0)
156 connection.handshakeClientCert(settings=settings)
157 assert(isinstance(connection.session.serverCertChain, X509CertChain))
159 connection.sock.close()
# Test 11: fault loop over the no-client-auth and generic fault lists.
161 print "Test 11 - X.509 faults"
162 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
163 connection = connect()
164 connection.fault = fault
166 connection.handshakeClientCert()
167 print " Good Fault %s" % (Fault.faultNames[fault])
168 except TLSFaultError, e:
169 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
171 connection.sock.close()
# Tests 12-13: cryptoID counterparts of Tests 10-11, behind the
# cryptoIDlibLoaded check.
173 if cryptoIDlibLoaded:
174 print "Test 12 - good cryptoID"
175 connection = connect()
176 connection.handshakeClientCert()
177 assert(isinstance(connection.session.serverCertChain, CertChain))
178 assert(connection.session.serverCertChain.validate())
180 connection.sock.close()
182 print "Test 13 - cryptoID faults"
183 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
184 connection = connect()
185 connection.fault = fault
187 connection.handshakeClientCert()
188 print " Good Fault %s" % (Fault.faultNames[fault])
189 except TLSFaultError, e:
190 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
192 connection.sock.close()
# Test 14: load the client's X.509 certificate and PEM private key from `dir`
# and present them in the handshake.
194 print "Test 14 - good mutual X509"
195 x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
196 x509Chain = X509CertChain([x509Cert])
197 s = open(os.path.join(dir, "clientX509Key.pem")).read()
198 x509Key = parsePEMKey(s, private=True)
200 connection = connect()
201 connection.handshakeClientCert(x509Chain, x509Key)
202 assert(isinstance(connection.session.serverCertChain, X509CertChain))
204 connection.sock.close()
# Test 14.a: as Test 14, with the version range pinned to (3,0).
206 print "Test 14.a - good mutual X509, SSLv3"
207 connection = connect()
208 settings = HandshakeSettings()
209 settings.minVersion = (3,0)
210 settings.maxVersion = (3,0)
211 connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
212 assert(isinstance(connection.session.serverCertChain, X509CertChain))
214 connection.sock.close()
# Test 15: fault loop over the client-certificate and generic fault lists.
216 print "Test 15 - mutual X.509 faults"
217 for fault in Fault.clientCertFaults + Fault.genericFaults:
218 connection = connect()
219 connection.fault = fault
221 connection.handshakeClientCert(x509Chain, x509Key)
222 print " Good Fault %s" % (Fault.faultNames[fault])
223 except TLSFaultError, e:
224 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
226 connection.sock.close()
# Tests 16-17: mutual cryptoID authentication.  The client credentials are
# read from the files named serverCryptoIDChain.xml / serverCryptoIDKey.xml.
228 if cryptoIDlibLoaded:
229 print "Test 16 - good mutual cryptoID"
230 cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
231 cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
233 connection = connect()
234 connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
235 assert(isinstance(connection.session.serverCertChain, CertChain))
236 assert(connection.session.serverCertChain.validate())
238 connection.sock.close()
240 print "Test 17 - mutual cryptoID faults"
241 for fault in Fault.clientCertFaults + Fault.genericFaults:
242 connection = connect()
243 connection.fault = fault
245 connection.handshakeClientCert(cryptoIDChain, cryptoIDKey)
246 print " Good Fault %s" % (Fault.faultNames[fault])
247 except TLSFaultError, e:
248 print " BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))
250 connection.sock.close()
# Test 18: SRP handshake whose session object is kept for the next two tests.
252 print "Test 18 - good SRP, prepare to resume..."
253 connection = connect()
254 connection.handshakeClientSRP("test", "password")
256 connection.sock.close()
257 session = connection.session
# Test 19: handshake again passing the saved session; note the password given
# is "garbage" rather than "password".
259 print "Test 19 - resumption"
260 connection = connect()
261 connection.handshakeClientSRP("test", "garbage", session=session)
262 #Don't close! -- see below
# Test 20: drop the previous socket without a close_notify, then try the saved
# session once more.  A TLSRemoteAlert is caught and its description compared
# with bad_record_mac; the action taken on a mismatch is not visible here.
264 print "Test 20 - invalidated resumption"
265 connection.sock.close() #Close the socket without a close_notify!
266 connection = connect()
268 connection.handshakeClientSRP("test", "garbage", session=session)
270 except TLSRemoteAlert, alert:
271 if alert.description != AlertDescription.bad_record_mac:
273 connection.sock.close()
# Test 21: HTTPS GET of /index.html on the next port up; the response must be
# status 200 with a body equal to index.html from `dir`.
275 print "Test 21 - HTTPS test X.509"
276 address = address[0], address[1]+1
# Use socket.timeout when the socket module provides it; socket.error is the
# other assignment (its `else:` line is not visible here).
277 if hasattr(socket, "timeout"):
278 timeoutEx = socket.timeout
280 timeoutEx = socket.error
284 htmlBody = open(os.path.join(dir, "index.html")).read()
# NOTE(review): `fingerprint` is used here before the assignment further
# down, so its initial value and the retry loop are presumably among the
# lines missing from this listing.
287 h = HTTPTLSConnection(\
288 address[0], address[1], x509Fingerprint=fingerprint)
290 h.request("GET", "/index.html")
292 assert(r.status == 200)
294 assert(s == htmlBody)
295 fingerprint = h.tlsSession.serverCertChain.getFingerprint()
300 print "timeout, retrying..."
# Test 21a: the same HTTPS check with SRP credentials and a cryptoID, again
# one port higher.
303 if cryptoIDlibLoaded:
304 print "Test 21a - HTTPS test SRP+cryptoID"
305 address = address[0], address[1]+1
306 if hasattr(socket, "timeout"):
307 timeoutEx = socket.timeout
309 timeoutEx = socket.error
312 time.sleep(2) #Time to generate key and cryptoID
313 htmlBody = open(os.path.join(dir, "index.html")).read()
317 h = HTTPTLSConnection(\
318 address[0], address[1],
319 username="test", password="password",
320 cryptoID=fingerprint, protocol=protocol)
322 h.request("GET", "/index.html")
324 assert(r.status == 200)
326 assert(s == htmlBody)
327 fingerprint = h.tlsSession.serverCertChain.cryptoID
329 protocol = "urn:whatever"
333 print "timeout, retrying..."
# Move to the next port for the remaining tests and collect the cipher
# implementations to exercise.  The creation of `implementations` and the
# conditions guarding the first three appends are not visible here.
336 address = address[0], address[1]+1
340 implementations.append("cryptlib")
342 implementations.append("openssl")
344 implementations.append("pycrypto")
345 implementations.append("python")
# Test 22: for every implementation/cipher pair, do a shared-key handshake
# restricted to that cipher, print what was negotiated, write "hello" and
# read 5 bytes.
347 print "Test 22 - different ciphers"
348 for implementation in implementations:
349 for cipher in ["aes128", "aes256", "rc4"]:
352 connection = connect()
354 settings = HandshakeSettings()
355 settings.cipherNames = [cipher]
356 settings.cipherImplementations = [implementation, "python"]
357 connection.handshakeClientSharedKey("shared", "key", settings=settings)
358 print ("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
360 connection.write("hello")
361 h = connection.read(min=5, max=5)
364 connection.sock.close()
# Test 23: time a 50000-byte write followed by a 50000-byte read per
# implementation/cipher pair and print the rate.  "3des" is special-cased for
# implementations other than openssl, cryptlib and pycrypto; the action taken
# is not visible here (presumably a skip).
366 print "Test 23 - throughput test"
367 for implementation in implementations:
368 for cipher in ["aes128", "aes256", "3des", "rc4"]:
369 if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
373 connection = connect()
375 settings = HandshakeSettings()
376 settings.cipherNames = [cipher]
377 settings.cipherImplementations = [implementation, "python"]
378 connection.handshakeClientSharedKey("shared", "key", settings=settings)
379 print ("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation())),
381 startTime = time.clock()
382 connection.write("hello"*10000)
383 h = connection.read(min=50000, max=50000)
384 stopTime = time.clock()
# The rate is computed from 100000 bytes: 50000 written plus 50000 read.
385 print "100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime))
387 assert(h == "hello"*10000)
389 connection.sock.close()
# Test 24: contact public IMAP and POP3 servers over TLS; a socket.error is
# reported as non-critical.
391 print "Test 24 - Internet servers test"
393 i = IMAP4_TLS("cyrus.andrew.cmu.edu")
394 i.login("anonymous", "anonymous@anonymous.net")
396 print "Test 24: IMAP4 good"
397 p = POP3_TLS("pop.gmail.com")
399 print "Test 24: POP3 good"
400 except socket.error, e:
401 print "Non-critical error: socket error trying to reach internet server: ", e
404 print "Test succeeded"
# serverTest(address, dir): server side of the numbered self-tests; each test
# below mirrors the client test with the same number.  `address` is a
# "host[:port]" string and `dir` is the directory the certificate and key
# fixtures are read from.
# NOTE(review): this listing carries no indentation and its embedded line
# numbers skip values, so several statements (presumably `try:`/`except`
# lines, bind/listen calls and similar) are not visible here -- confirm
# against the complete file.
409 def serverTest(address, dir):
410 #Split address into hostname/port tuple
411 address = address.split(":")
# "4443" is the port appended here; the condition guarding the append is not
# visible in this listing.
413 address.append("4443")
414 address = ( address[0], int(address[1]) )
# Listening socket.  The `return` below is the body of the connect() helper
# the tests call (its `def` line is not visible): it accepts one client and
# wraps the accepted socket in a TLSConnection.
417 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
422 return TLSConnection(lsock.accept()[0])
# Test 1: shared-key handshake against a two-entry SharedKeyDB.
424 print "Test 1 - good shared key"
425 sharedKeyDB = SharedKeyDB()
426 sharedKeyDB["shared"] = "key"
427 sharedKeyDB["shared2"] = "key2"
428 connection = connect()
429 connection.handshakeServer(sharedKeyDB=sharedKeyDB)
431 connection.sock.close()
# Test 2: one server handshake per fault the client sets.  `fault` is stored
# on the connection; how a failed handshake is handled is not visible here.
433 print "Test 2 - shared key faults"
434 for fault in Fault.clientSharedKeyFaults + Fault.genericFaults:
435 connection = connect()
436 connection.fault = fault
438 connection.handshakeServer(sharedKeyDB=sharedKeyDB)
442 connection.sock.close()
# Test 3: SRP handshake against a VerifierDB holding a 1536-bit verifier for
# "test"/"password".
444 print "Test 3 - good SRP"
445 #verifierDB = tlslite.VerifierDB(os.path.join(dir, "verifierDB"))
447 verifierDB = VerifierDB()
449 entry = VerifierDB.makeVerifier("test", "password", 1536)
450 verifierDB["test"] = entry
452 connection = connect()
453 connection.handshakeServer(verifierDB=verifierDB)
455 connection.sock.close()
# Test 4: SRP fault loop.
457 print "Test 4 - SRP faults"
458 for fault in Fault.clientSrpFaults + Fault.genericFaults:
459 connection = connect()
460 connection.fault = fault
462 connection.handshakeServer(verifierDB=verifierDB)
466 connection.sock.close()
# Test 5: same server call as Test 3.
468 print "Test 5 - good SRP: unknown_srp_username idiom"
469 connection = connect()
470 connection.handshakeServer(verifierDB=verifierDB)
472 connection.sock.close()
# Test 6: load the server's X.509 certificate and PEM key from `dir` and offer
# them together with SRP.
474 print "Test 6 - good SRP: with X.509 cert"
475 x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read())
476 x509Chain = X509CertChain([x509Cert])
477 s = open(os.path.join(dir, "serverX509Key.pem")).read()
478 x509Key = parsePEMKey(s, private=True)
480 connection = connect()
481 connection.handshakeServer(verifierDB=verifierDB, \
482 certChain=x509Chain, privateKey=x509Key)
484 connection.sock.close()
# Test 7: SRP fault loop with the X.509 chain offered.
486 print "Test 7 - X.509 with SRP faults"
487 for fault in Fault.clientSrpFaults + Fault.genericFaults:
488 connection = connect()
489 connection.fault = fault
491 connection.handshakeServer(verifierDB=verifierDB, \
492 certChain=x509Chain, privateKey=x509Key)
496 connection.sock.close()
# Tests 8-9 follow a cryptoIDlibLoaded check (extent not visible without
# indentation): SRP with the cryptoID chain and XML key read from `dir`.
498 if cryptoIDlibLoaded:
499 print "Test 8 - good SRP: with cryptoID certs"
500 cryptoIDChain = CertChain().parse(open(os.path.join(dir, "serverCryptoIDChain.xml"), "r").read())
501 cryptoIDKey = parseXMLKey(open(os.path.join(dir, "serverCryptoIDKey.xml"), "r").read(), private=True)
502 connection = connect()
503 connection.handshakeServer(verifierDB=verifierDB, \
504 certChain=cryptoIDChain, privateKey=cryptoIDKey)
506 connection.sock.close()
508 print "Test 9 - cryptoID with SRP faults"
509 for fault in Fault.clientSrpFaults + Fault.genericFaults:
510 connection = connect()
511 connection.fault = fault
513 connection.handshakeServer(verifierDB=verifierDB, \
514 certChain=cryptoIDChain, privateKey=cryptoIDKey)
518 connection.sock.close()
# Test 10: certificate-only handshake with the X.509 chain.
520 print "Test 10 - good X.509"
521 connection = connect()
522 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
524 connection.sock.close()
# Test 10.a: as Test 10, with minVersion and maxVersion pinned to (3,0).
526 print "Test 10.a - good X.509, SSL v3"
527 connection = connect()
528 settings = HandshakeSettings()
529 settings.minVersion = (3,0)
530 settings.maxVersion = (3,0)
531 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
533 connection.sock.close()
# Test 11: fault loop over the no-client-auth and generic fault lists.
535 print "Test 11 - X.509 faults"
536 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
537 connection = connect()
538 connection.fault = fault
540 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
544 connection.sock.close()
# Tests 12-13: cryptoID counterparts of Tests 10-11.
546 if cryptoIDlibLoaded:
547 print "Test 12 - good cryptoID"
548 connection = connect()
549 connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
551 connection.sock.close()
553 print "Test 13 - cryptoID faults"
554 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
555 connection = connect()
556 connection.fault = fault
558 connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey)
562 connection.sock.close()
# Test 14: mutual authentication -- reqCert=True is passed so the client is
# asked for a certificate.
564 print "Test 14 - good mutual X.509"
565 connection = connect()
566 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
567 assert(isinstance(connection.session.serverCertChain, X509CertChain))
569 connection.sock.close()
# Test 14a: as Test 14, with the version range pinned to (3,0).
571 print "Test 14a - good mutual X.509, SSLv3"
572 connection = connect()
573 settings = HandshakeSettings()
574 settings.minVersion = (3,0)
575 settings.maxVersion = (3,0)
576 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
577 assert(isinstance(connection.session.serverCertChain, X509CertChain))
579 connection.sock.close()
# Test 15: fault loop over the client-certificate and generic fault lists.
581 print "Test 15 - mutual X.509 faults"
582 for fault in Fault.clientCertFaults + Fault.genericFaults:
583 connection = connect()
584 connection.fault = fault
586 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
590 connection.sock.close()
# Tests 16-17: mutual authentication with the cryptoID chain.
592 if cryptoIDlibLoaded:
593 print "Test 16 - good mutual cryptoID"
594 connection = connect()
595 connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
596 assert(isinstance(connection.session.serverCertChain, CertChain))
597 assert(connection.session.serverCertChain.validate())
599 connection.sock.close()
601 print "Test 17 - mutual cryptoID faults"
602 for fault in Fault.clientCertFaults + Fault.genericFaults:
603 connection = connect()
604 connection.fault = fault
606 connection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey, reqCert=True)
610 connection.sock.close()
# Test 18: SRP handshake with a SessionCache passed in, which the next two
# tests reuse.
612 print "Test 18 - good SRP, prepare to resume"
613 sessionCache = SessionCache()
614 connection = connect()
615 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
617 connection.sock.close()
# Test 19: second handshake with the same SessionCache.
619 print "Test 19 - resumption"
620 connection = connect()
621 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
622 #Don't close! -- see next test
# Test 20: the read is expected to raise TLSAbruptCloseError.  `assert()`
# asserts an empty tuple and therefore always fails, so reaching it means the
# read returned unexpectedly.  The following handshake is then expected to
# raise a TLSLocalAlert whose description is compared with bad_record_mac;
# the action taken on a mismatch is not visible here.
624 print "Test 20 - invalidated resumption"
626 connection.read(min=1, max=1)
627 assert() #Client is going to close the socket without a close_notify
628 except TLSAbruptCloseError, e:
630 connection = connect()
632 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
633 except TLSLocalAlert, alert:
634 if alert.description != AlertDescription.bad_record_mac:
636 connection.sock.close()
# Test 21: serve HTTP requests over TLS on the next port up, using an
# HTTPServer subclass whose handshake() offers the X.509 chain.  The lines
# that close the listening socket and any loop around handle_request() are
# not visible here.
638 print "Test 21 - HTTPS test X.509"
640 #Close the current listening socket
643 #Create and run an HTTP Server using TLSSocketServerMixIn
644 class MyHTTPServer(TLSSocketServerMixIn,
645 BaseHTTPServer.HTTPServer):
646 def handshake(self, tlsConnection):
647 tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
651 address = address[0], address[1]+1
652 httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
654 httpd.handle_request()
# Test 21a: the same server, offering the cryptoID chain plus SRP, one port
# higher again.
658 if cryptoIDlibLoaded:
659 print "Test 21a - HTTPS test SRP+cryptoID"
661 #Create and run an HTTP Server using TLSSocketServerMixIn
662 class MyHTTPServer(TLSSocketServerMixIn,
663 BaseHTTPServer.HTTPServer):
664 def handshake(self, tlsConnection):
665 tlsConnection.handshakeServer(certChain=cryptoIDChain, privateKey=cryptoIDKey,
666 verifierDB=verifierDB)
670 address = address[0], address[1]+1
671 httpd = MyHTTPServer(address, SimpleHTTPServer.SimpleHTTPRequestHandler)
673 httpd.handle_request()
# A new listening socket is created on the next port; the `return` is again
# the body of a connect() helper whose `def` line is not visible.
677 #Re-connect the listening socket
678 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
679 address = address[0], address[1]+1
684 return TLSConnection(lsock.accept()[0])
# Cipher implementations; the creation of `implementations` and the
# conditions guarding the first three appends are not visible here.
688 implementations.append("cryptlib")
690 implementations.append("openssl")
692 implementations.append("pycrypto")
693 implementations.append("python")
# Test 22: the server always requests the "python" implementation but loops
# len(implementations) times -- presumably to accept the same number of
# connections the client makes; confirm against the client loop.
695 print "Test 22 - different ciphers"
696 for implementation in ["python"] * len(implementations):
697 for cipher in ["aes128", "aes256", "rc4"]:
700 connection = connect()
702 settings = HandshakeSettings()
703 settings.cipherNames = [cipher]
704 settings.cipherImplementations = [implementation, "python"]
706 connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
707 print connection.getCipherName(), connection.getCipherImplementation()
708 h = connection.read(min=5, max=5)
712 connection.sock.close()
# Test 23: read 50000 bytes per implementation/cipher pair and check they are
# "hello" repeated 10000 times.  "3des" is special-cased for implementations
# other than openssl, cryptlib and pycrypto; the action taken is not visible
# here (presumably a skip).
714 print "Test 23 - throughput test"
715 for implementation in implementations:
716 for cipher in ["aes128", "aes256", "3des", "rc4"]:
717 if cipher == "3des" and implementation not in ("openssl", "cryptlib", "pycrypto"):
721 connection = connect()
723 settings = HandshakeSettings()
724 settings.cipherNames = [cipher]
725 settings.cipherImplementations = [implementation, "python"]
727 connection.handshakeServer(sharedKeyDB=sharedKeyDB, settings=settings)
728 print connection.getCipherName(), connection.getCipherImplementation()
729 h = connection.read(min=50000, max=50000)
730 assert(h == "hello"*10000)
733 connection.sock.close()
735 print "Test succeeded"
# Help output: shown when the script is run with no arguments, or with a
# single argument whose lower-cased form ends in "help".
# NOTE(review): the conditions that choose between each "Loaded" and
# "Not Loaded" line (other than cryptoIDlibLoaded) and the exit after the
# usage text are not visible in this listing.
748 if len(sys.argv) == 1 or (len(sys.argv)==2 and sys.argv[1].lower().endswith("help")):
750 print "Version: 0.3.8"
752 print "RNG: %s" % prngName
# Status of the optional crypto back-ends.
756 print " cryptlib_py : Loaded"
758 print " cryptlib_py : Not Loaded"
760 print " M2Crypto : Loaded"
762 print " M2Crypto : Not Loaded"
764 print " pycrypto : Loaded"
766 print " pycrypto : Not Loaded"
768 print " GMPY : Loaded"
770 print " GMPY : Not Loaded"
771 if cryptoIDlibLoaded:
772 print " cryptoIDlib : Loaded"
774 print " cryptoIDlib : Not Loaded"
# Command summary: client commands first, then server commands.
778 print " clientcert <server> [<chain> <key>]"
779 print " clientsharedkey <server> <user> <pass>"
780 print " clientsrp <server> <user> <pass>"
781 print " clienttest <server> <dir>"
783 print " serversrp <server> <verifierDB>"
784 print " servercert <server> <chain> <key> [req]"
785 print " serversrpcert <server> <verifierDB> <chain> <key>"
786 print " serversharedkey <server> <sharedkeyDB>"
787 print " servertest <server> <dir>"
# The command name is matched case-insensitively from here on.
790 cmd = sys.argv[1].lower()
class Args(object):
    """Positional access to a command line with argument-count checks.

    argv is the full argument list (as in sys.argv), so index 0 is the
    program name and index 1 the command.
    """

    def __init__(self, argv):
        # Kept as given; get() and getLast() index straight into it.
        self.argv = argv

    def get(self, index):
        """Return argv[index].

        Raises SyntaxError("Not enough arguments") if argv has no such index.
        """
        if len(self.argv) <= index:
            raise SyntaxError("Not enough arguments")
        return self.argv[index]

    def getLast(self, index):
        """Return argv[index], requiring it to be the final argument.

        Raises SyntaxError("Too many arguments") if anything follows it, and
        SyntaxError("Not enough arguments") if it is absent.
        """
        if len(self.argv) > index + 1:
            raise SyntaxError("Too many arguments")
        return self.get(index)


args = Args(sys.argv)
def reformatDocString(s):
    """Return s with every line stripped and re-indented by one space.

    Each line of s has its leading and trailing whitespace removed and a
    single space put in front; the lines are re-joined with "\\n".  An empty
    string yields an empty string.
    """
    return "\n".join([" " + line.strip() for line in s.splitlines()])
# Command dispatch: `cmd` selects the self-test suites, a one-shot client
# handshake, or a server.
# NOTE(review): this listing carries no indentation and its embedded line
# numbers skip values, so several statements (`try:`/`else:` lines, the `def`
# lines of the connect() and handler() helpers, timing code, and others) are
# not visible here -- confirm against the complete file.
# "clienttest <server> <dir>": run the client half of the self-tests.
814 if cmd == "clienttest":
815 address = args.get(2)
816 dir = args.getLast(3)
817 clientTest(address, dir)
# Every other command starting with "client" performs a single handshake.
820 elif cmd.startswith("client"):
821 address = args.get(2)
823 #Split address into hostname/port tuple
824 address = address.split(":")
# "4443" is the port appended here; the condition guarding the append is not
# visible in this listing.
826 address.append("4443")
827 address = ( address[0], int(address[1]) )
# Body of the connect() helper called below (its `def` line is not visible):
# opens the TCP connection and returns it wrapped in a TLSConnection.
831 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
832 if hasattr(sock, "settimeout"):
834 sock.connect(address)
836 #Instantiate TLSConnections
837 return TLSConnection(sock)
# clientsrp / clientsharedkey: arguments 3 and 4 are the user name and the
# password; getLast() rejects anything after the password.
840 if cmd == "clientsrp":
841 username = args.get(3)
842 password = args.getLast(4)
843 connection = connect()
845 connection.handshakeClientSRP(username, password)
846 elif cmd == "clientsharedkey":
847 username = args.get(3)
848 password = args.getLast(4)
849 connection = connect()
851 connection.handshakeClientSharedKey(username, password)
# clientcert: when more than three arguments are present, arguments 3 and 4
# name the certificate-chain and key files, both read in binary mode.
852 elif cmd == "clientcert":
855 if len(sys.argv) > 3:
856 certFilename = args.get(3)
857 keyFilename = args.getLast(4)
859 s1 = open(certFilename, "rb").read()
860 s2 = open(keyFilename, "rb").read()
# The chain is parsed as a cryptoID chain when cryptoIDlib is loaded and as
# X.509 otherwise or on failure -- presumably; the lines linking the two
# attempts and the one that creates `x509` are not visible here.
862 #Try to create cryptoID cert chain
863 if cryptoIDlibLoaded:
865 certChain = CertChain().parse(s1)
866 privateKey = parsePrivateKey(s2)
871 #Try to create X.509 cert chain
875 certChain = X509CertChain([x509])
876 privateKey = parsePrivateKey(s2)
878 connection = connect()
880 connection.handshakeClientCert(certChain, privateKey)
882 raise SyntaxError("Unknown command")
# Translate handshake alerts into messages for the user.  Several branches
# have bodies that are not visible in this listing.
884 except TLSLocalAlert, a:
885 if a.description == AlertDescription.bad_record_mac:
886 if cmd == "clientsharedkey":
887 print "Bad sharedkey password"
890 elif a.description == AlertDescription.user_canceled:
895 except TLSRemoteAlert, a:
896 if a.description == AlertDescription.unknown_srp_username:
897 if cmd == "clientsrp":
898 print "Unknown username"
901 elif a.description == AlertDescription.bad_record_mac:
902 if cmd == "clientsrp":
903 print "Bad username or password"
906 elif a.description == AlertDescription.handshake_failure:
907 print "Unable to negotiate mutually acceptable parameters"
# Summary of the negotiated connection.  `start` and `stop` are set by lines
# not visible here.
913 print "Handshake success"
914 print " Handshake time: %.4f seconds" % (stop - start)
915 print " Version: %s.%s" % connection.version
916 print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
917 if connection.session.srpUsername:
918 print " Client SRP username: %s" % connection.session.srpUsername
919 if connection.session.sharedKeyUsername:
920 print " Client shared key username: %s" % connection.session.sharedKeyUsername
921 if connection.session.clientCertChain:
922 print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
923 if connection.session.serverCertChain:
924 print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
926 connection.sock.close()
# Commands starting with "server".
928 elif cmd.startswith("server"):
929 address = args.get(2)
931 #Split address into hostname/port tuple
932 address = address.split(":")
934 address.append("4443")
935 address = ( address[0], int(address[1]) )
# Defaults for the optional databases; the initial values of certFilename,
# keyFilename and reqCert are set by lines not visible here.
937 verifierDBFilename = None
938 sharedKeyDBFilename = None
# Collect the per-command file arguments.
944 if cmd == "serversrp":
945 verifierDBFilename = args.getLast(3)
946 elif cmd == "servercert":
947 certFilename = args.get(3)
948 keyFilename = args.get(4)
# A fifth argument, if given, is compared case-insensitively with "req"; what
# happens on a mismatch is not visible here.
950 req = args.getLast(5)
951 if req.lower() != "req":
954 elif cmd == "serversrpcert":
955 verifierDBFilename = args.get(3)
956 certFilename = args.get(4)
957 keyFilename = args.getLast(5)
958 elif cmd == "serversharedkey":
959 sharedKeyDBFilename = args.getLast(3)
# "servertest <server> <dir>": run the server half of the self-tests.
960 elif cmd == "servertest":
961 address = args.get(2)
962 dir = args.getLast(3)
963 serverTest(address, dir)
# Create the verifier and shared-key databases that were requested.
967 if verifierDBFilename:
968 verifierDB = VerifierDB(verifierDBFilename)
972 if sharedKeyDBFilename:
973 sharedKeyDB = SharedKeyDB(sharedKeyDBFilename)
# Load the certificate chain and private key, as in the clientcert branch.
979 s1 = open(certFilename, "rb").read()
980 s2 = open(keyFilename, "rb").read()
982 #Try to create cryptoID cert chain
983 if cryptoIDlibLoaded:
985 certChain = CertChain().parse(s1)
986 privateKey = parsePrivateKey(s2)
991 #Try to create X.509 cert chain
995 certChain = X509CertChain([x509])
996 privateKey = parsePrivateKey(s2)
# Body of the per-connection handler (its `def` line is not visible): wraps
# the accepted socket, performs the server handshake with everything that was
# configured, and prints a summary.
1000 #Create handler function - performs handshake, then echos all bytes received
1003 connection = TLSConnection(sock)
1004 settings = HandshakeSettings()
1005 connection.handshakeServer(sharedKeyDB=sharedKeyDB, verifierDB=verifierDB, \
1006 certChain=certChain, privateKey=privateKey, \
1007 reqCert=reqCert, settings=settings)
1008 print "Handshake success"
1009 print " Version: %s.%s" % connection.version
1010 print " Cipher: %s %s" % (connection.getCipherName(), connection.getCipherImplementation())
1011 if connection.session.srpUsername:
1012 print " Client SRP username: %s" % connection.session.srpUsername
1013 if connection.session.sharedKeyUsername:
1014 print " Client shared key username: %s" % connection.session.sharedKeyUsername
1015 if connection.session.clientCertChain:
1016 print " Client fingerprint: %s" % connection.session.clientCertChain.getFingerprint()
1017 if connection.session.serverCertChain:
1018 print " Server fingerprint: %s" % connection.session.serverCertChain.getFingerprint()
# Read from the connection; the loop and the write that echo the data back
# are not visible in this listing.
1022 newS = connection.read()
# Translate alerts raised while serving into messages.
1029 except TLSLocalAlert, a:
1030 if a.description == AlertDescription.unknown_srp_username:
1031 print "Unknown SRP username"
1032 elif a.description == AlertDescription.bad_record_mac:
1033 if cmd == "serversrp" or cmd == "serversrpcert":
1034 print "Bad SRP password for:", connection.allegedSrpUsername
1037 elif a.description == AlertDescription.handshake_failure:
1038 print "Unable to negotiate mutually acceptable parameters"
1041 except TLSRemoteAlert, a:
1042 if a.description == AlertDescription.bad_record_mac:
1043 if cmd == "serversharedkey":
1044 print "Bad sharedkey password for:", connection.allegedSharedKeyUsername
1047 elif a.description == AlertDescription.user_canceled:
1048 print "Handshake cancelled"
1049 elif a.description == AlertDescription.handshake_failure:
1050 print "Unable to negotiate mutually acceptable parameters"
1051 elif a.description == AlertDescription.close_notify:
# Accept connections and hand each accepted socket to handler() on a new
# thread.  The bind/listen calls and the accept loop header are not visible.
1056 #Run multi-threaded server
1057 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1061 (newsock, cliAddress) = sock.accept()
1062 thread.start_new_thread(handler, (newsock,))
# Reported for a command that matched none of the branches above.
1066 print "Bad command: '%s'" % cmd
1067 except TLSRemoteAlert, a: