Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / net / tools / testserver / testserver.py
index 0a1f59b..d020ca3 100755 (executable)
@@ -27,6 +27,7 @@ import re
 import select
 import socket
 import SocketServer
+import ssl
 import struct
 import sys
 import threading
@@ -38,20 +39,36 @@ import zlib
 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(BASE_DIR)))
 
-import echo_message
-import testserver_base
+# Temporary hack to deal with tlslite 0.3.8 -> 0.4.6 upgrade.
+#
+# TODO(davidben): Remove this when it has cycled through all the bots and
+# developer checkouts or when http://crbug.com/356276 is resolved.
+try:
+  os.remove(os.path.join(ROOT_DIR, 'third_party', 'tlslite',
+                         'tlslite', 'utils', 'hmac.pyc'))
+except Exception:
+  pass
 
 # Append at the end of sys.path, it's fine to use the system library.
 sys.path.append(os.path.join(ROOT_DIR, 'third_party', 'pyftpdlib', 'src'))
-sys.path.append(os.path.join(ROOT_DIR, 'third_party', 'tlslite'))
-import pyftpdlib.ftpserver
-import tlslite
-import tlslite.api
 
-# Insert at the beginning of the path, we want this to be used
+# Insert at the beginning of the path, we want to use our copies of the library
 # unconditionally.
 sys.path.insert(0, os.path.join(ROOT_DIR, 'third_party', 'pywebsocket', 'src'))
+sys.path.insert(0, os.path.join(ROOT_DIR, 'third_party', 'tlslite'))
+
+import mod_pywebsocket.standalone
 from mod_pywebsocket.standalone import WebSocketServer
+# import manually
+mod_pywebsocket.standalone.ssl = ssl
+
+import pyftpdlib.ftpserver
+
+import tlslite
+import tlslite.api
+
+import echo_message
+import testserver_base
 
 SERVER_HTTP = 0
 SERVER_FTP = 1
@@ -84,7 +101,9 @@ class WebSocketOptions:
     self.certificate = None
     self.tls_client_auth = False
     self.tls_client_ca = None
+    self.tls_module = 'ssl'
     self.use_basic_auth = False
+    self.basic_auth_credential = 'Basic ' + base64.b64encode('test:test')
 
 
 class RecordingSSLSessionCache(object):
