2 # Copyright 2020 gRPC authors.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Run xDS integration tests on GCP using Traffic Director."""
18 import googleapiclient.discovery
32 from oauth2client.client import GoogleCredentials
34 import python_utils.jobset as jobset
35 import python_utils.report_utils as report_utils
37 from src.proto.grpc.health.v1 import health_pb2
38 from src.proto.grpc.health.v1 import health_pb2_grpc
39 from src.proto.grpc.testing import empty_pb2
40 from src.proto.grpc.testing import messages_pb2
41 from src.proto.grpc.testing import test_pb2_grpc
# Root logger wired to stderr with a timestamped format. Default level is
# WARNING; it is apparently raised to DEBUG later when --verbose is passed
# (the guard appears further down in the file) — TODO confirm.
logger = logging.getLogger()
console_handler = logging.StreamHandler()
formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.setLevel(logging.WARNING)
# NOTE(review): the assignment opening the _TEST_CASES list (and several of
# its entries) is elided from this view; the visible entries are reproduced
# verbatim below.
    'change_backend_service',
    'load_report_based_failover',
    'remove_instance_group',
    'secondary_locality_gets_no_requests_on_partial_primary_failure',
    'secondary_locality_gets_requests_on_primary_failure',

# Valid test cases, but not in all. So the tests can only run manually, and
# aren't enabled automatically for all languages.
# TODO: Move them into _TEST_CASES when support is ready in all languages.
_ADDITIONAL_TEST_CASES = [
# NOTE(review): the entries and closing bracket of _ADDITIONAL_TEST_CASES
# are elided from this view.

# Test cases that require the V3 API. Skipped in older runs.
_V3_TEST_CASES = frozenset(['timeout', 'fault_injection'])

# Test cases that require the alpha API. Skipped for stable API runs.
_ALPHA_TEST_CASES = frozenset(['timeout'])
def parse_test_cases(arg):
    """Parse the --test_case flag into an ordered list of test names.

    NOTE(review): several lines of this function are elided from this view
    (including the branch headers that build the initial `test_cases` set);
    visible statements are reproduced verbatim with elision markers.
    """
    arg_split = arg.split(',')
    all_test_cases = _TEST_CASES + _ADDITIONAL_TEST_CASES
    # (lines elided — presumably initializes `test_cases` and branches on
    # whether 'all' was requested; TODO confirm against full source)
    test_cases = test_cases.union(_TEST_CASES)
    # (line elided)
    test_cases = test_cases.union([arg])
    if not all([test_case in all_test_cases for test_case in test_cases]):
        raise Exception('Failed to parse test cases %s' % arg)
    # Preserve the canonical ordering of all_test_cases in the result.
    return [x for x in all_test_cases if x in test_cases]
def parse_port_range(port_arg):
    """Parse --service_port_range: a single port or a 'min:max' range.

    Returns a range object covering the requested port(s); the max bound is
    inclusive. NOTE(review): the single-port branch header is elided from
    this view.
    """
    # (lines elided — presumably the no-colon/single-port branch)
    return range(port, port + 1)
    port_min, port_max = port_arg.split(':')
    return range(int(port_min), int(port_max) + 1)
# Command-line interface. NOTE(review): several add_argument calls below are
# missing source lines in this view (flag names, type= / default= arguments);
# visible fragments are reproduced verbatim with elision markers.
argp = argparse.ArgumentParser(description='Run xDS interop tests on GCP')
# TODO(zdapeng): remove default value of project_id and project_num
argp.add_argument('--project_id', default='grpc-testing', help='GCP project id')
argp.add_argument('--project_num',
                  default='830293263384',
                  help='GCP project number')
# (flag-name lines elided; per help text this is the resource-name suffix flag)
    help='Optional suffix for all generated GCP resource names. Useful to '
    'ensure distinct names across test runs.')
# (flag-name lines elided; the --test_case flag, parsed by parse_test_cases)
    type=parse_test_cases,
    help='Comma-separated list of test cases to run. Available tests: %s, '
    '(or \'all\' to run every test). '
    'Alternative tests not included in \'all\': %s' %
    (','.join(_TEST_CASES), ','.join(_ADDITIONAL_TEST_CASES)))
# (flag-name lines elided; pre-created bootstrap-file flag)
    help='File to reference via GRPC_XDS_BOOTSTRAP. Disables built-in '
    'bootstrap generation')
# (flag-name lines elided; xDS v3 opt-in flag)
    help='Support xDS v3 via GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. '
    'If a pre-created bootstrap file is provided via the --bootstrap_file '
    'parameter, it should include xds_v3 in its server_features field.')
# (flag-name lines elided; client launch command)
    help='Command to launch xDS test client. {server_uri}, {stats_port} and '
    '{qps} references will be replaced using str.format(). GRPC_XDS_BOOTSTRAP '
    'will be set for the command')
# (flag-name lines elided; remote client hosts)
    help='Comma-separated list of hosts running client processes. If set, '
    '--client_cmd is ignored and client processes are assumed to be running on '
    'the specified hosts.')
argp.add_argument('--zone', default='us-central1-a')
argp.add_argument('--secondary_zone',
                  default='us-west1-b',
                  help='Zone to use for secondary TD locality tests')
argp.add_argument('--qps', default=100, type=int, help='Client QPS')
# (argp.add_argument( opening elided)
    '--wait_for_backend_sec',
    # (default/type lines elided)
    help='Time limit for waiting for created backend services to report '
    'healthy when launching or updated GCP resources')
# (argp.add_argument( opening elided)
    '--use_existing_gcp_resources',
    # (action/help-opening lines elided)
    'If set, find and use already created GCP resources instead of creating new'
# (argp.add_argument( opening elided)
    '--keep_gcp_resources',
    # (action/help-opening lines elided)
    'Leave GCP VMs and configuration running after test. Default behavior is '
    'to delete when tests complete.')
# (argp.add_argument( opening elided)
    '--compute_discovery_document',
    # (default/help-opening lines elided)
    'If provided, uses this file instead of retrieving via the GCP discovery '
# (argp.add_argument( opening elided)
    '--alpha_compute_discovery_document',
    # (default lines elided)
    help='If provided, uses this file instead of retrieving via the alpha GCP '
# (help continuation elided)
argp.add_argument('--network',
                  default='global/networks/default',
                  help='GCP network to use')
argp.add_argument('--service_port_range',
                  # (default line elided)
                  type=parse_port_range,
                  help='Listening port for created gRPC backends. Specified as '
                  'either a single int or as a range in the format min:max, in '
                  'which case an available port p will be chosen s.t. min <= p '
# (help continuation and --stats_port opening elided)
    help='Local port for the client process to expose the LB stats service')
