# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""HTTP2 Test Server"""
import argparse
import test_data_frame_padding
_TEST_CASE_MAPPING = {
- 'rst_after_header': test_rst_after_header.TestcaseRstStreamAfterHeader,
- 'rst_after_data': test_rst_after_data.TestcaseRstStreamAfterData,
- 'rst_during_data': test_rst_during_data.TestcaseRstStreamDuringData,
- 'goaway': test_goaway.TestcaseGoaway,
- 'ping': test_ping.TestcasePing,
- 'max_streams': test_max_streams.TestcaseSettingsMaxStreams,
-
- # Positive tests below:
- 'data_frame_padding': test_data_frame_padding.TestDataFramePadding,
- 'no_df_padding_sanity_test': test_data_frame_padding.TestDataFramePadding,
+ 'rst_after_header': test_rst_after_header.TestcaseRstStreamAfterHeader,
+ 'rst_after_data': test_rst_after_data.TestcaseRstStreamAfterData,
+ 'rst_during_data': test_rst_during_data.TestcaseRstStreamDuringData,
+ 'goaway': test_goaway.TestcaseGoaway,
+ 'ping': test_ping.TestcasePing,
+ 'max_streams': test_max_streams.TestcaseSettingsMaxStreams,
+
+ # Positive tests below:
+ 'data_frame_padding': test_data_frame_padding.TestDataFramePadding,
+ 'no_df_padding_sanity_test': test_data_frame_padding.TestDataFramePadding,
}
_exit_code = 0
+
class H2Factory(twisted.internet.protocol.Factory):
- def __init__(self, testcase):
- logging.info('Creating H2Factory for new connection (%s)', testcase)
- self._num_streams = 0
- self._testcase = testcase
-
- def buildProtocol(self, addr):
- self._num_streams += 1
- logging.info('New Connection: %d' % self._num_streams)
- if not _TEST_CASE_MAPPING.has_key(self._testcase):
- logging.error('Unknown test case: %s' % self._testcase)
- assert(0)
- else:
- t = _TEST_CASE_MAPPING[self._testcase]
-
- if self._testcase == 'goaway':
- return t(self._num_streams).get_base_server()
- elif self._testcase == 'no_df_padding_sanity_test':
- return t(use_padding=False).get_base_server()
- else:
- return t().get_base_server()
+
+ def __init__(self, testcase):
+ logging.info('Creating H2Factory for new connection (%s)', testcase)
+ self._num_streams = 0
+ self._testcase = testcase
+
+ def buildProtocol(self, addr):
+ self._num_streams += 1
+ logging.info('New Connection: %d' % self._num_streams)
+ if not _TEST_CASE_MAPPING.has_key(self._testcase):
+ logging.error('Unknown test case: %s' % self._testcase)
+ assert (0)
+ else:
+ t = _TEST_CASE_MAPPING[self._testcase]
+
+ if self._testcase == 'goaway':
+ return t(self._num_streams).get_base_server()
+ elif self._testcase == 'no_df_padding_sanity_test':
+ return t(use_padding=False).get_base_server()
+ else:
+ return t().get_base_server()
+
def parse_arguments():
- parser = argparse.ArgumentParser()
- parser.add_argument('--base_port', type=int, default=8080,
- help='base port to run the servers (default: 8080). One test server is '
- 'started on each incrementing port, beginning with base_port, in the '
- 'following order: data_frame_padding,goaway,max_streams,'
- 'no_df_padding_sanity_test,ping,rst_after_data,rst_after_header,'
- 'rst_during_data'
- )
- return parser.parse_args()
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--base_port',
+ type=int,
+ default=8080,
+ help='base port to run the servers (default: 8080). One test server is '
+ 'started on each incrementing port, beginning with base_port, in the '
+ 'following order: data_frame_padding,goaway,max_streams,'
+ 'no_df_padding_sanity_test,ping,rst_after_data,rst_after_header,'
+ 'rst_during_data')
+ return parser.parse_args()
+
def listen(endpoint, test_case):
- deferred = endpoint.listen(H2Factory(test_case))
- def listen_error(reason):
- # If listening fails, we stop the reactor and exit the program
- # with exit code 1.
- global _exit_code
- _exit_code = 1
- logging.error('Listening failed: %s' % reason.value)
- twisted.internet.reactor.stop()
- deferred.addErrback(listen_error)
+ deferred = endpoint.listen(H2Factory(test_case))
+
+ def listen_error(reason):
+ # If listening fails, we stop the reactor and exit the program
+ # with exit code 1.
+ global _exit_code
+ _exit_code = 1
+ logging.error('Listening failed: %s' % reason.value)
+ twisted.internet.reactor.stop()
+
+ deferred.addErrback(listen_error)
+
def start_test_servers(base_port):
- """ Start one server per test case on incrementing port numbers
+ """ Start one server per test case on incrementing port numbers
beginning with base_port """
- index = 0
- for test_case in sorted(_TEST_CASE_MAPPING.keys()):
- portnum = base_port + index
- logging.warning('serving on port %d : %s'%(portnum, test_case))
- endpoint = twisted.internet.endpoints.TCP4ServerEndpoint(
- twisted.internet.reactor, portnum, backlog=128)
- # Wait until the reactor is running before calling endpoint.listen().
- twisted.internet.reactor.callWhenRunning(listen, endpoint, test_case)
+ index = 0
+ for test_case in sorted(_TEST_CASE_MAPPING.keys()):
+ portnum = base_port + index
+ logging.warning('serving on port %d : %s' % (portnum, test_case))
+ endpoint = twisted.internet.endpoints.TCP4ServerEndpoint(
+ twisted.internet.reactor, portnum, backlog=128)
+ # Wait until the reactor is running before calling endpoint.listen().
+ twisted.internet.reactor.callWhenRunning(listen, endpoint, test_case)
+
+ index += 1
- index += 1
if __name__ == '__main__':
- logging.basicConfig(
- format='%(levelname) -10s %(asctime)s %(module)s:%(lineno)s | %(message)s',
- level=logging.INFO)
- args = parse_arguments()
- start_test_servers(args.base_port)
- twisted.internet.reactor.run()
- sys.exit(_exit_code)
+ logging.basicConfig(
+ format=
+ '%(levelname) -10s %(asctime)s %(module)s:%(lineno)s | %(message)s',
+ level=logging.INFO)
+ args = parse_arguments()
+ start_test_servers(args.base_port)
+ twisted.internet.reactor.run()
+ sys.exit(_exit_code)