@@ -134,10 +153,13 @@ class HTTPSServer(tlslite.api.TLSSocketServerMixIn,
   client verification."""
 
   def __init__(self, server_address, request_hander_class, pem_cert_and_key,
-               ssl_client_auth, ssl_client_cas, ssl_bulk_ciphers,
-               record_resume_info, tls_intolerant, signed_cert_timestamps,
-               fallback_scsv_enabled, ocsp_response):
-    self.cert_chain = tlslite.api.X509CertChain().parseChain(pem_cert_and_key)
+               ssl_client_auth, ssl_client_cas, ssl_client_cert_types,
+               ssl_bulk_ciphers, ssl_key_exchanges, enable_npn,
+               record_resume_info, tls_intolerant,
+               tls_intolerance_type, signed_cert_timestamps,
+               fallback_scsv_enabled, ocsp_response, disable_session_cache):
+    self.cert_chain = tlslite.api.X509CertChain()
+    self.cert_chain.parsePemList(pem_cert_and_key)
     # Force using only python implementation - otherwise behavior is different
     # depending on whether m2crypto Python module is present (error is thrown
     # when it is). m2crypto uses a C (based on OpenSSL) implementation under
@@ -147,21 +169,42 @@ class HTTPSServer(tlslite.api.TLSSocketServerMixIn,
                                                implementations=['python'])
     self.ssl_client_auth = ssl_client_auth
     self.ssl_client_cas = []
-    self.tls_intolerant = tls_intolerant
+    self.ssl_client_cert_types = []
+    if enable_npn:
+      self.next_protos = ['http/1.1']
+    else:
+      self.next_protos = None
     self.signed_cert_timestamps = signed_cert_timestamps
     self.fallback_scsv_enabled = fallback_scsv_enabled
     self.ocsp_response = ocsp_response
 
-    for ca_file in ssl_client_cas:
-      s = open(ca_file).read()
-      x509 = tlslite.api.X509()
-      x509.parse(s)
-      self.ssl_client_cas.append(x509.subject)
+    if ssl_client_auth:
+      for ca_file in ssl_client_cas:
+        s = open(ca_file).read()
+        x509 = tlslite.api.X509()
+        x509.parse(s)
+        self.ssl_client_cas.append(x509.subject)
+
+      for cert_type in ssl_client_cert_types:
+        self.ssl_client_cert_types.append({
+            "rsa_sign": tlslite.api.ClientCertificateType.rsa_sign,
+            "dss_sign": tlslite.api.ClientCertificateType.dss_sign,
+            "ecdsa_sign": tlslite.api.ClientCertificateType.ecdsa_sign,
+            }[cert_type])
+
     self.ssl_handshake_settings = tlslite.api.HandshakeSettings()
     if ssl_bulk_ciphers is not None:
       self.ssl_handshake_settings.cipherNames = ssl_bulk_ciphers
+    if ssl_key_exchanges is not None:
+      self.ssl_handshake_settings.keyExchangeNames = ssl_key_exchanges
+    if tls_intolerant != 0:
+      self.ssl_handshake_settings.tlsIntolerant = (3, tls_intolerant)
+      self.ssl_handshake_settings.tlsIntoleranceType = tls_intolerance_type
+
 
-    if record_resume_info:
+    if disable_session_cache:
+      self.session_cache = None
+    elif record_resume_info:
       # If record_resume_info is true then we'll replace the session cache with
       # an object that records the lookups and inserts that it sees.
       self.session_cache = RecordingSSLSessionCache()
@@ -182,7 +225,8 @@ class HTTPSServer(tlslite.api.TLSSocketServerMixIn,
                                     reqCert=self.ssl_client_auth,
                                     settings=self.ssl_handshake_settings,
                                     reqCAs=self.ssl_client_cas,
-                                    tlsIntolerant=self.tls_intolerant,
+                                    reqCertTypes=self.ssl_client_cert_types,
+                                    nextProtos=self.next_protos,
                                     signedCertTimestamps=
                                     self.signed_cert_timestamps,
                                     fallbackSCSV=self.fallback_scsv_enabled,
@@ -288,10 +332,10 @@ class TestPageHandler(testserver_base.BasePageHandler):
       self.NoContentHandler,
       self.ServerRedirectHandler,
       self.ClientRedirectHandler,
-      self.MultipartHandler,
       self.GetSSLSessionCacheHandler,
       self.SSLManySmallRecords,
       self.GetChannelID,
+      self.ClientCipherListHandler,
       self.CloseSocketHandler,
       self.RangeResetHandler,
       self.DefaultResponseHandler]
@@ -1398,29 +1442,6 @@ class TestPageHandler(testserver_base.BasePageHandler):
 
     return True
 
-  def MultipartHandler(self):
-    """Send a multipart response (10 text/html pages)."""
-
-    test_name = '/multipart'
-    if not self._ShouldHandleRequest(test_name):
-      return False
-
-    num_frames = 10
-    bound = '12345'
-    self.send_response(200)
-    self.send_header('Content-Type',
-                     'multipart/x-mixed-replace;boundary=' + bound)
-    self.end_headers()
-
-    for i in xrange(num_frames):
-      self.wfile.write('--' + bound + '\r\n')
-      self.wfile.write('Content-Type: text/html\r\n\r\n')
-      self.wfile.write('<title>page ' + str(i) + '</title>')
-      self.wfile.write('page ' + str(i))
-
-    self.wfile.write('--' + bound + '--')
-    return True
-
   def GetSSLSessionCacheHandler(self):
     """Send a reply containing a log of the session cache operations."""
 
@@ -1431,11 +1452,14 @@ class TestPageHandler(testserver_base.BasePageHandler):
     self.send_header('Content-Type', 'text/plain')
     self.end_headers()
     try:
-      for (action, sessionID) in self.server.session_cache.log:
-        self.wfile.write('%s\t%s\n' % (action, sessionID.encode('hex')))
+      log = self.server.session_cache.log
     except AttributeError:
       self.wfile.write('Pass --https-record-resume in order to use' +
                        ' this request')
+      return True
+
+    for (action, sessionID) in log:
+      self.wfile.write('%s\t%s\n' % (action, bytes(sessionID).encode('hex')))
     return True
 
   def SSLManySmallRecords(self):
@@ -1465,10 +1489,27 @@ class TestPageHandler(testserver_base.BasePageHandler):
     self.send_response(200)
     self.send_header('Content-Type', 'text/plain')
     self.end_headers()
-    channel_id = self.server.tlsConnection.channel_id.tostring()
+    channel_id = bytes(self.server.tlsConnection.channel_id)
     self.wfile.write(hashlib.sha256(channel_id).digest().encode('base64'))
     return True
 
+  def ClientCipherListHandler(self):
+    """Send a reply containing the cipher suite list that the client
+    provided. Each cipher suite value is serialized in decimal, followed by a
+    newline."""
+
+    if not self._ShouldHandleRequest('/client-cipher-list'):
+      return False
+
+    self.send_response(200)
+    self.send_header('Content-Type', 'text/plain')
+    self.end_headers()
+
+    for cipher_suite in self.server.tlsConnection.clientHello.cipher_suites:
+      self.wfile.write(str(cipher_suite))
+      self.wfile.write('\n')
+    return True
+
   def CloseSocketHandler(self):
     """Closes the socket without sending anything."""
 
@@ -1956,17 +1997,24 @@ class ServerRunner(testserver_base.TestServerRunner):
         server = HTTPSServer((host, port), TestPageHandler, pem_cert_and_key,
                              self.options.ssl_client_auth,
                              self.options.ssl_client_ca,
+                             self.options.ssl_client_cert_type,
                              self.options.ssl_bulk_cipher,
+                             self.options.ssl_key_exchange,
+                             self.options.enable_npn,
                              self.options.record_resume,
                              self.options.tls_intolerant,
+                             self.options.tls_intolerance_type,
                              self.options.signed_cert_timestamps_tls_ext.decode(
                                  "base64"),
                              self.options.fallback_scsv,
-                             stapled_ocsp_response)
-        print 'HTTPS server started on %s:%d...' % (host, server.server_port)
+                             stapled_ocsp_response,
+                             self.options.disable_session_cache)
+        print 'HTTPS server started on https://%s:%d...' % \
+            (host, server.server_port)
       else:
         server = HTTPServer((host, port), TestPageHandler)
-        print 'HTTP server started on %s:%d...' % (host, server.server_port)
+        print 'HTTP server started on http://%s:%d...' % \
+            (host, server.server_port)
 
       server.data_dir = self.__make_data_dir()
       server.file_root_url = self.options.file_root_url
@@ -1979,11 +2027,14 @@ class ServerRunner(testserver_base.TestServerRunner):
       # is required to work correctly. It should be fixed from pywebsocket side.
       os.chdir(self.__make_data_dir())
       websocket_options = WebSocketOptions(host, port, '.')
+      scheme = "ws"
       if self.options.cert_and_key_file:
+        scheme = "wss"
         websocket_options.use_tls = True
         websocket_options.private_key = self.options.cert_and_key_file
         websocket_options.certificate = self.options.cert_and_key_file
       if self.options.ssl_client_auth:
+        websocket_options.tls_client_cert_optional = False
         websocket_options.tls_client_auth = True
         if len(self.options.ssl_client_ca) != 1:
           raise testserver_base.OptionError(
@@ -1994,8 +2045,10 @@ class ServerRunner(testserver_base.TestServerRunner):
               self.options.ssl_client_ca[0] + ' exiting...')
         websocket_options.tls_client_ca = self.options.ssl_client_ca[0]
       server = WebSocketServer(websocket_options)
-      print 'WebSocket server started on %s:%d...' % (host, server.server_port)
+      print 'WebSocket server started on %s://%s:%d...' % \
+          (scheme, host, server.server_port)
       server_data['port'] = server.server_port
+      websocket_options.use_basic_auth = self.options.ws_basic_auth
     elif self.options.server_type == SERVER_TCP_ECHO:
       # Used for generating the key (randomly) that encodes the "echo request"
       # message.
@@ -2055,6 +2108,11 @@ class ServerRunner(testserver_base.TestServerRunner):
 
   def add_options(self):
     testserver_base.TestServerRunner.add_options(self)
+    self.option_parser.add_option('--disable-session-cache',
+                                  action='store_true',
+                                  dest='disable_session_cache',
+                                  help='tells the server to disable the'
+                                  'TLS session cache.')
     self.option_parser.add_option('-f', '--ftp', action='store_const',
                                   const=SERVER_FTP, default=SERVER_HTTP,
                                   dest='server_type',
@@ -2100,6 +2158,12 @@ class ServerRunner(testserver_base.TestServerRunner):
                                   'aborted. 2 means TLS 1.1 or higher will be '
                                   'aborted. 3 means TLS 1.2 or higher will be '
                                   'aborted.')
+    self.option_parser.add_option('--tls-intolerance-type',
+                                  dest='tls_intolerance_type',
+                                  default="alert",
+                                  help='Controls how the server reacts to a '
+                                  'TLS version it is intolerant to. Valid '
+                                  'values are "alert", "close", and "reset".')
     self.option_parser.add_option('--signed-cert-timestamps-tls-ext',
                                   dest='signed_cert_timestamps_tls_ext',
                                   default='',
@@ -2137,6 +2201,15 @@ class ServerRunner(testserver_base.TestServerRunner):
                                   'file. This option may appear multiple '
                                   'times, indicating multiple CA names should '
                                   'be sent in the request.')
+    self.option_parser.add_option('--ssl-client-cert-type', action='append',
+                                  default=[], help='Specify that the client '
+                                  'certificate request should include the '
+                                  'specified certificate_type value. This '
+                                  'option may appear multiple times, '
+                                  'indicating multiple values should be send '
+                                  'in the request. Valid values are '
+                                  '"rsa_sign", "dss_sign", and "ecdsa_sign". '
+                                  'If omitted, "rsa_sign" will be used.')
     self.option_parser.add_option('--ssl-bulk-cipher', action='append',
                                   help='Specify the bulk encryption '
                                   'algorithm(s) that will be accepted by the '
@@ -2145,8 +2218,27 @@ class ServerRunner(testserver_base.TestServerRunner):
                                   'algorithms will be used. This option may '
                                   'appear multiple times, indicating '
                                   'multiple algorithms should be enabled.');
+    self.option_parser.add_option('--ssl-key-exchange', action='append',
+                                  help='Specify the key exchange algorithm(s)'
+                                  'that will be accepted by the SSL server. '
+                                  'Valid values are "rsa", "dhe_rsa". If '
+                                  'omitted, all algorithms will be used. This '
+                                  'option may appear multiple times, '
+                                  'indicating multiple algorithms should be '
+                                  'enabled.');
+    # TODO(davidben): Add ALPN support to tlslite.
+    self.option_parser.add_option('--enable-npn', dest='enable_npn',
+                                  default=False, const=True,
+                                  action='store_const',
+                                  help='Enable server support for the NPN '
+                                  'extension. The server will advertise '
+                                  'support for exactly one protocol, http/1.1')
     self.option_parser.add_option('--file-root-url', default='/files/',
                                   help='Specify a root URL for files served.')
+    # TODO(ricea): Generalize this to support basic auth for HTTP too.
+    self.option_parser.add_option('--ws-basic-auth', action='store_true',
+                                  dest='ws_basic_auth',
+                                  help='Enable basic-auth for WebSocket')
 
 
 if __name__ == '__main__':