argp.add_argument('--xds_server',
                  default='trafficdirector.googleapis.com:443',
                  # (help line elided)
argp.add_argument('--source_image',
                  default='projects/debian-cloud/global/images/family/debian-9',
                  help='Source image for VMs created during the test')
argp.add_argument('--path_to_server_binary',
                  # (default line elided)
                  help='If set, the server binary must already be pre-built on '
                  'the specified source image')
argp.add_argument('--machine_type',
                  default='e2-standard-2',
                  help='Machine type for VMs created during the test')
# (argp.add_argument( opening elided)
    '--instance_group_size',
    # (default/type lines elided)
    help='Number of VMs to create per instance group. Certain test cases (e.g., '
    'round_robin) may not give meaningful results if this is set to a value '
# (help continuation elided)
argp.add_argument('--verbose',
                  help='verbose log output',
                  # (action lines elided)
# TODO(ericgribkoff) Remove this param once the sponge-formatted log files are
# visible in all test environments.
argp.add_argument('--log_client_output',
                  help='Log captured client output',
                  # (action lines elided)
# TODO(ericgribkoff) Remove this flag once all test environments are verified to
# have access to the alpha compute APIs.
argp.add_argument('--only_stable_gcp_apis',
                  help='Do not use alpha compute APIs. Some tests may be '
                  'incompatible with this option (gRPC health checks are '
                  'currently alpha and required for simulating server failure',
                  # (action lines elided)
args = argp.parse_args()
# (guard line elided — presumably `if args.verbose:`; TODO confirm)
    logger.setLevel(logging.DEBUG)

# (lines elided — presumably a CLIENT_HOSTS default; TODO confirm)
if args.client_hosts:
    CLIENT_HOSTS = args.client_hosts.split(',')

# Test-wide tunables; *_SEC values are timeouts in seconds.
_DEFAULT_SERVICE_PORT = 80
_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec
_WAIT_FOR_OPERATION_SEC = 1200
_INSTANCE_GROUP_SIZE = args.instance_group_size
# Number of test RPCs scales with client QPS (10 seconds' worth of traffic).
_NUM_TEST_RPCS = 10 * args.qps
_WAIT_FOR_STATS_SEC = 360
_WAIT_FOR_VALID_CONFIG_SEC = 60
_WAIT_FOR_URL_MAP_PATCH_SEC = 300
_CONNECTION_TIMEOUT_SEC = 60

# Contents for the generated GRPC_XDS_BOOTSTRAP file, interpolated with the
# network name, zone, and xDS server URI. NOTE(review): most of the template
# body is elided from this view; visible lines reproduced verbatim.
_BOOTSTRAP_TEMPLATE = """
    "TRAFFICDIRECTOR_NETWORK_NAME": "%s"
      "type": "google_default",
  "server_features": {server_features}
}}""" % (args.network.split('/')[-1], args.zone, args.xds_server)

# TODO(ericgribkoff) Add change_backend_service to this list once TD no longer
# sends an update with no localities when adding the MIG to the backend service
# can race with the URL map patch.
_TESTS_TO_FAIL_ON_RPC_FAILURE = ['ping_pong', 'round_robin']
# Tests that run UnaryCall and EmptyCall.
_TESTS_TO_RUN_MULTIPLE_RPCS = ['path_matching', 'header_matching']
# Tests that make UnaryCall with test metadata.
_TESTS_TO_SEND_METADATA = ['header_matching']
_TEST_METADATA_KEY = 'xds_md'
_TEST_METADATA_VALUE_UNARY = 'unary_yranu'
_TEST_METADATA_VALUE_EMPTY = 'empty_ytpme'
# Extra RPC metadata whose value is a number, sent with UnaryCall only.
_TEST_METADATA_NUMERIC_KEY = 'xds_md_numeric'
_TEST_METADATA_NUMERIC_VALUE = '159'
# Base names for generated GCP resources (see the resource-name suffix flag).
_PATH_MATCHER_NAME = 'path-matcher'
_BASE_TEMPLATE_NAME = 'test-template'
_BASE_INSTANCE_GROUP_NAME = 'test-ig'
_BASE_HEALTH_CHECK_NAME = 'test-hc'
_BASE_FIREWALL_RULE_NAME = 'test-fw-rule'
_BASE_BACKEND_SERVICE_NAME = 'test-backend-service'
_BASE_URL_MAP_NAME = 'test-map'
_BASE_SERVICE_HOST = 'grpc-test'
_BASE_TARGET_PROXY_NAME = 'test-target-proxy'
_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule'
_TEST_LOG_BASE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)),
# (continuation line elided)
_SPONGE_LOG_NAME = 'sponge_log.log'
_SPONGE_XML_NAME = 'sponge_log.xml'
def get_client_stats(num_rpcs, timeout_sec):
    """Collect LoadBalancerStats from the test client(s) over gRPC.

    NOTE(review): several lines are elided from this view (client-host
    selection, the per-host loop header, call continuations, and the
    return); visible statements are reproduced verbatim.
    """
    # (lines elided — presumably selects CLIENT_HOSTS when set)
    hosts = ['localhost']
    # (per-host loop header elided)
    with grpc.insecure_channel('%s:%d' %
                               (host, args.stats_port)) as channel:
        stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
        request = messages_pb2.LoadBalancerStatsRequest()
        request.num_rpcs = num_rpcs
        request.timeout_sec = timeout_sec
        # Allow the RPC more time than the stats-collection window itself.
        rpc_timeout = timeout_sec + _CONNECTION_TIMEOUT_SEC
        logger.debug('Invoking GetClientStats RPC to %s:%d:', host,
        # (call continuation elided)
        response = stub.GetClientStats(request,
        # (call continuation elided)
        logger.debug('Invoked GetClientStats RPC to %s: %s', host, response)
        # (lines elided — presumably returns/aggregates `response`)
def get_client_accumulated_stats():
    """Fetch LoadBalancerAccumulatedStats from the test client(s) over gRPC.

    NOTE(review): lines are elided from this view (host selection, the
    per-host loop header, and the return); visible statements reproduced
    verbatim.
    """
    # (lines elided)
    hosts = ['localhost']
    # (per-host loop header elided)
    with grpc.insecure_channel('%s:%d' %
                               (host, args.stats_port)) as channel:
        stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel)
        request = messages_pb2.LoadBalancerAccumulatedStatsRequest()
        logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:',
                     host, args.stats_port)
        response = stub.GetClientAccumulatedStats(
            request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC)
        logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s',
        # (call continuation and return elided)
def configure_client(rpc_types, metadata=[], timeout_sec=None):
    """Push a ClientConfigureRequest to each client's configure service.

    NOTE(review): mutable default argument `metadata=[]` — the visible code
    only iterates it, but `metadata=None` with a local default would be the
    safer idiom; the metadata-field assignment lines are elided here so the
    fix cannot be made safely from this view.
    NOTE(review): additional lines are elided (host selection, per-host loop
    header, metadata field assignments, timeout guard); visible statements
    reproduced verbatim.
    """
    # (lines elided)
    hosts = ['localhost']
    # (per-host loop header elided)
    with grpc.insecure_channel('%s:%d' %
                               (host, args.stats_port)) as channel:
        stub = test_pb2_grpc.XdsUpdateClientConfigureServiceStub(channel)
        request = messages_pb2.ClientConfigureRequest()
        request.types.extend(rpc_types)
        for rpc_type, md_key, md_value in metadata:
            md = request.metadata.add()
            # (lines elided — presumably set md fields from the tuple)
        # (guard elided — presumably `if timeout_sec:`; TODO confirm)
        request.timeout_sec = timeout_sec
        # (logger.debug call opening elided)
            'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s',
            host, args.stats_port, request)
        stub.Configure(request,
                       # (wait_for_ready line elided)
                       timeout=_CONNECTION_TIMEOUT_SEC)
        logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s',
        # (call continuation elided)
# Raised when the observed RPC-to-backend distribution does not match the
# expected set of backends within the allotted time (class body elided from
# this view).
class RpcDistributionError(Exception):
def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs,
                                   # (allow_failures parameter line elided)
    """Poll client stats until exactly `backends` receive load, or time out.

    Raises RpcDistributionError with the last observed mismatch. NOTE(review):
    several lines are elided from this view (success return, loop sleeps);
    visible statements reproduced verbatim.
    """
    start_time = time.time()
    # (line elided — presumably `error_msg = None`)
    logger.debug('Waiting for %d sec until backends %s receive load' %
                 (timeout_sec, backends))
    while time.time() - start_time <= timeout_sec:
        # (line elided)
        stats = get_client_stats(num_rpcs, timeout_sec)
        rpcs_by_peer = stats.rpcs_by_peer
        for backend in backends:
            if backend not in rpcs_by_peer:
                error_msg = 'Backend %s did not receive load' % backend
                # (line elided)
        if not error_msg and len(rpcs_by_peer) > len(backends):
            error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer
        if not allow_failures and stats.num_failures > 0:
            error_msg = '%d RPCs failed' % stats.num_failures
        # (lines elided — presumably return on success)
    raise RpcDistributionError(error_msg)
# Convenience wrapper: verify distribution while tolerating RPC failures
# (the timeout parameter line and trailing call arguments are elided from
# this view).
def wait_until_all_rpcs_go_to_given_backends_or_fail(backends,
                                                     # (parameter line elided)
                                                     num_rpcs=_NUM_TEST_RPCS):
    _verify_rpcs_to_given_backends(backends,
    # (call continuation elided — presumably allow_failures=True)
# Convenience wrapper: verify distribution, treating any RPC failure as an
# error (allow_failures=False). Some lines are elided from this view.
def wait_until_all_rpcs_go_to_given_backends(backends,
                                             # (parameter line elided)
                                             num_rpcs=_NUM_TEST_RPCS):
    _verify_rpcs_to_given_backends(backends,
                                   # (call continuation elided)
                                   allow_failures=False)
# NOTE(review): this function's docstring terminator and several body lines
# are elided from this view; visible lines reproduced verbatim.
def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
    '''Block until the test client reaches the state with the given number
    of RPCs being outstanding stably.
    rpc_type: A string indicating the RPC method to check for. Either
    'UnaryCall' or 'EmptyCall'.
    timeout_sec: Maximum number of seconds to wait until the desired state
    num_rpcs: Expected number of RPCs to be in-flight.
    threshold: Number within [0,100], the tolerable percentage by which
    the actual number of RPCs in-flight can differ from the expected number.
    # (docstring terminator elided)
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    threshold_fraction = threshold / 100.0
    start_time = time.time()
    # (logger.debug call opening elided)
        'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' %
        (timeout_sec, num_rpcs, rpc_type, threshold))
    while time.time() - start_time <= timeout_sec:
        error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
        # (call continuation elided)
            logger.debug('Progress: %s', error_msg)
        # (lines elided)
    # Ensure the number of outstanding RPCs is stable.
    # (lines elided)
    error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
    # (call continuation elided)
        raise Exception("Wrong number of %s RPCs in-flight: %s" %
                        (rpc_type, error_msg))
def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction):
    """Check whether the in-flight RPC count is within tolerance.

    Builds an error message when outside tolerance. NOTE(review): the
    initial `error_msg` assignment and the final return are elided from
    this view; visible statements reproduced verbatim.
    """
    # (line elided)
    stats = get_client_accumulated_stats()
    rpcs_started = stats.num_rpcs_started_by_method[rpc_type]
    rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
    rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type]
    # In-flight = started minus completed (succeeded or failed).
    rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed
    if rpcs_in_flight < (num_rpcs * (1 - threshold_fraction)):
        error_msg = ('actual(%d) < expected(%d - %d%%)' %
                     (rpcs_in_flight, num_rpcs, threshold))
    elif rpcs_in_flight > (num_rpcs * (1 + threshold_fraction)):
        error_msg = ('actual(%d) > expected(%d + %d%%)' %
                     (rpcs_in_flight, num_rpcs, threshold))
    # (lines elided — presumably `return error_msg`)
# NOTE(review): this function's signature continuation, docstring section
# headers/terminator, and a raise opening are elided from this view; visible
# lines reproduced verbatim.
def compare_distributions(actual_distribution, expected_distribution,
                          # (continuation elided — presumably `threshold):`)
    """Compare if two distributions are similar.
    actual_distribution: A list of floats, contains the actual distribution.
    expected_distribution: A list of floats, contains the expected distribution.
    threshold: Number within [0,100], the threshold percentage by which the
    actual distribution can differ from the expected distribution.
    The similarity between the distributions as a boolean. Returns true if the
    actual distribution lies within the threshold of the expected
    distribution, false otherwise.
    ValueError: if threshold is not with in [0,100].
    Exception: containing detailed error messages.
    # (docstring terminator elided)
    if len(expected_distribution) != len(actual_distribution):
        # (raise opening elided)
            'Error: expected and actual distributions have different size (%d vs %d)'
            % (len(expected_distribution), len(actual_distribution)))
    if threshold < 0 or threshold > 100:
        raise ValueError('Value error: Threshold should be between 0 to 100')
    threshold_fraction = threshold / 100.0
    # Element-wise check: each actual share must lie within +/- threshold%
    # of its expected share.
    for expected, actual in zip(expected_distribution, actual_distribution):
        if actual < (expected * (1 - threshold_fraction)):
            raise Exception("actual(%f) < expected(%f-%d%%)" %
                            (actual, expected, threshold))
        if actual > (expected * (1 + threshold_fraction)):
            raise Exception("actual(%f) > expected(%f+%d%%)" %
                            (actual, expected, threshold))
# NOTE(review): this function's docstring terminator and return statements
# are elided from this view; visible lines reproduced verbatim.
def compare_expected_instances(stats, expected_instances):
    """Compare if stats have expected instances for each type of RPC.
    stats: LoadBalancerStatsResponse reported by interop client.
    expected_instances: a dict with key as the RPC type (string), value as
    the expected backend instances (list of strings).
    Returns true if the instances are expected. False if not.
    # (docstring terminator elided)
    for rpc_type, expected_peers in expected_instances.items():
        rpcs_by_peer_for_type = stats.rpcs_by_method[rpc_type]
        rpcs_by_peer = rpcs_by_peer_for_type.rpcs_by_peer if rpcs_by_peer_for_type else None
        logger.debug('rpc: %s, by_peer: %s', rpc_type, rpcs_by_peer)
        peers = list(rpcs_by_peer.keys())
        if set(peers) != set(expected_peers):
            logger.info('unexpected peers for %s, got %s, want %s', rpc_type,
                        peers, expected_peers)
            # (return False line elided — TODO confirm)
    # (return True line elided — TODO confirm)
def test_backends_restart(gcp, backend_service, instance_group):
    """Scale the instance group to 0, then back, verifying traffic recovers.

    NOTE(review): several lines are elided from this view (call
    continuations and what is presumably a try/finally around the resize);
    visible statements reproduced verbatim.
    """
    logger.info('Running test_backends_restart')
    instance_names = get_instance_names(gcp, instance_group)
    num_instances = len(instance_names)
    start_time = time.time()
    wait_until_all_rpcs_go_to_given_backends(instance_names,
    # (continuation elided; lines elided)
    resize_instance_group(gcp, instance_group, 0)
    wait_until_all_rpcs_go_to_given_backends_or_fail([],
                                                     _WAIT_FOR_BACKEND_SEC)
    # (lines elided — presumably a finally: restore block; TODO confirm)
    resize_instance_group(gcp, instance_group, num_instances)
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    new_instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(new_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
def test_change_backend_service(gcp, original_backend_service, instance_group,
                                alternate_backend_service,
                                same_zone_instance_group):
    """Repoint the URL map to an alternate backend service; verify traffic
    follows, then restore.

    NOTE(review): several lines are elided from this view (call
    continuations and presumably a try/finally restore block); visible
    statements reproduced verbatim.
    """
    logger.info('Running test_change_backend_service')
    original_backend_instances = get_instance_names(gcp, instance_group)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
    # (continuation elided; lines elided)
    patch_url_map_backend_service(gcp, alternate_backend_service)
    wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances,
                                             _WAIT_FOR_URL_MAP_PATCH_SEC)
    # (lines elided — presumably a finally: restore block; TODO confirm)
    patch_url_map_backend_service(gcp, original_backend_service)
    patch_backend_service(gcp, alternate_backend_service, [])
def test_gentle_failover(gcp,
                         # (backend_service parameter line elided)
                         primary_instance_group,
                         secondary_instance_group,
                         swapped_primary_and_secondary=False):
    """Stop all but one primary instance and expect traffic to spill over to
    the remaining primary instance plus the secondary locality; on an
    RpcDistributionError, retry once with primary/secondary expectations
    swapped, then restore the original configuration.

    NOTE(review): many lines are elided from this view (try/finally
    structure, call continuations); visible statements reproduced verbatim.
    """
    logger.info('Running test_gentle_failover')
    num_primary_instances = len(get_instance_names(gcp, primary_instance_group))
    min_instances_for_gentle_failover = 3  # Need >50% failure to start failover
    # (line elided — presumably `try:`)
    if num_primary_instances < min_instances_for_gentle_failover:
        resize_instance_group(gcp, primary_instance_group,
                              min_instances_for_gentle_failover)
    patch_backend_service(
        gcp, backend_service,
        [primary_instance_group, secondary_instance_group])
    primary_instance_names = get_instance_names(gcp, primary_instance_group)
    secondary_instance_names = get_instance_names(gcp,
                                                  secondary_instance_group)
    wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
    wait_for_healthy_backends(gcp, backend_service,
                              secondary_instance_group)
    wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
    # (continuation elided)
    instances_to_stop = primary_instance_names[:-1]
    remaining_instances = primary_instance_names[-1:]
    # (lines elided)
    set_serving_status(instances_to_stop,
    # (continuation elided)
    wait_until_all_rpcs_go_to_given_backends(
        remaining_instances + secondary_instance_names,
        _WAIT_FOR_BACKEND_SEC)
    # (lines elided)
    set_serving_status(primary_instance_names,
    # (continuation elided)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_gentle_failover(gcp,
                                 # (argument line elided)
                                 secondary_instance_group,
                                 primary_instance_group,
                                 swapped_primary_and_secondary=True)
    # (lines elided — presumably else re-raise, then finally: restore)
    patch_backend_service(gcp, backend_service, [primary_instance_group])
    resize_instance_group(gcp, primary_instance_group,
                          num_primary_instances)
    instance_names = get_instance_names(gcp, primary_instance_group)
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
def test_load_report_based_failover(gcp, backend_service,
                                    primary_instance_group,
                                    secondary_instance_group):
    """Drive failover via RATE balancing-mode limits: cap primary capacity
    at 20% of client QPS (secondary locality absorbs overflow), then raise
    it to 120% (primary handles everything), then restore.

    NOTE(review): many lines are elided from this view (try/finally
    structure, call continuations including max_rate= arguments); visible
    statements reproduced verbatim.
    """
    logger.info('Running test_load_report_based_failover')
    # (line elided — presumably `try:`)
    patch_backend_service(
        gcp, backend_service,
        [primary_instance_group, secondary_instance_group])
    primary_instance_names = get_instance_names(gcp, primary_instance_group)
    secondary_instance_names = get_instance_names(gcp,
                                                  secondary_instance_group)
    wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
    wait_for_healthy_backends(gcp, backend_service,
                              secondary_instance_group)
    wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
    # (continuation elided)
    # Set primary locality's balance mode to RATE, and RPS to 20% of the
    # client's QPS. The secondary locality will be used.
    max_rate = int(args.qps * 1 / 5)
    logger.info('Patching backend service to RATE with %d max_rate',
    # (continuation elided)
    patch_backend_service(
        # (gcp argument line elided)
        backend_service, [primary_instance_group, secondary_instance_group],
        balancing_mode='RATE',
        # (max_rate argument line elided)
    wait_until_all_rpcs_go_to_given_backends(
        primary_instance_names + secondary_instance_names,
        _WAIT_FOR_BACKEND_SEC)
    # (lines elided)
    # Set primary locality's balance mode to RATE, and RPS to 120% of the
    # client's QPS. Only the primary locality will be used.
    max_rate = int(args.qps * 6 / 5)
    logger.info('Patching backend service to RATE with %d max_rate',
    # (continuation elided)
    patch_backend_service(
        # (gcp argument line elided)
        backend_service, [primary_instance_group, secondary_instance_group],
        balancing_mode='RATE',
        # (max_rate argument line elided)
    wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
    logger.info("success")
    # (lines elided — presumably a finally: restore block; TODO confirm)
    patch_backend_service(gcp, backend_service, [primary_instance_group])
    instance_names = get_instance_names(gcp, primary_instance_group)
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
def test_ping_pong(gcp, backend_service, instance_group):
    """Baseline test: wait for healthy backends, then verify all RPCs reach
    the instance group's backends (trailing lines elided from this view)."""
    logger.info('Running test_ping_pong')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    wait_until_all_rpcs_go_to_given_backends(instance_names,
    # (call continuation and any trailing lines elided)
def test_remove_instance_group(gcp, backend_service, instance_group,
                               same_zone_instance_group):
    """Attach two instance groups, remove one, and verify traffic converges
    on the remaining group; handles the case where TD routed to only one
    group from the start. Restores the original single-group config.

    NOTE(review): many lines are elided from this view (try headers,
    call continuations, finally block); visible statements reproduced
    verbatim — note the nested except handlers whose try headers are
    among the elided lines.
    """
    logger.info('Running test_remove_instance_group')
    # (line elided — presumably `try:`)
    patch_backend_service(gcp,
                          # (backend_service argument line elided)
                          [instance_group, same_zone_instance_group],
                          balancing_mode='RATE')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    wait_for_healthy_backends(gcp, backend_service,
                              same_zone_instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    same_zone_instance_names = get_instance_names(gcp,
                                                  same_zone_instance_group)
    # (line elided — presumably `try:`)
    wait_until_all_rpcs_go_to_given_backends(
        instance_names + same_zone_instance_names,
        _WAIT_FOR_OPERATION_SEC)
    remaining_instance_group = same_zone_instance_group
    remaining_instance_names = same_zone_instance_names
    except RpcDistributionError as e:
        # If connected to TD in a different zone, we may route traffic to
        # only one instance group. Determine which group that is to continue
        # with the remainder of the test case.
        # (line elided — presumably `try:`)
        wait_until_all_rpcs_go_to_given_backends(
            instance_names, _WAIT_FOR_STATS_SEC)
        remaining_instance_group = same_zone_instance_group
        remaining_instance_names = same_zone_instance_names
        except RpcDistributionError as e:
            wait_until_all_rpcs_go_to_given_backends(
                same_zone_instance_names, _WAIT_FOR_STATS_SEC)
            remaining_instance_group = instance_group
            remaining_instance_names = instance_names
    patch_backend_service(gcp,
                          backend_service, [remaining_instance_group],
                          balancing_mode='RATE')
    wait_until_all_rpcs_go_to_given_backends(remaining_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
    # (lines elided — presumably a finally: restore block; TODO confirm)
    patch_backend_service(gcp, backend_service, [instance_group])
    wait_until_all_rpcs_go_to_given_backends(instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
def test_round_robin(gcp, backend_service, instance_group):
    """Verify RPCs are spread evenly (within a threshold) across all
    backends, retrying to tolerate transient config-propagation failures.

    NOTE(review): several lines are elided from this view (threshold and
    max_attempts assignments, continue/raise statements); visible lines
    reproduced verbatim.
    """
    logger.info('Running test_round_robin')
    wait_for_healthy_backends(gcp, backend_service, instance_group)
    instance_names = get_instance_names(gcp, instance_group)
    # (line elided — presumably sets `threshold`)
    wait_until_all_rpcs_go_to_given_backends(instance_names,
    # (continuation elided)
    # TODO(ericgribkoff) Delayed config propagation from earlier tests
    # may result in briefly receiving an empty EDS update, resulting in failed
    # RPCs. Retry distribution validation if this occurs; long-term fix is
    # creating new backend resources for each individual test case.
    # Each attempt takes 10 seconds. Config propagation can take several
    # (comment continuation and `max_attempts` assignment elided)
    for i in range(max_attempts):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]
        total_requests_received = sum(requests_received)
        if total_requests_received != _NUM_TEST_RPCS:
            logger.info('Unexpected RPC failures, retrying: %s', stats)
            # (line elided — presumably `continue`)
        expected_requests = total_requests_received / len(instance_names)
        for instance in instance_names:
            if abs(stats.rpcs_by_peer[instance] -
                   expected_requests) > threshold:
                # (raise opening elided)
                'RPC peer distribution differs from expected by more than %d '
                'for instance %s (%s)' % (threshold, instance, stats))
        # (line elided — presumably `return` on success)
    raise Exception('RPC failures persisted through %d retries' % max_attempts)
def test_secondary_locality_gets_no_requests_on_partial_primary_failure(
        # (gcp / backend_service parameter lines elided)
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Stop a minority of primary instances (1 of N) and verify traffic stays
    on the remaining primary instances — the secondary locality should get no
    requests. On RpcDistributionError, retry once with primary/secondary
    swapped; restore config afterwards.

    NOTE(review): many lines are elided from this view (try/finally
    structure, call continuations); visible statements reproduced verbatim.
    """
    # (logger.info call opening elided)
        'Running secondary_locality_gets_no_requests_on_partial_primary_failure'
    # (line elided — presumably `try:`)
    patch_backend_service(
        gcp, backend_service,
        [primary_instance_group, secondary_instance_group])
    wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
    wait_for_healthy_backends(gcp, backend_service,
                              secondary_instance_group)
    primary_instance_names = get_instance_names(gcp, primary_instance_group)
    wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
    # (continuation elided)
    instances_to_stop = primary_instance_names[:1]
    remaining_instances = primary_instance_names[1:]
    # (lines elided)
    set_serving_status(instances_to_stop,
    # (continuation elided)
    wait_until_all_rpcs_go_to_given_backends(remaining_instances,
                                             _WAIT_FOR_BACKEND_SEC)
    # (lines elided)
    set_serving_status(primary_instance_names,
    # (continuation elided)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_no_requests_on_partial_primary_failure(
                # (argument lines elided)
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
    # (lines elided — presumably else re-raise, then finally: restore)
    patch_backend_service(gcp, backend_service, [primary_instance_group])
def test_secondary_locality_gets_requests_on_primary_failure(
        # (gcp / backend_service parameter lines elided)
        primary_instance_group,
        secondary_instance_group,
        swapped_primary_and_secondary=False):
    """Stop ALL primary instances and verify traffic fails over to the
    secondary locality. On RpcDistributionError, retry once with
    primary/secondary swapped; restore config afterwards.

    NOTE(review): many lines are elided from this view (try/finally
    structure, call continuations); visible statements reproduced verbatim.
    """
    logger.info('Running secondary_locality_gets_requests_on_primary_failure')
    # (line elided — presumably `try:`)
    patch_backend_service(
        gcp, backend_service,
        [primary_instance_group, secondary_instance_group])
    wait_for_healthy_backends(gcp, backend_service, primary_instance_group)
    wait_for_healthy_backends(gcp, backend_service,
                              secondary_instance_group)
    primary_instance_names = get_instance_names(gcp, primary_instance_group)
    secondary_instance_names = get_instance_names(gcp,
                                                  secondary_instance_group)
    wait_until_all_rpcs_go_to_given_backends(primary_instance_names,
    # (continuation elided)
    set_serving_status(primary_instance_names,
    # (continuation elided)
    wait_until_all_rpcs_go_to_given_backends(secondary_instance_names,
                                             _WAIT_FOR_BACKEND_SEC)
    # (lines elided)
    set_serving_status(primary_instance_names,
    # (continuation elided)
    except RpcDistributionError as e:
        if not swapped_primary_and_secondary and is_primary_instance_group(
                gcp, secondary_instance_group):
            # Swap expectation of primary and secondary instance groups.
            test_secondary_locality_gets_requests_on_primary_failure(
                # (argument lines elided)
                secondary_instance_group,
                primary_instance_group,
                swapped_primary_and_secondary=True)
    # (lines elided — presumably else re-raise, then finally: restore)
    patch_backend_service(gcp, backend_service, [primary_instance_group])
# NOTE(review): this function's docstring opening/terminator and some lines
# are elided from this view; visible lines reproduced verbatim.
def prepare_services_for_urlmap_tests(gcp, original_backend_service,
                                      instance_group, alternate_backend_service,
                                      same_zone_instance_group):
    # (docstring opening elided)
    This function prepares the services to be ready for tests that modifies
    # (docstring continuation elided)
    Returns original and alternate backend names as lists of strings.
    # (docstring terminator elided)
    logger.info('waiting for original backends to become healthy')
    wait_for_healthy_backends(gcp, original_backend_service, instance_group)
    # (lines elided)
    patch_backend_service(gcp, alternate_backend_service,
                          [same_zone_instance_group])
    logger.info('waiting for alternate to become healthy')
    wait_for_healthy_backends(gcp, alternate_backend_service,
                              same_zone_instance_group)
    # (lines elided)
    original_backend_instances = get_instance_names(gcp, instance_group)
    logger.info('original backends instances: %s', original_backend_instances)
    # (lines elided)
    alternate_backend_instances = get_instance_names(gcp,
                                                     same_zone_instance_group)
    logger.info('alternate backends instances: %s', alternate_backend_instances)
    # Start with all traffic going to original_backend_service.
    logger.info('waiting for traffic to all go to original backends')
    wait_until_all_rpcs_go_to_given_backends(original_backend_instances,
    # (continuation elided)
    return original_backend_instances, alternate_backend_instances
def test_traffic_splitting(gcp, original_backend_service, instance_group,
                           alternate_backend_service, same_zone_instance_group):
    # This test start with all traffic going to original_backend_service. Then
    # it updates URL-map to set default action to traffic splitting between
    # original and alternate. It waits for all backends in both services to
    # receive traffic, then verifies that weights are expected.
    #
    # NOTE(review): many lines are elided from this view (try/finally
    # structure, call continuations, retry_count assignment, list-literal
    # closings); visible statements reproduced verbatim.
    logger.info('Running test_traffic_splitting')
    # (lines elided)
    original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
        gcp, original_backend_service, instance_group,
        alternate_backend_service, same_zone_instance_group)
    # (line elided — presumably `try:`)
    # Patch urlmap, change route action to traffic splitting between
    # original and alternate.
    logger.info('patching url map with traffic splitting')
    original_service_percentage, alternate_service_percentage = 20, 80
    patch_url_map_backend_service(
        # (gcp argument line elided)
        services_with_weights={
            original_backend_service: original_service_percentage,
            alternate_backend_service: alternate_service_percentage,
        # (dict/call closing elided)
    # Split percentage between instances: [20,80] -> [10,10,40,40].
    expected_instance_percentage = [
        original_service_percentage * 1.0 / len(original_backend_instances)
    ] * len(original_backend_instances) + [
        alternate_service_percentage * 1.0 /
        len(alternate_backend_instances)
    ] * len(alternate_backend_instances)
    # (lines elided)
    # Wait for traffic to go to both services.
    # (logger.info call opening elided)
        'waiting for traffic to go to all backends (including alternate)')
    wait_until_all_rpcs_go_to_given_backends(
        original_backend_instances + alternate_backend_instances,
        # (timeout argument line elided)
    # Verify that weights between two services are expected.
    # (retry_count assignment elided)
    # Each attempt takes about 10 seconds, 10 retries is equivalent to 100
    # (comment continuation elided)
    for i in range(retry_count):
        stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
        got_instance_count = [
            stats.rpcs_by_peer[i] for i in original_backend_instances
        ] + [stats.rpcs_by_peer[i] for i in alternate_backend_instances]
        total_count = sum(got_instance_count)
        got_instance_percentage = [
            x * 100.0 / total_count for x in got_instance_count
        # (list closing and `try:` elided)
        compare_distributions(got_instance_percentage,
                              expected_instance_percentage, 5)
        except Exception as e:
            logger.info('attempt %d', i)
            logger.info('got percentage: %s', got_instance_percentage)
            logger.info('expected percentage: %s',
                        expected_instance_percentage)
            # (lines elided)
            if i == retry_count - 1:
                # (raise opening elided)
                    'RPC distribution (%s) differs from expected (%s)' %
                    (got_instance_percentage, expected_instance_percentage))
        # (lines elided — presumably break/return on success)
    logger.info("success")
    # (lines elided — presumably a finally: restore block; TODO confirm)
    patch_url_map_backend_service(gcp, original_backend_service)
    patch_backend_service(gcp, alternate_backend_service, [])
# Verifies xDS URL-map path matching: patches the URL map with each
# (route_rules, expected_instances) case, waits until traffic reaches both
# the original and alternate backend sets, then polls client stats until
# compare_expected_instances() confirms each RPC method lands on the
# expected instances. Restores the URL map and drains the alternate
# service in cleanup.
# NOTE(review): this chunk is an elided listing — the embedded original
# line numbers skip values, so statements (dict/list delimiters, raise
# sites, retry_count assignment, etc.) are missing between the lines below.
974 def test_path_matching(gcp, original_backend_service, instance_group,
975 alternate_backend_service, same_zone_instance_group):
976 # This test start with all traffic (UnaryCall and EmptyCall) going to
977 # original_backend_service.
979 # Then it updates URL-map to add routes, to make UnaryCall and EmptyCall to
980 # go different backends. It waits for all backends in both services to
981 # receive traffic, then verifies that traffic goes to the expected
983 logger.info('Running test_path_matching')
985 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
986 gcp, original_backend_service, instance_group,
987 alternate_backend_service, same_zone_instance_group)
990 # A list of tuples (route_rules, expected_instances).
995 # FullPath EmptyCall -> alternate_backend_service.
997 'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
999 'service': alternate_backend_service.url
1002 "EmptyCall": alternate_backend_instances,
1003 "UnaryCall": original_backend_instances
1008 # Prefix UnaryCall -> alternate_backend_service.
1010 'prefixMatch': '/grpc.testing.TestService/Unary'
1012 'service': alternate_backend_service.url
1015 "UnaryCall": alternate_backend_instances,
1016 "EmptyCall": original_backend_instances
1019 # This test case is similar to the one above (but with route
1020 # services swapped). This test has two routes (full_path and
1021 # the default) to match EmptyCall, and both routes set
1022 # alternative_backend_service as the action. This forces the
1023 # client to handle duplicate Clusters in the RDS response.
1027 # Prefix UnaryCall -> original_backend_service.
1029 'prefixMatch': '/grpc.testing.TestService/Unary'
1031 'service': original_backend_service.url
1035 # FullPath EmptyCall -> alternate_backend_service.
1038 '/grpc.testing.TestService/EmptyCall'
1040 'service': alternate_backend_service.url
1044 "UnaryCall": original_backend_instances,
1045 "EmptyCall": alternate_backend_instances
1050 # Regex UnaryCall -> alternate_backend_service.
1053 '^\/.*\/UnaryCall$' # Unary methods with any services.
1055 'service': alternate_backend_service.url
1058 "UnaryCall": alternate_backend_instances,
1059 "EmptyCall": original_backend_instances
1064 # ignoreCase EmptyCall -> alternate_backend_service.
1066 # Case insensitive matching.
1067 'fullPathMatch': '/gRpC.tEsTinG.tEstseRvice/empTycaLl',
1070 'service': alternate_backend_service.url
1073 "UnaryCall": original_backend_instances,
1074 "EmptyCall": alternate_backend_instances
# Each case: install the route rules, wait for both backend sets to see
# traffic, then retry stat comparison until success or exhaustion.
1078 for (route_rules, expected_instances) in test_cases:
1079 logger.info('patching url map with %s', route_rules)
1080 patch_url_map_backend_service(gcp,
1081 original_backend_service,
1082 route_rules=route_rules)
1084 # Wait for traffic to go to both services.
1086 'waiting for traffic to go to all backends (including alternate)'
1088 wait_until_all_rpcs_go_to_given_backends(
1089 original_backend_instances + alternate_backend_instances,
1090 _WAIT_FOR_STATS_SEC)
1093 # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
1095 for i in range(retry_count):
1096 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
1097 if not stats.rpcs_by_method:
1099 'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
1101 logger.info('attempt %d', i)
1102 if compare_expected_instances(stats, expected_instances):
1103 logger.info("success")
1105 elif i == retry_count - 1:
1107 'timeout waiting for RPCs to the expected instances: %s'
1108 % expected_instances)
# Cleanup: restore default routing and detach the alternate service.
1110 patch_url_map_backend_service(gcp, original_backend_service)
1111 patch_backend_service(gcp, alternate_backend_service, [])
# Verifies xDS URL-map header matching (exact / prefix / suffix / present /
# inverted / range / regex): each route_rules case keys off the test
# metadata headers sent with UnaryCall/EmptyCall, and the test polls client
# stats until compare_expected_instances() matches. Cleanup restores the
# URL map and empties the alternate service.
# NOTE(review): elided listing — embedded line numbers skip values, so
# statements (list/dict delimiters, matchRules scaffolding, rangeEnd,
# invertMatch, retry_count, raise sites) are missing between these lines.
1114 def test_header_matching(gcp, original_backend_service, instance_group,
1115 alternate_backend_service, same_zone_instance_group):
1116 # This test start with all traffic (UnaryCall and EmptyCall) going to
1117 # original_backend_service.
1119 # Then it updates URL-map to add routes, to make RPCs with test headers to
1120 # go to different backends. It waits for all backends in both services to
1121 # receive traffic, then verifies that traffic goes to the expected
1123 logger.info('Running test_header_matching')
1125 original_backend_instances, alternate_backend_instances = prepare_services_for_urlmap_tests(
1126 gcp, original_backend_service, instance_group,
1127 alternate_backend_service, same_zone_instance_group)
1130 # A list of tuples (route_rules, expected_instances).
1135 # Header ExactMatch -> alternate_backend_service.
1136 # EmptyCall is sent with the metadata.
1141 'headerName': _TEST_METADATA_KEY,
1142 'exactMatch': _TEST_METADATA_VALUE_EMPTY
1145 'service': alternate_backend_service.url
1148 "EmptyCall": alternate_backend_instances,
1149 "UnaryCall": original_backend_instances
1154 # Header PrefixMatch -> alternate_backend_service.
1155 # UnaryCall is sent with the metadata.
1160 'headerName': _TEST_METADATA_KEY,
1161 'prefixMatch': _TEST_METADATA_VALUE_UNARY[:2]
1164 'service': alternate_backend_service.url
1167 "EmptyCall": original_backend_instances,
1168 "UnaryCall": alternate_backend_instances
1173 # Header SuffixMatch -> alternate_backend_service.
1174 # EmptyCall is sent with the metadata.
1179 'headerName': _TEST_METADATA_KEY,
1180 'suffixMatch': _TEST_METADATA_VALUE_EMPTY[-2:]
1183 'service': alternate_backend_service.url
1186 "EmptyCall": alternate_backend_instances,
1187 "UnaryCall": original_backend_instances
1192 # Header 'xds_md_numeric' present -> alternate_backend_service.
1193 # UnaryCall is sent with the metadata, so will be sent to alternative.
1198 'headerName': _TEST_METADATA_NUMERIC_KEY,
1199 'presentMatch': True
1202 'service': alternate_backend_service.url
1205 "EmptyCall": original_backend_instances,
1206 "UnaryCall": alternate_backend_instances
1211 # Header invert ExactMatch -> alternate_backend_service.
1212 # UnaryCall is sent with the metadata, so will be sent to
1213 # original. EmptyCall will be sent to alternative.
1218 'headerName': _TEST_METADATA_KEY,
1219 'exactMatch': _TEST_METADATA_VALUE_UNARY,
1223 'service': alternate_backend_service.url
1226 "EmptyCall": alternate_backend_instances,
1227 "UnaryCall": original_backend_instances
1232 # Header 'xds_md_numeric' range [100,200] -> alternate_backend_service.
1233 # UnaryCall is sent with the metadata in range.
1238 'headerName': _TEST_METADATA_NUMERIC_KEY,
1240 'rangeStart': '100',
1245 'service': alternate_backend_service.url
1248 "EmptyCall": original_backend_instances,
1249 "UnaryCall": alternate_backend_instances
1254 # Header RegexMatch -> alternate_backend_service.
1255 # EmptyCall is sent with the metadata.
1263 "^%s.*%s$" % (_TEST_METADATA_VALUE_EMPTY[:2],
1264 _TEST_METADATA_VALUE_EMPTY[-2:])
1267 'service': alternate_backend_service.url
1270 "EmptyCall": alternate_backend_instances,
1271 "UnaryCall": original_backend_instances
# Each case: install route rules, wait for both backend sets, then retry
# stat comparison until success or exhaustion.
1275 for (route_rules, expected_instances) in test_cases:
1276 logger.info('patching url map with %s -> alternative',
1277 route_rules[0]['matchRules'])
1278 patch_url_map_backend_service(gcp,
1279 original_backend_service,
1280 route_rules=route_rules)
1282 # Wait for traffic to go to both services.
1284 'waiting for traffic to go to all backends (including alternate)'
1286 wait_until_all_rpcs_go_to_given_backends(
1287 original_backend_instances + alternate_backend_instances,
1288 _WAIT_FOR_STATS_SEC)
1291 # Each attempt takes about 5 seconds, 80 retries is equivalent to 400
1293 for i in range(retry_count):
1294 stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
1295 if not stats.rpcs_by_method:
1297 'stats.rpcs_by_method is None, the interop client stats service does not support this test case'
1299 logger.info('attempt %d', i)
1300 if compare_expected_instances(stats, expected_instances):
1301 logger.info("success")
1303 elif i == retry_count - 1:
1305 'timeout waiting for RPCs to the expected instances: %s'
1306 % expected_instances)
# Cleanup: restore default routing and detach the alternate service.
1308 patch_url_map_backend_service(gcp, original_backend_service)
1309 patch_backend_service(gcp, alternate_backend_service, [])
# Exercises xDS circuit breaking: creates two dedicated backend services
# with maxRequests circuit breakers (500 / 1000), routes UnaryCall and
# EmptyCall to them, makes all RPCs "keep-open" so in-flight counts climb,
# and asserts via wait_until_rpcs_in_flight() that the in-flight count
# plateaus at each threshold, including after raising the threshold to 800.
# NOTE(review): elided listing — embedded line numbers skip values; the
# docstring's opening/closing quotes, dict literals for circuitBreakers,
# route-rule scaffolding, and configure_client call heads are among the
# missing lines. Do not treat the visible lines as the full body.
1312 def test_circuit_breaking(gcp, original_backend_service, instance_group,
1313 same_zone_instance_group):
1315 Since backend service circuit_breakers configuration cannot be unset,
1316 which causes trouble for restoring validate_for_proxy flag in target
1317 proxy/global forwarding rule. This test uses dedicated backend sevices.
1318 The url_map and backend services undergoes the following state changes:
1321 original_backend_service -> [instance_group]
1322 extra_backend_service -> []
1323 more_extra_backend_service -> []
1325 url_map -> [original_backend_service]
1328 extra_backend_service (with circuit_breakers) -> [instance_group]
1329 more_extra_backend_service (with circuit_breakers) -> [same_zone_instance_group]
1331 url_map -> [extra_backend_service, more_extra_backend_service]
1334 original_backend_service -> [instance_group]
1335 extra_backend_service (with circuit_breakers) -> []
1336 more_extra_backend_service (with circuit_breakers) -> []
1338 url_map -> [original_backend_service]
1340 logger.info('Running test_circuit_breaking')
1341 additional_backend_services = []
1343 # TODO(chengyuanzhang): Dedicated backend services created for circuit
1344 # breaking test. Once the issue for unsetting backend service circuit
1345 # breakers is resolved or configuring backend service circuit breakers is
1346 # enabled for config validation, these dedicated backend services can be
1348 extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-extra' + gcp_suffix
1349 more_extra_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-more-extra' + gcp_suffix
1350 extra_backend_service = add_backend_service(gcp,
1351 extra_backend_service_name)
1352 additional_backend_services.append(extra_backend_service)
1353 more_extra_backend_service = add_backend_service(
1354 gcp, more_extra_backend_service_name)
1355 additional_backend_services.append(more_extra_backend_service)
1356 # The config validation for proxyless doesn't allow setting
1357 # circuit_breakers. Disable validate validate_for_proxyless
1358 # for this test. This can be removed when validation
1359 # accepts circuit_breakers.
1360 logger.info('disabling validate_for_proxyless in target proxy')
1361 set_validate_for_proxyless(gcp, False)
1362 extra_backend_service_max_requests = 500
1363 more_extra_backend_service_max_requests = 1000
1364 patch_backend_service(gcp,
1365 extra_backend_service, [instance_group],
1368 extra_backend_service_max_requests
1370 logger.info('Waiting for extra backends to become healthy')
1371 wait_for_healthy_backends(gcp, extra_backend_service, instance_group)
1372 patch_backend_service(gcp,
1373 more_extra_backend_service,
1374 [same_zone_instance_group],
1377 more_extra_backend_service_max_requests
1379 logger.info('Waiting for more extra backend to become healthy')
1380 wait_for_healthy_backends(gcp, more_extra_backend_service,
1381 same_zone_instance_group)
1382 extra_backend_instances = get_instance_names(gcp, instance_group)
1383 more_extra_backend_instances = get_instance_names(
1384 gcp, same_zone_instance_group)
1388 # UnaryCall -> extra_backend_service
1390 'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
1392 'service': extra_backend_service.url
1396 # EmptyCall -> more_extra_backend_service
1398 'fullPathMatch': '/grpc.testing.TestService/EmptyCall'
1400 'service': more_extra_backend_service.url
1404 # Make client send UNARY_CALL and EMPTY_CALL.
1406 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1407 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
1409 logger.info('Patching url map with %s', route_rules)
1410 patch_url_map_backend_service(gcp,
1411 extra_backend_service,
1412 route_rules=route_rules)
1413 logger.info('Waiting for traffic to go to all backends')
1414 wait_until_all_rpcs_go_to_given_backends(
1415 extra_backend_instances + more_extra_backend_instances,
1416 _WAIT_FOR_STATS_SEC)
1418 # Make all calls keep-open.
1420 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1421 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL
1422 ], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1423 'rpc-behavior', 'keep-open'),
1424 (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
1425 'rpc-behavior', 'keep-open')])
# Deadline scales with threshold/QPS: it takes ~threshold/qps seconds of
# keep-open traffic to saturate the circuit breaker.
1426 wait_until_rpcs_in_flight(
1427 'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
1428 int(extra_backend_service_max_requests / args.qps)),
1429 extra_backend_service_max_requests, 1)
1430 logger.info('UNARY_CALL reached stable state (%d)',
1431 extra_backend_service_max_requests)
1432 wait_until_rpcs_in_flight(
1434 (_WAIT_FOR_BACKEND_SEC +
1435 int(more_extra_backend_service_max_requests / args.qps)),
1436 more_extra_backend_service_max_requests, 1)
1437 logger.info('EMPTY_CALL reached stable state (%d)',
1438 more_extra_backend_service_max_requests)
1440 # Increment circuit breakers max_requests threshold.
1441 extra_backend_service_max_requests = 800
1442 patch_backend_service(gcp,
1443 extra_backend_service, [instance_group],
1446 extra_backend_service_max_requests
1448 wait_until_rpcs_in_flight(
1449 'UNARY_CALL', (_WAIT_FOR_BACKEND_SEC +
1450 int(extra_backend_service_max_requests / args.qps)),
1451 extra_backend_service_max_requests, 1)
1452 logger.info('UNARY_CALL reached stable state after increase (%d)',
1453 extra_backend_service_max_requests)
1454 logger.info('success')
1455 # Avoid new RPCs being outstanding (some test clients create threads
1456 # for sending RPCs) after restoring backend services.
1458 [messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL])
# Cleanup: restore original routing/backends, delete the dedicated
# services, and re-enable proxyless config validation.
1460 patch_url_map_backend_service(gcp, original_backend_service)
1461 patch_backend_service(gcp, original_backend_service, [instance_group])
1462 for backend_service in additional_backend_services:
1463 delete_backend_service(gcp, backend_service)
1464 set_validate_for_proxyless(gcp, True)
# Verifies xDS maxStreamDuration timeouts: routes UnaryCall with a 3s
# maxStreamDuration, then for each (testcase_name, client_config,
# expected_results) case configures the client (optionally with
# rpc-behavior sleep-N metadata) and checks that the delta in accumulated
# per-status RPC counts over a 10s window is within 10% of qps * runtime.
# Status codes in expected_results are numeric gRPC codes (4 =
# DEADLINE_EXCEEDED per the inline comments).
# NOTE(review): elided listing — embedded line numbers skip values; route
# rule scaffolding, 'seconds': 3, timeout config, attempt_count, raise
# statements and loop breaks are among the missing lines.
1467 def test_timeout(gcp, original_backend_service, instance_group):
1468 logger.info('Running test_timeout')
1470 logger.info('waiting for original backends to become healthy')
1471 wait_for_healthy_backends(gcp, original_backend_service, instance_group)
1473 # UnaryCall -> maxStreamDuration:3s
1477 'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
1479 'service': original_backend_service.url,
1481 'maxStreamDuration': {
1486 patch_url_map_backend_service(gcp,
1487 original_backend_service,
1488 route_rules=route_rules)
1489 # A list of tuples (testcase_name, {client_config}, {expected_results})
1492 'app_timeout_exceeded',
1493 # UnaryCall only with sleep-2; timeout=1s; calls timeout.
1496 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1499 (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1500 'rpc-behavior', 'sleep-2'),
1505 'UNARY_CALL': 4, # DEADLINE_EXCEEDED
1509 'timeout_not_exceeded',
1510 # UnaryCall only with no sleep; calls succeed.
1513 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1521 'timeout_exceeded (UNARY_CALL), timeout_different_route (EMPTY_CALL)',
1522 # UnaryCall and EmptyCall both sleep-4.
1523 # UnaryCall timeouts, EmptyCall succeeds.
1526 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1527 messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
1530 (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1531 'rpc-behavior', 'sleep-4'),
1532 (messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL,
1533 'rpc-behavior', 'sleep-4'),
1537 'UNARY_CALL': 4, # DEADLINE_EXCEEDED
1544 for (testcase_name, client_config, expected_results) in test_cases:
1545 logger.info('starting case %s', testcase_name)
1546 configure_client(**client_config)
1547 # wait a second to help ensure the client stops sending RPCs with
1548 # the old config. We will make multiple attempts if it is failing,
1549 # but this improves confidence that the test is valid if the
1550 # previous client_config would lead to the same results.
1552 # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
1555 before_stats = get_client_accumulated_stats()
1556 if not before_stats.stats_per_method:
1558 'stats.stats_per_method is None, the interop client stats service does not support this test case'
1560 for i in range(attempt_count):
1561 logger.info('%s: attempt %d', testcase_name, i)
1563 test_runtime_secs = 10
1564 time.sleep(test_runtime_secs)
1565 after_stats = get_client_accumulated_stats()
# Compare the per-status count delta over the window against the
# expected steady-state rate (qps * window) with 10% tolerance.
1568 for rpc, status in expected_results.items():
1569 qty = (after_stats.stats_per_method[rpc].result[status] -
1570 before_stats.stats_per_method[rpc].result[status])
1571 want = test_runtime_secs * args.qps
1572 # Allow 10% deviation from expectation to reduce flakiness
1573 if qty < (want * .9) or qty > (want * 1.1):
1574 logger.info('%s: failed due to %s[%s]: got %d want ~%d',
1575 testcase_name, rpc, status, qty, want)
1578 logger.info('success')
1580 logger.info('%s attempt %d failed', testcase_name, i)
1581 before_stats = after_stats
1584 '%s: timeout waiting for expected results: %s; got %s' %
1585 (testcase_name, expected_results,
1586 after_stats.stats_per_method))
# Cleanup: restore the default URL map routing.
1588 patch_url_map_backend_service(gcp, original_backend_service)
# Verifies xDS fault injection: installs priority-ordered routes keyed on
# the 'fi_testcase' request header, each carrying a faultInjectionPolicy
# (abort/delay at 0%%, 50%%, 100%%), then for each test case sends
# UNARY_CALL with the matching header and checks that the per-status count
# delta over a 10s window is within 10%% of pct * qps * runtime.
# Requires validate_for_proxyless disabled while the fault policies are
# installed; restores routing and validation in cleanup.
# NOTE(review): elided listing — embedded line numbers skip values; the
# bodies of the nested _route/_delay/_abort helpers, the default route,
# the test-case dicts, attempt_count, rpc selection, raise sites and loop
# breaks are among the missing lines.
1591 def test_fault_injection(gcp, original_backend_service, instance_group):
1592 logger.info('Running test_fault_injection')
1594 logger.info('waiting for original backends to become healthy')
1595 wait_for_healthy_backends(gcp, original_backend_service, instance_group)
1597 testcase_header = 'fi_testcase'
# Nested helper: builds one priority route matching the testcase header
# value `name` and attaching `fi_policy` as the faultInjectionPolicy.
1599 def _route(pri, name, fi_policy):
1606 'headerName': testcase_header,
1610 'service': original_backend_service.url,
1612 'faultInjectionPolicy': fi_policy
1634 zero_route = _abort(0)
1635 zero_route.update(_delay(0))
1637 _route(0, 'zero_percent_fault_injection', zero_route),
1638 _route(1, 'always_delay', _delay(100)),
1639 _route(2, 'always_abort', _abort(100)),
1640 _route(3, 'delay_half', _delay(50)),
1641 _route(4, 'abort_half', _abort(50)),
1647 'service': original_backend_service.url,
1650 set_validate_for_proxyless(gcp, False)
1651 patch_url_map_backend_service(gcp,
1652 original_backend_service,
1653 route_rules=route_rules)
1654 # A list of tuples (testcase_name, {client_config}, {code: percent}). Each
1655 # test case will set the testcase_header with the testcase_name for routing
1656 # to the appropriate config for the case, defined above.
1659 'zero_percent_fault_injection',
1666 'non_matching_fault_injection', # Not in route_rules, above.
1679 }, # DEADLINE_EXCEEDED
1686 }, # UNAUTHENTICATED
1696 }, # DEADLINE_EXCEEDED / OK: 50% / 50%
1704 }, # UNAUTHENTICATED / OK: 50% / 50%
1710 for (testcase_name, client_config, expected_results) in test_cases:
1711 logger.info('starting case %s', testcase_name)
# Route selection happens via this request header on UNARY_CALL.
1713 client_config['metadata'] = [
1714 (messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1715 testcase_header, testcase_name)
1717 client_config['rpc_types'] = [
1718 messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL,
1720 configure_client(**client_config)
1721 # wait a second to help ensure the client stops sending RPCs with
1722 # the old config. We will make multiple attempts if it is failing,
1723 # but this improves confidence that the test is valid if the
1724 # previous client_config would lead to the same results.
1726 # Each attempt takes 10 seconds; 20 attempts is equivalent to 200
1732 before_stats = get_client_accumulated_stats()
1733 if not before_stats.stats_per_method:
1735 'stats.stats_per_method is None, the interop client stats service does not support this test case'
1737 for i in range(attempt_count):
1738 logger.info('%s: attempt %d', testcase_name, i)
1740 test_runtime_secs = 10
1741 time.sleep(test_runtime_secs)
1742 after_stats = get_client_accumulated_stats()
1745 for status, pct in expected_results.items():
1747 qty = (after_stats.stats_per_method[rpc].result[status] -
1748 before_stats.stats_per_method[rpc].result[status])
1749 want = pct * args.qps * test_runtime_secs
1750 # Allow 10% deviation from expectation to reduce flakiness
1751 VARIANCE_ALLOWED = 0.1
1752 if abs(qty - want) > want * VARIANCE_ALLOWED:
1753 logger.info('%s: failed due to %s[%s]: got %d want ~%d',
1754 testcase_name, rpc, status, qty, want)
1757 logger.info('success')
1759 logger.info('%s attempt %d failed', testcase_name, i)
1760 before_stats = after_stats
1763 '%s: timeout waiting for expected results: %s; got %s' %
1764 (testcase_name, expected_results,
1765 after_stats.stats_per_method))
# Cleanup: restore default routing and re-enable proxyless validation.
1767 patch_url_map_backend_service(gcp, original_backend_service)
1768 set_validate_for_proxyless(gcp, True)
# Toggles validateForProxyless on the target gRPC proxy. Because patching
# a target_grpc_proxy isn't supported, this deletes the global forwarding
# rule and target proxy, then recreates both with the requested flag.
# No-op (with a log message) when the alpha compute API is unavailable.
# NOTE(review): elided listing — embedded line numbers skip values; the
# early-return after the alpha check and the trailing argument(s) of the
# create_global_forwarding_rule call are missing from this view.
1771 def set_validate_for_proxyless(gcp, validate_for_proxyless):
1772 if not gcp.alpha_compute:
1774 'Not setting validateForProxy because alpha is not enabled')
1776 # This function deletes global_forwarding_rule and target_proxy, then
1777 # recreate target_proxy with validateForProxyless=False. This is necessary
1778 # because patching target_grpc_proxy isn't supported.
1779 delete_global_forwarding_rule(gcp)
1780 delete_target_proxy(gcp)
1781 create_target_proxy(gcp, gcp.target_proxy.name, validate_for_proxyless)
1782 create_global_forwarding_rule(gcp, gcp.global_forwarding_rule.name,
def get_serving_status(instance, service_port):
    """Query the standard gRPC health service on instance:service_port.

    Opens a short-lived insecure channel and returns the
    HealthCheckResponse from the server's Check RPC.
    """
    target = '%s:%d' % (instance, service_port)
    with grpc.insecure_channel(target) as channel:
        stub = health_pb2_grpc.HealthStub(channel)
        return stub.Check(health_pb2.HealthCheckRequest())
# Flips each instance's xDS test server between SERVING and NOT_SERVING
# via the XdsUpdateHealthService, then polls the standard health service
# (get_serving_status) until the reported status matches, failing after
# the retry budget is exhausted.
# NOTE(review): elided listing — embedded line numbers skip values; the
# retry-loop header (retry_count / `for i in ...`), the if/else around
# SetServing vs SetNotServing, the success break, and the failure-raise
# tail are missing from this view.
1792 def set_serving_status(instances, service_port, serving):
1793 logger.info('setting %s serving status to %s', instances, serving)
1794 for instance in instances:
1795 with grpc.insecure_channel('%s:%d' %
1796 (instance, service_port)) as channel:
1797 logger.info('setting %s serving status to %s', instance, serving)
1798 stub = test_pb2_grpc.XdsUpdateHealthServiceStub(channel)
1802 stub.SetServing(empty_pb2.Empty())
1804 stub.SetNotServing(empty_pb2.Empty())
1805 serving_status = get_serving_status(instance, service_port)
1806 logger.info('got instance service status %s', serving_status)
1807 want_status = health_pb2.HealthCheckResponse.SERVING if serving else health_pb2.HealthCheckResponse.NOT_SERVING
1808 if serving_status.status == want_status:
1810 if i == retry_count - 1:
1812 'failed to set instance service status after %d retries'
def is_primary_instance_group(gcp, instance_group):
    """Return True iff every RPC peer reported by the client belongs to
    `instance_group`.

    Clients may connect to a TD instance in a different region than the
    client, in which case primary/secondary assignments may not be based
    on the client's actual locality — so membership is checked against
    live client stats rather than assumed.
    """
    group_members = set(get_instance_names(gcp, instance_group))
    client_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC)
    return all(peer in group_members for peer in client_stats.rpcs_by_peer)
# Builds the VM startup script for backend servers: either launches a
# pre-built server binary on the given port, or (default) clones grpc-java,
# builds the interop test server with Gradle, and launches it.
# NOTE(review): elided listing — embedded line numbers skip values; the
# continuation of the first return (the `% (..., service_port)` argument
# close) and the `else:`/intermediate script lines (cd commands) are
# missing from this view.
1825 def get_startup_script(path_to_server_binary, service_port):
1826 if path_to_server_binary:
1827 return "nohup %s --port=%d 1>/dev/null &" % (path_to_server_binary,
1830 return """#!/bin/bash
1832 sudo apt install -y git default-jdk
1835 git clone https://github.com/grpc/grpc-java.git
1837 pushd interop-testing
1838 ../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true
1840 nohup build/install/grpc-interop-testing/bin/xds-test-server \
1841 --port=%d 1>/dev/null &""" % service_port
# Creates a GCE instance template (tagged 'allow-health-checks', with
# cloud-platform scope and the startup script in metadata), waits for the
# insert operation, and records it as gcp.instance_template.
# NOTE(review): elided listing — embedded line numbers skip values; the
# config dict's outer structure (name, tags, disks, network fields) is
# missing between the visible key/value lines.
1844 def create_instance_template(gcp, name, network, source_image, machine_type,
1850 'items': ['allow-health-checks']
1852 'machineType': machine_type,
1853 'serviceAccounts': [{
1855 'scopes': ['https://www.googleapis.com/auth/cloud-platform',]
1857 'networkInterfaces': [{
1859 'type': 'ONE_TO_ONE_NAT'
1865 'initializeParams': {
1866 'sourceImage': source_image
1871 'key': 'startup-script',
1872 'value': startup_script
1878 logger.debug('Sending GCP request with body=%s', config)
1879 result = gcp.compute.instanceTemplates().insert(
1880 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
1881 wait_for_global_operation(gcp, result['name'])
1882 gcp.instance_template = GcpResource(config['name'], result['targetLink'])
# Creates a managed instance group of `size` instances from the recorded
# instance template, waits for the zone operation, re-fetches the manager
# to obtain the instanceGroup URL, records the group on gcp, and waits for
# the group to reach its expected size before returning it.
# NOTE(review): elided listing — embedded line numbers skip values; parts
# of the config dict (name, targetSize, namedPorts scaffolding) and the
# InstanceGroup constructor's trailing argument are missing from this view.
1885 def add_instance_group(gcp, zone, name, size):
1888 'instanceTemplate': gcp.instance_template.url,
1892 'port': gcp.service_port
1896 logger.debug('Sending GCP request with body=%s', config)
1897 result = gcp.compute.instanceGroupManagers().insert(
1898 project=gcp.project, zone=zone,
1899 body=config).execute(num_retries=_GCP_API_RETRIES)
1900 wait_for_zone_operation(gcp, zone, result['name'])
1901 result = gcp.compute.instanceGroupManagers().get(
1902 project=gcp.project, zone=zone,
1903 instanceGroupManager=config['name']).execute(
1904 num_retries=_GCP_API_RETRIES)
1905 instance_group = InstanceGroup(config['name'], result['instanceGroup'],
1907 gcp.instance_groups.append(instance_group)
1908 wait_for_instance_group_to_reach_expected_size(gcp, instance_group, size,
1909 _WAIT_FOR_OPERATION_SEC)
1910 return instance_group
# Creates a health check: a native gRPC health check (USE_SERVING_PORT)
# when the alpha compute API is available, otherwise a fallback check via
# the stable API. Waits for the operation and records gcp.health_check.
# NOTE(review): elided listing — embedded line numbers skip values; the
# config dict scaffolding and the entire else-branch config (the stable
# API's check type, e.g. TCP) are missing from this view.
1913 def create_health_check(gcp, name):
1914 if gcp.alpha_compute:
1918 'grpcHealthCheck': {
1919 'portSpecification': 'USE_SERVING_PORT'
1922 compute_to_use = gcp.alpha_compute
1931 compute_to_use = gcp.compute
1932 logger.debug('Sending GCP request with body=%s', config)
1933 result = compute_to_use.healthChecks().insert(
1934 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
1935 wait_for_global_operation(gcp, result['name'])
1936 gcp.health_check = GcpResource(config['name'], result['targetLink'])
# Creates an INGRESS firewall rule admitting Google health-checker source
# ranges (35.191.0.0/16, 130.211.0.0/22) to instances tagged
# 'allow-health-checks', waits for the operation, and records the rule
# on gcp.health_check_firewall_rule.
# NOTE(review): elided listing — embedded line numbers skip values; the
# config dict scaffolding (name, allowed protocols/ports) is missing.
1939 def create_health_check_firewall_rule(gcp, name):
1942 'direction': 'INGRESS',
1946 'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'],
1947 'targetTags': ['allow-health-checks'],
1949 logger.debug('Sending GCP request with body=%s', config)
1950 result = gcp.compute.firewalls().insert(
1951 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
1952 wait_for_global_operation(gcp, result['name'])
1953 gcp.health_check_firewall_rule = GcpResource(config['name'],
1954 result['targetLink'])
# Creates an INTERNAL_SELF_MANAGED backend service wired to the recorded
# health check, waits for the operation, appends the resulting GcpResource
# to gcp.backend_services (for later cleanup), and returns it. Uses the
# alpha API when available, otherwise the stable API.
# NOTE(review): elided listing — embedded line numbers skip values; the
# branch bodies choosing `protocol` (likely GRPC vs HTTP2) and the config
# dict scaffolding are missing from this view.
1957 def add_backend_service(gcp, name):
1958 if gcp.alpha_compute:
1960 compute_to_use = gcp.alpha_compute
1963 compute_to_use = gcp.compute
1966 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
1967 'healthChecks': [gcp.health_check.url],
1969 'protocol': protocol
1971 logger.debug('Sending GCP request with body=%s', config)
1972 result = compute_to_use.backendServices().insert(
1973 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
1974 wait_for_global_operation(gcp, result['name'])
1975 backend_service = GcpResource(config['name'], result['targetLink'])
1976 gcp.backend_services.append(backend_service)
1977 return backend_service
# Creates a URL map whose default service and path matcher default both
# point at `backend_service`, with a host rule for `host_name`. Waits for
# the operation and records the map as gcp.url_map.
# NOTE(review): elided listing — embedded line numbers skip values; the
# config dict scaffolding (name, pathMatchers/hostRules list structure)
# is missing from this view.
1980 def create_url_map(gcp, name, backend_service, host_name):
1983 'defaultService': backend_service.url,
1985 'name': _PATH_MATCHER_NAME,
1986 'defaultService': backend_service.url,
1989 'hosts': [host_name],
1990 'pathMatcher': _PATH_MATCHER_NAME
1993 logger.debug('Sending GCP request with body=%s', config)
1994 result = gcp.compute.urlMaps().insert(
1995 project=gcp.project, body=config).execute(num_retries=_GCP_API_RETRIES)
1996 wait_for_global_operation(gcp, result['name'])
1997 gcp.url_map = GcpResource(config['name'], result['targetLink'])
# Patches the URL map's host rule to match 'host:port' (using the service
# port discovered earlier on gcp.service_port) and waits for the
# operation to finish.
# NOTE(review): elided listing — embedded line numbers skip values; the
# config dict scaffolding (hostRules list wrapper) is missing.
2000 def patch_url_map_host_rule_with_port(gcp, name, backend_service, host_name):
2003 'hosts': ['%s:%d' % (host_name, gcp.service_port)],
2004 'pathMatcher': _PATH_MATCHER_NAME
2007 logger.debug('Sending GCP request with body=%s', config)
2008 result = gcp.compute.urlMaps().patch(
2009 project=gcp.project, urlMap=name,
2010 body=config).execute(num_retries=_GCP_API_RETRIES)
2011 wait_for_global_operation(gcp, result['name'])
# Creates the target proxy fronting the URL map: a targetGrpcProxy (with
# the validate_for_proxyless flag) when the alpha API is available,
# otherwise a targetHttpProxy via the stable API. Waits for the operation
# and records the proxy as gcp.target_proxy.
# NOTE(review): elided listing — embedded line numbers skip values; the
# config dict scaffolding (name fields), the `else:` line, and the
# operation-wait for the alpha branch are missing from this view.
2014 def create_target_proxy(gcp, name, validate_for_proxyless=True):
2015 if gcp.alpha_compute:
2018 'url_map': gcp.url_map.url,
2019 'validate_for_proxyless': validate_for_proxyless
2021 logger.debug('Sending GCP request with body=%s', config)
2022 result = gcp.alpha_compute.targetGrpcProxies().insert(
2023 project=gcp.project,
2024 body=config).execute(num_retries=_GCP_API_RETRIES)
2028 'url_map': gcp.url_map.url,
2030 logger.debug('Sending GCP request with body=%s', config)
2031 result = gcp.compute.targetHttpProxies().insert(
2032 project=gcp.project,
2033 body=config).execute(num_retries=_GCP_API_RETRIES)
2034 wait_for_global_operation(gcp, result['name'])
2035 gcp.target_proxy = GcpResource(config['name'], result['targetLink'])
# Creates the global forwarding rule (0.0.0.0:port, INTERNAL_SELF_MANAGED)
# targeting the recorded proxy, trying each port in `potential_ports` in
# turn: on success it records the rule and the chosen gcp.service_port;
# on HttpError it logs and retries with the next port.
# NOTE(review): elided listing — embedded line numbers skip values; the
# `try:` line, config dict scaffolding (name field), the success `return`/
# `break`, and the logger call head in the except branch are missing.
2038 def create_global_forwarding_rule(gcp, name, potential_ports):
2039 if gcp.alpha_compute:
2040 compute_to_use = gcp.alpha_compute
2042 compute_to_use = gcp.compute
2043 for port in potential_ports:
2047 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED',
2048 'portRange': str(port),
2049 'IPAddress': '0.0.0.0',
2050 'network': args.network,
2051 'target': gcp.target_proxy.url,
2053 logger.debug('Sending GCP request with body=%s', config)
2054 result = compute_to_use.globalForwardingRules().insert(
2055 project=gcp.project,
2056 body=config).execute(num_retries=_GCP_API_RETRIES)
2057 wait_for_global_operation(gcp, result['name'])
2058 gcp.global_forwarding_rule = GcpResource(config['name'],
2059 result['targetLink'])
2060 gcp.service_port = port
2062 except googleapiclient.errors.HttpError as http_error:
2064 'Got error %s when attempting to create forwarding rule to '
2065 '0.0.0.0:%d. Retrying with another port.' % (http_error, port))
def get_health_check(gcp, health_check_name):
    """Look up an existing health check and record it on the GCP state.

    Sets gcp.health_check to a GcpResource wrapping the check's selfLink.
    """
    # Consistency/robustness: every other GCP API call in this file passes
    # num_retries=_GCP_API_RETRIES to execute(); the get_* lookups omitted
    # it, making resource discovery needlessly flaky on transient errors.
    result = gcp.compute.healthChecks().get(
        project=gcp.project,
        healthCheck=health_check_name).execute(num_retries=_GCP_API_RETRIES)
    gcp.health_check = GcpResource(health_check_name, result['selfLink'])
# Looks up an existing firewall rule and records it on
# gcp.health_check_firewall_rule as a GcpResource.
# NOTE(review): elided listing — the GcpResource constructor's second
# argument (presumably result['selfLink'], as in sibling get_* helpers —
# TODO confirm) is on lines missing from this view.
2074 def get_health_check_firewall_rule(gcp, firewall_name):
2075 result = gcp.compute.firewalls().get(project=gcp.project,
2076 firewall=firewall_name).execute()
2077 gcp.health_check_firewall_rule = GcpResource(firewall_name,
def get_backend_service(gcp, backend_service_name):
    """Look up an existing backend service, record it, and return it.

    Appends the resulting GcpResource to gcp.backend_services so later
    cleanup (delete_backend_services) can remove it.
    """
    # Consistency/robustness: align with the rest of the file, which passes
    # num_retries=_GCP_API_RETRIES on every execute() call.
    result = gcp.compute.backendServices().get(
        project=gcp.project,
        backendService=backend_service_name).execute(
            num_retries=_GCP_API_RETRIES)
    backend_service = GcpResource(backend_service_name, result['selfLink'])
    gcp.backend_services.append(backend_service)
    return backend_service
def get_url_map(gcp, url_map_name):
    """Look up an existing URL map and record it as gcp.url_map."""
    # Consistency/robustness: align with the rest of the file, which passes
    # num_retries=_GCP_API_RETRIES on every execute() call.
    result = gcp.compute.urlMaps().get(
        project=gcp.project,
        urlMap=url_map_name).execute(num_retries=_GCP_API_RETRIES)
    gcp.url_map = GcpResource(url_map_name, result['selfLink'])
# Looks up the existing target proxy — targetGrpcProxy via the alpha API
# when available, targetHttpProxy otherwise — and records it as
# gcp.target_proxy.
# NOTE(review): elided listing — the `else:` line between the two
# branches is missing from this view (embedded numbering skips 2099).
2095 def get_target_proxy(gcp, target_proxy_name):
2096 if gcp.alpha_compute:
2097 result = gcp.alpha_compute.targetGrpcProxies().get(
2098 project=gcp.project, targetGrpcProxy=target_proxy_name).execute()
2100 result = gcp.compute.targetHttpProxies().get(
2101 project=gcp.project, targetHttpProxy=target_proxy_name).execute()
2102 gcp.target_proxy = GcpResource(target_proxy_name, result['selfLink'])
# Looks up the existing global forwarding rule and records it on
# gcp.global_forwarding_rule as a GcpResource.
# NOTE(review): elided listing — the GcpResource constructor's second
# argument (presumably result['selfLink'], matching the sibling get_*
# helpers — TODO confirm) is on a line missing from this view.
2105 def get_global_forwarding_rule(gcp, forwarding_rule_name):
2106 result = gcp.compute.globalForwardingRules().get(
2107 project=gcp.project, forwardingRule=forwarding_rule_name).execute()
2108 gcp.global_forwarding_rule = GcpResource(forwarding_rule_name,
def get_instance_template(gcp, template_name):
    """Look up an existing instance template and record it on gcp."""
    # Consistency/robustness: align with the rest of the file, which passes
    # num_retries=_GCP_API_RETRIES on every execute() call.
    result = gcp.compute.instanceTemplates().get(
        project=gcp.project,
        instanceTemplate=template_name).execute(
            num_retries=_GCP_API_RETRIES)
    gcp.instance_template = GcpResource(template_name, result['selfLink'])
# Looks up an existing instance group, records the discovered service
# port (from the group's first namedPort) on gcp.service_port, appends
# the group to gcp.instance_groups, and returns it.
# NOTE(review): elided listing — the InstanceGroup constructor's trailing
# argument (line 2124 of the original, likely `zone` — TODO confirm) is
# missing from this view.
2118 def get_instance_group(gcp, zone, instance_group_name):
2119 result = gcp.compute.instanceGroups().get(
2120 project=gcp.project, zone=zone,
2121 instanceGroup=instance_group_name).execute()
2122 gcp.service_port = result['namedPorts'][0]['port']
2123 instance_group = InstanceGroup(instance_group_name, result['selfLink'],
2125 gcp.instance_groups.append(instance_group)
2126 return instance_group
# Best-effort deletion of the recorded global forwarding rule: waits for
# the delete operation; HttpError (e.g. already deleted) is logged, not
# raised, so teardown continues.
# NOTE(review): elided listing — the `try:` line (original 2130) is
# missing from this view.
2129 def delete_global_forwarding_rule(gcp):
2131 result = gcp.compute.globalForwardingRules().delete(
2132 project=gcp.project,
2133 forwardingRule=gcp.global_forwarding_rule.name).execute(
2134 num_retries=_GCP_API_RETRIES)
2135 wait_for_global_operation(gcp, result['name'])
2136 except googleapiclient.errors.HttpError as http_error:
2137 logger.info('Delete failed: %s', http_error)
# Best-effort deletion of the recorded target proxy, via the alpha
# targetGrpcProxies API when available, else the stable targetHttpProxies
# API. HttpError is logged, not raised, so teardown continues.
# NOTE(review): elided listing — the `try:` line (original 2141) and the
# `else:` between the branches (original 2147) are missing from this view.
2140 def delete_target_proxy(gcp):
2142 if gcp.alpha_compute:
2143 result = gcp.alpha_compute.targetGrpcProxies().delete(
2144 project=gcp.project,
2145 targetGrpcProxy=gcp.target_proxy.name).execute(
2146 num_retries=_GCP_API_RETRIES)
2148 result = gcp.compute.targetHttpProxies().delete(
2149 project=gcp.project,
2150 targetHttpProxy=gcp.target_proxy.name).execute(
2151 num_retries=_GCP_API_RETRIES)
2152 wait_for_global_operation(gcp, result['name'])
2153 except googleapiclient.errors.HttpError as http_error:
2154 logger.info('Delete failed: %s', http_error)
2157 def delete_url_map(gcp):
2159 result = gcp.compute.urlMaps().delete(
2160 project=gcp.project,
2161 urlMap=gcp.url_map.name).execute(num_retries=_GCP_API_RETRIES)
2162 wait_for_global_operation(gcp, result['name'])
2163 except googleapiclient.errors.HttpError as http_error:
2164 logger.info('Delete failed: %s', http_error)
2167 def delete_backend_service(gcp, backend_service):
2169 result = gcp.compute.backendServices().delete(
2170 project=gcp.project, backendService=backend_service.name).execute(
2171 num_retries=_GCP_API_RETRIES)
2172 wait_for_global_operation(gcp, result['name'])
2173 except googleapiclient.errors.HttpError as http_error:
2174 logger.info('Delete failed: %s', http_error)
def delete_backend_services(gcp):
    """Delete every backend service registered on gcp.backend_services."""
    for service in gcp.backend_services:
        delete_backend_service(gcp, service)
2182 def delete_firewall(gcp):
2184 result = gcp.compute.firewalls().delete(
2185 project=gcp.project,
2186 firewall=gcp.health_check_firewall_rule.name).execute(
2187 num_retries=_GCP_API_RETRIES)
2188 wait_for_global_operation(gcp, result['name'])
2189 except googleapiclient.errors.HttpError as http_error:
2190 logger.info('Delete failed: %s', http_error)
2193 def delete_health_check(gcp):
2195 result = gcp.compute.healthChecks().delete(
2196 project=gcp.project, healthCheck=gcp.health_check.name).execute(
2197 num_retries=_GCP_API_RETRIES)
2198 wait_for_global_operation(gcp, result['name'])
2199 except googleapiclient.errors.HttpError as http_error:
2200 logger.info('Delete failed: %s', http_error)
2203 def delete_instance_groups(gcp):
2204 for instance_group in gcp.instance_groups:
2206 result = gcp.compute.instanceGroupManagers().delete(
2207 project=gcp.project,
2208 zone=instance_group.zone,
2209 instanceGroupManager=instance_group.name).execute(
2210 num_retries=_GCP_API_RETRIES)
2211 wait_for_zone_operation(gcp,
2212 instance_group.zone,
2214 timeout_sec=_WAIT_FOR_BACKEND_SEC)
2215 except googleapiclient.errors.HttpError as http_error:
2216 logger.info('Delete failed: %s', http_error)
2219 def delete_instance_template(gcp):
2221 result = gcp.compute.instanceTemplates().delete(
2222 project=gcp.project,
2223 instanceTemplate=gcp.instance_template.name).execute(
2224 num_retries=_GCP_API_RETRIES)
2225 wait_for_global_operation(gcp, result['name'])
2226 except googleapiclient.errors.HttpError as http_error:
2227 logger.info('Delete failed: %s', http_error)
def patch_backend_service(gcp,
                          backend_service,
                          instance_groups,
                          balancing_mode='UTILIZATION',
                          max_rate=1,
                          circuit_breakers=None):
    """Patch a backend service to point at the given instance groups.

    Args:
      gcp: GcpState holding the compute API clients and project id.
      backend_service: GcpResource describing the service to patch.
      instance_groups: iterable of InstanceGroup to attach as backends.
      balancing_mode: GCP balancing mode; 'maxRate' is only meaningful (and
        only sent) when this is 'RATE'.
      max_rate: per-backend max RPS used when balancing_mode == 'RATE'.
      circuit_breakers: optional circuitBreakers config dict (alpha feature).
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute
    config = {
        'backends': [{
            'group': instance_group.url,
            'balancingMode': balancing_mode,
            'maxRate': max_rate if balancing_mode == 'RATE' else None
        } for instance_group in instance_groups],
        'circuitBreakers': circuit_breakers,
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.backendServices().patch(
        project=gcp.project, backendService=backend_service.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp,
                              result['name'],
                              timeout_sec=_WAIT_FOR_BACKEND_SEC)
def resize_instance_group(gcp,
                          instance_group,
                          new_size,
                          timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Resize a managed instance group and wait for it to reach the new size.

    Args:
      gcp: GcpState holding the compute API client and project id.
      instance_group: InstanceGroup to resize.
      new_size: target number of instances.
      timeout_sec: how long to wait for the group to reach new_size.

    Raises:
      Exception: if the resize operation or the size convergence times out.
    """
    result = gcp.compute.instanceGroupManagers().resize(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroupManager=instance_group.name,
        size=new_size).execute(num_retries=_GCP_API_RETRIES)
    wait_for_zone_operation(gcp,
                            instance_group.zone,
                            result['name'],
                            timeout_sec=360)
    wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   new_size, timeout_sec)
def patch_url_map_backend_service(gcp,
                                  backend_service=None,
                                  services_with_weights=None,
                                  route_rules=None):
    """Change the URL map's backend service configuration.

    Exactly one of backend_service and services_with_weights may be set:
    backend_service routes everything to a single service, while
    services_with_weights (a {service: weight} dict) configures weighted
    traffic splitting.

    Args:
      gcp: GcpState holding the compute API clients, project id and url_map.
      backend_service: optional GcpResource used as the default service.
      services_with_weights: optional dict mapping service -> integer weight.
      route_rules: optional list of routeRules dicts for the path matcher.

    Raises:
      ValueError: if both backend_service and services_with_weights are set.
    """
    if gcp.alpha_compute:
        compute_to_use = gcp.alpha_compute
    else:
        compute_to_use = gcp.compute

    if backend_service and services_with_weights:
        raise ValueError(
            'both backend_service and service_with_weights are not None.')

    default_service = backend_service.url if backend_service else None
    default_route_action = {
        'weightedBackendServices': [{
            'backendService': service.url,
            'weight': w,
        } for service, w in services_with_weights.items()]
    } if services_with_weights else None

    config = {
        'pathMatchers': [{
            'name': _PATH_MATCHER_NAME,
            'defaultService': default_service,
            'defaultRouteAction': default_route_action,
            'routeRules': route_rules,
        }]
    }
    logger.debug('Sending GCP request with body=%s', config)
    result = compute_to_use.urlMaps().patch(
        project=gcp.project, urlMap=gcp.url_map.name,
        body=config).execute(num_retries=_GCP_API_RETRIES)
    wait_for_global_operation(gcp, result['name'])
def wait_for_instance_group_to_reach_expected_size(gcp, instance_group,
                                                   expected_size, timeout_sec):
    """Poll until the instance group reports exactly expected_size instances.

    Args:
      gcp: GcpState holding the compute API client and project id.
      instance_group: InstanceGroup to poll.
      expected_size: instance count to wait for.
      timeout_sec: maximum time to keep polling.

    Raises:
      Exception: if the group does not reach expected_size within timeout_sec.
    """
    start_time = time.time()
    while True:
        current_size = len(get_instance_names(gcp, instance_group))
        if current_size == expected_size:
            break
        if time.time() - start_time > timeout_sec:
            raise Exception(
                'Instance group had expected size %d but actual size %d' %
                (expected_size, current_size))
        time.sleep(2)
def wait_for_global_operation(gcp,
                              operation,
                              timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a global compute operation until it is DONE.

    Args:
      gcp: GcpState holding the compute API client and project id.
      operation: name of the global operation to poll.
      timeout_sec: maximum time to keep polling.

    Raises:
      Exception: if the operation reports an error, or does not complete
        within timeout_sec.
    """
    start_time = time.time()
    while time.time() - start_time <= timeout_sec:
        result = gcp.compute.globalOperations().get(
            project=gcp.project,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
def wait_for_zone_operation(gcp,
                            zone,
                            operation,
                            timeout_sec=_WAIT_FOR_OPERATION_SEC):
    """Poll a zonal compute operation until it is DONE.

    Args:
      gcp: GcpState holding the compute API client and project id.
      zone: GCP zone the operation belongs to.
      operation: name of the zone operation to poll.
      timeout_sec: maximum time to keep polling.

    Raises:
      Exception: if the operation reports an error, or does not complete
        within timeout_sec.
    """
    start_time = time.time()
    while time.time() - start_time <= timeout_sec:
        result = gcp.compute.zoneOperations().get(
            project=gcp.project, zone=zone,
            operation=operation).execute(num_retries=_GCP_API_RETRIES)
        if result['status'] == 'DONE':
            if 'error' in result:
                raise Exception(result['error'])
            return
        time.sleep(2)
    raise Exception('Operation %s did not complete within %d' %
                    (operation, timeout_sec))
def wait_for_healthy_backends(gcp,
                              backend_service,
                              instance_group,
                              timeout_sec=_WAIT_FOR_BACKEND_SEC):
    """Poll until every backend in the instance group is HEALTHY.

    Each poll also queries each instance's gRPC serving status for log
    visibility; RPC failures there are logged but not fatal, since health is
    decided solely by the GCP getHealth response.

    Args:
      gcp: GcpState holding the compute API client, project id, service_port.
      backend_service: GcpResource whose health is checked.
      instance_group: InstanceGroup whose instances must all be HEALTHY.
      timeout_sec: maximum time to keep polling.

    Raises:
      Exception: if not all backends become healthy within timeout_sec.
    """
    start_time = time.time()
    config = {'group': instance_group.url}
    instance_names = get_instance_names(gcp, instance_group)
    expected_size = len(instance_names)
    while time.time() - start_time <= timeout_sec:
        for instance_name in instance_names:
            try:
                status = get_serving_status(instance_name, gcp.service_port)
                logger.info('serving status response from %s: %s',
                            instance_name, status)
            except grpc.RpcError as rpc_error:
                logger.info('checking serving status of %s failed: %s',
                            instance_name, rpc_error)
        result = gcp.compute.backendServices().getHealth(
            project=gcp.project,
            backendService=backend_service.name,
            body=config).execute(num_retries=_GCP_API_RETRIES)
        if 'healthStatus' in result:
            logger.info('received GCP healthStatus: %s', result['healthStatus'])
            healthy = True
            for instance in result['healthStatus']:
                if instance['healthState'] != 'HEALTHY':
                    healthy = False
                    break
            # Require every expected instance to be both present and HEALTHY.
            if healthy and expected_size == len(result['healthStatus']):
                return
        else:
            logger.info('no healthStatus received from GCP')
        time.sleep(5)
    raise Exception('Not all backends became healthy within %d seconds: %s' %
                    (timeout_sec, result))
def get_instance_names(gcp, instance_group):
    """Return the short instance names currently in an instance group.

    Args:
      gcp: GcpState holding the compute API client and project id.
      instance_group: InstanceGroup to list.

    Returns:
      A list of instance names (not full URLs); empty if the listing has no
      'items' field (e.g. the group is empty).
    """
    instance_names = []
    result = gcp.compute.instanceGroups().listInstances(
        project=gcp.project,
        zone=instance_group.zone,
        instanceGroup=instance_group.name,
        body={
            'instanceState': 'ALL'
        }).execute(num_retries=_GCP_API_RETRIES)
    if 'items' not in result:
        return []
    for item in result['items']:
        # listInstances() returns the full URL of the instance, which ends with
        # the instance name. compute.instances().get() requires using the
        # instance name (not the full URL) to look up instance details, so we
        # just extract the name manually.
        instance_name = item['instance'].split('/')[-1]
        instance_names.append(instance_name)
    logger.info('retrieved instance names: %s', instance_names)
    return instance_names
def clean_up(gcp):
    """Delete all GCP resources created for the test run.

    Resources are torn down in reverse dependency order (forwarding rule ->
    target proxy -> URL map -> backend services -> firewall/health check ->
    instance groups -> template). Each delete helper swallows HttpErrors, so
    a failure on one resource does not block the rest of the cleanup.
    """
    if gcp.global_forwarding_rule:
        delete_global_forwarding_rule(gcp)
    if gcp.target_proxy:
        delete_target_proxy(gcp)
    if gcp.url_map:
        delete_url_map(gcp)
    delete_backend_services(gcp)
    if gcp.health_check_firewall_rule:
        delete_firewall(gcp)
    if gcp.health_check:
        delete_health_check(gcp)
    delete_instance_groups(gcp)
    if gcp.instance_template:
        delete_instance_template(gcp)
class InstanceGroup(object):
    """Plain record for a GCP instance group: name, self-link URL, and zone."""

    def __init__(self, name, url, zone):
        # name: instance group name used in compute API calls.
        self.name = name
        # url: the group's selfLink, used in backend service configs.
        self.url = url
        # zone: GCP zone the group lives in.
        self.zone = zone
class GcpResource(object):
    """Plain record for a named GCP resource and its self-link URL."""

    def __init__(self, name, url):
        # name: resource name used in compute API calls.
        self.name = name
        # url: the resource's selfLink.
        self.url = url
class GcpState(object):
    """Mutable container tracking every GCP resource created by the test run.

    The helper functions in this module populate these fields as resources
    are created or looked up, and clean_up() tears down whatever is non-None.
    """

    def __init__(self, compute, alpha_compute, project, project_num):
        # compute: v1 compute API client.
        self.compute = compute
        # alpha_compute: alpha compute API client, or None when only stable
        # APIs are in use.
        self.alpha_compute = alpha_compute
        # project / project_num: GCP project id and numeric project number.
        self.project = project
        self.project_num = project_num
        # Resources created/looked up during the run; None/empty until set.
        self.health_check = None
        self.health_check_firewall_rule = None
        self.backend_services = []
        self.url_map = None
        self.target_proxy = None
        self.global_forwarding_rule = None
        # service_port: backend serving port discovered from the instance
        # group's named port (or the forwarding rule setup).
        self.service_port = None
        self.instance_template = None
        self.instance_groups = []
2472 alpha_compute = None
2473 if args.compute_discovery_document:
2474 with open(args.compute_discovery_document, 'r') as discovery_doc:
2475 compute = googleapiclient.discovery.build_from_document(
2476 discovery_doc.read())
2477 if not args.only_stable_gcp_apis and args.alpha_compute_discovery_document:
2478 with open(args.alpha_compute_discovery_document, 'r') as discovery_doc:
2479 alpha_compute = googleapiclient.discovery.build_from_document(
2480 discovery_doc.read())
2482 compute = googleapiclient.discovery.build('compute', 'v1')
2483 if not args.only_stable_gcp_apis:
2484 alpha_compute = googleapiclient.discovery.build('compute', 'alpha')
2487 gcp = GcpState(compute, alpha_compute, args.project_id, args.project_num)
2488 gcp_suffix = args.gcp_suffix
2489 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
2490 if not args.use_existing_gcp_resources:
2491 if args.keep_gcp_resources:
2492 # Auto-generating a unique suffix in case of conflict should not be
2493 # combined with --keep_gcp_resources, as the suffix actually used
2494 # for GCP resources will not match the provided --gcp_suffix value.
2498 for i in range(num_attempts):
2500 logger.info('Using GCP suffix %s', gcp_suffix)
2501 create_health_check(gcp, health_check_name)
2503 except googleapiclient.errors.HttpError as http_error:
2504 gcp_suffix = '%s-%04d' % (gcp_suffix, random.randint(0, 9999))
2505 health_check_name = _BASE_HEALTH_CHECK_NAME + gcp_suffix
2506 logger.exception('HttpError when creating health check')
2507 if gcp.health_check is None:
2508 raise Exception('Failed to create health check name after %d '
2509 'attempts' % num_attempts)
2510 firewall_name = _BASE_FIREWALL_RULE_NAME + gcp_suffix
2511 backend_service_name = _BASE_BACKEND_SERVICE_NAME + gcp_suffix
2512 alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + gcp_suffix
2513 url_map_name = _BASE_URL_MAP_NAME + gcp_suffix
2514 service_host_name = _BASE_SERVICE_HOST + gcp_suffix
2515 target_proxy_name = _BASE_TARGET_PROXY_NAME + gcp_suffix
2516 forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + gcp_suffix
2517 template_name = _BASE_TEMPLATE_NAME + gcp_suffix
2518 instance_group_name = _BASE_INSTANCE_GROUP_NAME + gcp_suffix
2519 same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + gcp_suffix
2520 secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + gcp_suffix
2521 if args.use_existing_gcp_resources:
2522 logger.info('Reusing existing GCP resources')
2523 get_health_check(gcp, health_check_name)
2525 get_health_check_firewall_rule(gcp, firewall_name)
2526 except googleapiclient.errors.HttpError as http_error:
2527 # Firewall rule may be auto-deleted periodically depending on GCP
2529 logger.exception('Failed to find firewall rule, recreating')
2530 create_health_check_firewall_rule(gcp, firewall_name)
2531 backend_service = get_backend_service(gcp, backend_service_name)
2532 alternate_backend_service = get_backend_service(
2533 gcp, alternate_backend_service_name)
2534 get_url_map(gcp, url_map_name)
2535 get_target_proxy(gcp, target_proxy_name)
2536 get_global_forwarding_rule(gcp, forwarding_rule_name)
2537 get_instance_template(gcp, template_name)
2538 instance_group = get_instance_group(gcp, args.zone, instance_group_name)
2539 same_zone_instance_group = get_instance_group(
2540 gcp, args.zone, same_zone_instance_group_name)
2541 secondary_zone_instance_group = get_instance_group(
2542 gcp, args.secondary_zone, secondary_zone_instance_group_name)
2544 create_health_check_firewall_rule(gcp, firewall_name)
2545 backend_service = add_backend_service(gcp, backend_service_name)
2546 alternate_backend_service = add_backend_service(
2547 gcp, alternate_backend_service_name)
2548 create_url_map(gcp, url_map_name, backend_service, service_host_name)
2549 create_target_proxy(gcp, target_proxy_name)
2550 potential_service_ports = list(args.service_port_range)
2551 random.shuffle(potential_service_ports)
2552 create_global_forwarding_rule(gcp, forwarding_rule_name,
2553 potential_service_ports)
2554 if not gcp.service_port:
2556 'Failed to find a valid ip:port for the forwarding rule')
2557 if gcp.service_port != _DEFAULT_SERVICE_PORT:
2558 patch_url_map_host_rule_with_port(gcp, url_map_name,
2561 startup_script = get_startup_script(args.path_to_server_binary,
2563 create_instance_template(gcp, template_name, args.network,
2564 args.source_image, args.machine_type,
2566 instance_group = add_instance_group(gcp, args.zone, instance_group_name,
2567 _INSTANCE_GROUP_SIZE)
2568 patch_backend_service(gcp, backend_service, [instance_group])
2569 same_zone_instance_group = add_instance_group(
2570 gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE)
2571 secondary_zone_instance_group = add_instance_group(
2572 gcp, args.secondary_zone, secondary_zone_instance_group_name,
2573 _INSTANCE_GROUP_SIZE)
2575 wait_for_healthy_backends(gcp, backend_service, instance_group)
2578 client_env = dict(os.environ)
2579 bootstrap_server_features = []
2580 if gcp.service_port == _DEFAULT_SERVICE_PORT:
2581 server_uri = service_host_name
2583 server_uri = service_host_name + ':' + str(gcp.service_port)
2584 if args.xds_v3_support:
2585 client_env['GRPC_XDS_EXPERIMENTAL_V3_SUPPORT'] = 'true'
2586 bootstrap_server_features.append('xds_v3')
2587 if args.bootstrap_file:
2588 bootstrap_path = os.path.abspath(args.bootstrap_file)
2590 with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file:
2591 bootstrap_file.write(
2592 _BOOTSTRAP_TEMPLATE.format(
2593 node_id='projects/%s/networks/%s/nodes/%s' %
2594 (gcp.project_num, args.network.split('/')[-1],
2596 server_features=json.dumps(
2597 bootstrap_server_features)).encode('utf-8'))
2598 bootstrap_path = bootstrap_file.name
2599 client_env['GRPC_XDS_BOOTSTRAP'] = bootstrap_path
2600 client_env['GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING'] = 'true'
2601 client_env['GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT'] = 'true'
2602 client_env['GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION'] = 'true'
2605 for test_case in args.test_case:
2606 if test_case in _V3_TEST_CASES and not args.xds_v3_support:
2607 logger.info('skipping test %s due to missing v3 support',
2610 if test_case in _ALPHA_TEST_CASES and not gcp.alpha_compute:
2611 logger.info('skipping test %s due to missing alpha support',
2614 result = jobset.JobResult()
2615 log_dir = os.path.join(_TEST_LOG_BASE_DIR, test_case)
2616 if not os.path.exists(log_dir):
2617 os.makedirs(log_dir)
2618 test_log_filename = os.path.join(log_dir, _SPONGE_LOG_NAME)
2619 test_log_file = open(test_log_filename, 'w+')
2620 client_process = None
2622 if test_case in _TESTS_TO_RUN_MULTIPLE_RPCS:
2623 rpcs_to_send = '--rpc="UnaryCall,EmptyCall"'
2625 rpcs_to_send = '--rpc="UnaryCall"'
2627 if test_case in _TESTS_TO_SEND_METADATA:
2628 metadata_to_send = '--metadata="EmptyCall:{keyE}:{valueE},UnaryCall:{keyU}:{valueU},UnaryCall:{keyNU}:{valueNU}"'.format(
2629 keyE=_TEST_METADATA_KEY,
2630 valueE=_TEST_METADATA_VALUE_EMPTY,
2631 keyU=_TEST_METADATA_KEY,
2632 valueU=_TEST_METADATA_VALUE_UNARY,
2633 keyNU=_TEST_METADATA_NUMERIC_KEY,
2634 valueNU=_TEST_METADATA_NUMERIC_VALUE)
2636 # Setting the arg explicitly to empty with '--metadata=""'
2637 # makes C# client fail
2638 # (see https://github.com/commandlineparser/commandline/issues/412),
2639 # so instead we just rely on clients using the default when
2640 # metadata arg is not specified.
2641 metadata_to_send = ''
2643 # TODO(ericgribkoff) Temporarily disable fail_on_failed_rpc checks
2644 # in the client. This means we will ignore intermittent RPC
2645 # failures (but this framework still checks that the final result
2648 # Reason for disabling this is, the resources are shared by
2649 # multiple tests, and a change in previous test could be delayed
2650 # until the second test starts. The second test may see
2651 # intermittent failures because of that.
2653 # A fix is to not share resources between tests (though that does
2654 # mean the tests will be significantly slower due to creating new
2656 fail_on_failed_rpc = ''
2659 if not CLIENT_HOSTS:
2660 client_cmd_formatted = args.client_cmd.format(
2661 server_uri=server_uri,
2662 stats_port=args.stats_port,
2664 fail_on_failed_rpc=fail_on_failed_rpc,
2665 rpcs_to_send=rpcs_to_send,
2666 metadata_to_send=metadata_to_send)
2667 logger.debug('running client: %s', client_cmd_formatted)
2668 client_cmd = shlex.split(client_cmd_formatted)
2669 client_process = subprocess.Popen(client_cmd,
2671 stderr=subprocess.STDOUT,
2672 stdout=test_log_file)
2673 if test_case == 'backends_restart':
2674 test_backends_restart(gcp, backend_service, instance_group)
2675 elif test_case == 'change_backend_service':
2676 test_change_backend_service(gcp, backend_service,
2678 alternate_backend_service,
2679 same_zone_instance_group)
2680 elif test_case == 'gentle_failover':
2681 test_gentle_failover(gcp, backend_service, instance_group,
2682 secondary_zone_instance_group)
2683 elif test_case == 'load_report_based_failover':
2684 test_load_report_based_failover(
2685 gcp, backend_service, instance_group,
2686 secondary_zone_instance_group)
2687 elif test_case == 'ping_pong':
2688 test_ping_pong(gcp, backend_service, instance_group)
2689 elif test_case == 'remove_instance_group':
2690 test_remove_instance_group(gcp, backend_service,
2692 same_zone_instance_group)
2693 elif test_case == 'round_robin':
2694 test_round_robin(gcp, backend_service, instance_group)
2695 elif test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure':
2696 test_secondary_locality_gets_no_requests_on_partial_primary_failure(
2697 gcp, backend_service, instance_group,
2698 secondary_zone_instance_group)
2699 elif test_case == 'secondary_locality_gets_requests_on_primary_failure':
2700 test_secondary_locality_gets_requests_on_primary_failure(
2701 gcp, backend_service, instance_group,
2702 secondary_zone_instance_group)
2703 elif test_case == 'traffic_splitting':
2704 test_traffic_splitting(gcp, backend_service, instance_group,
2705 alternate_backend_service,
2706 same_zone_instance_group)
2707 elif test_case == 'path_matching':
2708 test_path_matching(gcp, backend_service, instance_group,
2709 alternate_backend_service,
2710 same_zone_instance_group)
2711 elif test_case == 'header_matching':
2712 test_header_matching(gcp, backend_service, instance_group,
2713 alternate_backend_service,
2714 same_zone_instance_group)
2715 elif test_case == 'circuit_breaking':
2716 test_circuit_breaking(gcp, backend_service, instance_group,
2717 same_zone_instance_group)
2718 elif test_case == 'timeout':
2719 test_timeout(gcp, backend_service, instance_group)
2720 elif test_case == 'fault_injection':
2721 test_fault_injection(gcp, backend_service, instance_group)
2723 logger.error('Unknown test case: %s', test_case)
2725 if client_process and client_process.poll() is not None:
2727 'Client process exited prematurely with exit code %d' %
2728 client_process.returncode)
2729 result.state = 'PASSED'
2730 result.returncode = 0
2731 except Exception as e:
2732 logger.exception('Test case %s failed', test_case)
2733 failed_tests.append(test_case)
2734 result.state = 'FAILED'
2735 result.message = str(e)
2738 if client_process.returncode:
2739 logger.info('Client exited with code %d' %
2740 client_process.returncode)
2742 client_process.terminate()
2743 test_log_file.close()
2744 # Workaround for Python 3, as report_utils will invoke decode() on
2745 # result.message, which has a default value of ''.
2746 result.message = result.message.encode('UTF-8')
2747 test_results[test_case] = [result]
2748 if args.log_client_output:
2749 logger.info('Client output:')
2750 with open(test_log_filename, 'r') as client_output:
2751 logger.info(client_output.read())
2752 if not os.path.exists(_TEST_LOG_BASE_DIR):
2753 os.makedirs(_TEST_LOG_BASE_DIR)
2754 report_utils.render_junit_xml_report(test_results,
2758 suite_name='xds_tests',
2761 logger.error('Test case(s) %s failed', failed_tests)
2764 if not args.keep_gcp_resources:
2765 logger.info('Cleaning up GCP resources. This may take some time.')